/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.config.HiveIncrPullSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Avro source that pulls one commit at a time from a root directory.
 *
 * <p>The entries directly under the configured root path are treated as commit times. On each
 * fetch the next commit after the last checkpoint (in lexicographic order) is selected, every
 * file listed under that commit's path is read as Avro, and the commit time is returned as the
 * new checkpoint.
 */
public class HiveIncrPullSource
extends AvroSource {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HiveIncrPullSource.class);
    private final transient FileSystem fs;
    private final String incrPullRootPath;

    /**
     * Creates the source and resolves the file system for the configured root path.
     *
     * @param props          properties; must contain {@link HiveIncrPullSourceConfig#ROOT_INPUT_PATH}
     * @param sparkContext   Spark context whose Hadoop configuration is used to obtain the file system
     * @param sparkSession   Spark session passed through to the parent source
     * @param schemaProvider schema provider passed through to the parent source
     */
    public HiveIncrPullSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(HiveIncrPullSourceConfig.ROOT_INPUT_PATH));
        this.incrPullRootPath = ConfigUtils.getStringWithAltKeys(props, HiveIncrPullSourceConfig.ROOT_INPUT_PATH);
        this.fs = FSUtils.getFs(this.incrPullRootPath, sparkContext.hadoopConfiguration());
    }

    /**
     * Finds the commit time to pull next.
     *
     * @param latestTargetCommit the last commit already pulled, if any
     * @return the smallest commit time when there is no previous commit; otherwise the first commit
     *         time strictly greater than {@code latestTargetCommit}; empty when the root path has no
     *         entries or no newer commit exists
     * @throws IOException if listing the root path fails
     */
    private Option<String> findCommitToPull(Option<String> latestTargetCommit) throws IOException {
        LOG.info("Looking for commits ");
        FileStatus[] commitTimePaths = this.fs.listStatus(new Path(this.incrPullRootPath));
        List<String> commitTimes = new ArrayList<>(commitTimePaths.length);
        for (FileStatus commitTimePath : commitTimePaths) {
            // The final path component is the commit time.
            commitTimes.add(commitTimePath.getPath().getName());
        }
        Collections.sort(commitTimes);
        LOG.info("Retrieved commit times {}", commitTimes);
        if (commitTimes.isEmpty()) {
            // Nothing under the root path yet: report "no commit" instead of failing on get(0).
            return Option.empty();
        }
        if (!latestTargetCommit.isPresent()) {
            // No checkpoint yet: start from the earliest commit.
            return Option.of(commitTimes.get(0));
        }
        String lastPulled = latestTargetCommit.get();
        for (String instantTime : commitTimes) {
            if (instantTime.compareTo(lastPulled) > 0) {
                return Option.of(instantTime);
            }
        }
        return Option.empty();
    }

    /**
     * Reads the next commit's Avro files.
     *
     * @param lastCheckpointStr the commit time of the previous fetch, if any
     * @param sourceLimit       not used by this source
     * @return a batch holding the records of the next commit with that commit time as checkpoint, or
     *         an empty batch carrying the previous checkpoint (or {@code ""}) when there is nothing
     *         new to pull
     * @throws HoodieReadFromSourceException if anything fails while locating or reading the commit
     */
    @Override
    protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
        try {
            Option<String> commitToPull = this.findCommitToPull(lastCheckpointStr);
            if (!commitToPull.isPresent()) {
                return new InputBatch<JavaRDD<GenericRecord>>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
            }
            List<FileStatus> commitDeltaFiles = Arrays.asList(this.fs.listStatus(new Path(this.incrPullRootPath, commitToPull.get())));
            // NOTE(review): a commit path with no files produces an empty path string here;
            // confirm how newAPIHadoopFile behaves in that case.
            String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
            JavaPairRDD<AvroKey, NullWritable> avroRDD = this.sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, this.sparkContext.hadoopConfiguration());
            this.sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new data");
            JavaRDD<GenericRecord> records = avroRDD.keys().map(r -> (GenericRecord) r.datum());
            return new InputBatch<JavaRDD<GenericRecord>>(Option.of(records), String.valueOf(commitToPull.get()));
        }
        catch (Exception e) {
            throw new HoodieReadFromSourceException("Unable to read from source from checkpoint: " + lastCheckpointStr, e);
        }
    }

    /**
     * Legacy holder for the root input path property key.
     */
    static class Config {
        /** Same key as {@link HiveIncrPullSourceConfig#ROOT_INPUT_PATH}. */
        @Deprecated
        private static final String ROOT_INPUT_PATH_PROP = HiveIncrPullSourceConfig.ROOT_INPUT_PATH.key();

        Config() {
        }
    }
}

