/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Incremental source that reads file-name records from an upstream Hudi table and then loads the
 * contents of the files those records point to.
 *
 * <p>Each batch:
 * <ol>
 *   <li>computes the instant range to read from the table at {@code srcPath};</li>
 *   <li>reads the rows in that range;</li>
 *   <li>turns them into file paths with {@link FilePathsFetcher};</li>
 *   <li>loads those files with {@link FileDataFetcher}.</li>
 * </ol>
 * The returned checkpoint is the end instant of the range. When there is nothing new to read, it
 * is the begin instant instead.
 *
 * <p>NOTE(review): the upstream table is presumably populated from GCS object-change events
 * (hence the class name and {@code getGcsFilePaths}); confirm against the ingestion setup.
 */
public class GcsEventsHoodieIncrSource extends HoodieIncrSource {

    private static final Logger LOG = LogManager.getLogger(GcsEventsHoodieIncrSource.class);

    /** Base path of the upstream Hudi table holding the file-name records (required). */
    private static final String SRC_PATH_PROP = "hoodie.deltastreamer.source.hoodieincr.path";
    /** Maximum number of commit instants consumed per batch. */
    private static final String NUM_INSTANTS_PROP = "hoodie.deltastreamer.source.hoodieincr.num_instants";
    /** Whether to check that each referenced file exists before reading it. */
    private static final String CHECK_FILE_EXISTS_PROP = "hoodie.deltastreamer.source.cloud.data.check.file.exists";
    /** Format of the data files that are loaded. */
    private static final String DATAFILE_FORMAT_PROP = "hoodie.deltastreamer.source.cloud.data.datafile.format";
    /** File format handed to the {@link FilePathsFetcher}. */
    private static final String SOURCE_FILE_FORMAT_PROP = "hoodie.deltastreamer.source.hoodieincr.file.format";
    /** Format used for both format properties above when they are not configured. */
    private static final String DEFAULT_FILE_FORMAT = "parquet";
    /** Upper bound on the number of file paths echoed in the per-batch debug log. */
    private static final int MAX_SAMPLE_PATHS_TO_LOG = 10;

    private final String srcPath;
    private final boolean checkIfFileExists;
    private final int numInstantsPerFetch;
    private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
    private final FilePathsFetcher filePathsFetcher;
    private final FileDataFetcher fileDataFetcher;

    /**
     * Creates the source with the default path and data fetchers, configured from {@code props}.
     *
     * @param props          source configuration
     * @param jsc            Spark context used to read the upstream table
     * @param spark          Spark session used to read the upstream table and the data files
     * @param schemaProvider schema provider forwarded to {@link HoodieIncrSource}
     */
    public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
                                     SchemaProvider schemaProvider) {
        this(props, jsc, spark, schemaProvider,
            new FilePathsFetcher(props, getSourceFileFormat(props)),
            new FileDataFetcher(props, props.getString(DATAFILE_FORMAT_PROP, DEFAULT_FILE_FORMAT)));
    }

    /**
     * Creates the source with explicitly supplied fetchers. Package-private so tests can inject
     * their own fetchers.
     *
     * <p>{@code hoodie.deltastreamer.source.hoodieincr.path} must be set. It is validated through
     * {@code DataSourceUtils.checkRequiredProperties}.
     */
    GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
                              SchemaProvider schemaProvider, FilePathsFetcher filePathsFetcher,
                              FileDataFetcher fileDataFetcher) {
        super(props, jsc, spark, schemaProvider);
        DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(SRC_PATH_PROP));
        this.srcPath = props.getString(SRC_PATH_PROP);
        this.missingCheckpointStrategy = IncrSourceHelper.getMissingCheckpointStrategy(props);
        this.numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PROP,
            HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
        this.checkIfFileExists = props.getBoolean(CHECK_FILE_EXISTS_PROP,
            CloudStoreIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK);
        this.filePathsFetcher = filePathsFetcher;
        this.fileDataFetcher = fileDataFetcher;

        LOG.info("srcPath: " + srcPath);
        LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
        LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
        LOG.info("checkIfFileExists: " + checkIfFileExists);
    }

    /**
     * Reads the next batch of file data.
     *
     * @param lastCkptStr last committed checkpoint; a missing or empty value is treated as absent
     * @param sourceLimit not used by this implementation
     * @return the fetched rows (empty when nothing is new) paired with the checkpoint to commit.
     *         That checkpoint is the begin instant when already caught up, otherwise the end instant.
     */
    @Override
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
        QueryInfo queryInfo = getQueryInfo(lastCkptStr);
        if (queryInfo.areStartAndEndInstantsEqual()) {
            LOG.info("Already caught up. Begin Checkpoint was: " + queryInfo.getStartInstant());
            return Pair.of(Option.empty(), queryInfo.getStartInstant());
        }

        Dataset<Row> sourceForFilenames = queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
        if (sourceForFilenames.isEmpty()) {
            LOG.info("Source of file names is empty. Returning empty result and endInstant: "
                + queryInfo.getEndInstant());
            return Pair.of(Option.empty(), queryInfo.getEndInstant());
        }
        return extractData(queryInfo, sourceForFilenames);
    }

    /**
     * Resolves file paths from the file-name rows and loads the files' data.
     *
     * @return the loaded data paired with the query's end instant
     */
    private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> sourceForFilenames) {
        List<String> filepaths =
            filePathsFetcher.getGcsFilePaths(sparkContext, sourceForFilenames, checkIfFileExists);
        // Guarded so that the sample list and the message string are only built when debug is on.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted " + filepaths.size() + " distinct files. Some samples "
                + filepaths.stream().limit(MAX_SAMPLE_PATHS_TO_LOG).collect(Collectors.toList()));
        }
        Option<Dataset<Row>> fileDataRows = fileDataFetcher.fetchFileData(sparkSession, filepaths, props);
        return Pair.of(fileDataRows, queryInfo.getEndInstant());
    }

    /**
     * Computes the instant range for this batch, starting from the last checkpoint. The number of
     * instants and the missing-checkpoint behavior are taken from this source's configuration.
     */
    private QueryInfo getQueryInfo(Option<String> lastCkptStr) {
        Option<String> beginInstant = getBeginInstant(lastCkptStr);
        Pair<String, Pair<String, String>> queryInfoPair = IncrSourceHelper.calculateBeginAndEndInstants(
            sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
        QueryInfo queryInfo = new QueryInfo(queryInfoPair.getLeft(),
            queryInfoPair.getRight().getLeft(), queryInfoPair.getRight().getRight());
        if (LOG.isDebugEnabled()) {
            queryInfo.logDetails();
        }
        return queryInfo;
    }

    /** Returns the last checkpoint, or empty when it is absent, {@code null} or an empty string. */
    private Option<String> getBeginInstant(Option<String> lastCheckpoint) {
        if (lastCheckpoint.isPresent() && !StringUtils.isNullOrEmpty(lastCheckpoint.get())) {
            return lastCheckpoint;
        }
        return Option.empty();
    }

    /** Returns the file format for the {@link FilePathsFetcher}, defaulting to {@code "parquet"}. */
    private static String getSourceFileFormat(TypedProperties props) {
        return props.getString(SOURCE_FILE_FORMAT_PROP, DEFAULT_FILE_FORMAT);
    }
}

