/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.util.Collections;
import java.util.List;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental source that reads GCS object metadata rows from the Hudi table configured by
 * {@code HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH} and then loads the data files those rows
 * reference.
 *
 * <p>Progress is tracked with a {@link CloudObjectIncrCheckpoint}, which carries a commit and an
 * optional object key. The {@value #GCS_OBJECT_KEY} and {@value #GCS_OBJECT_SIZE} columns are
 * handed to {@link IncrSourceHelper} as the key and size columns. The selected rows are trimmed
 * against {@code sourceLimit} by
 * {@code IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit}.
 */
public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
    /** Column used as the object key when querying the metadata table. */
    public static final String GCS_OBJECT_KEY = "name";
    /** Column used as the object size when querying the metadata table. */
    public static final String GCS_OBJECT_SIZE = "size";

    private static final Logger LOG = LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);

    private final String srcPath;
    private final boolean checkIfFileExists;
    private final int numInstantsPerFetch;
    private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
    private final GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
    private final CloudDataFetcher gcsObjectDataFetcher;
    private final QueryRunner queryRunner;
    // Intentionally hides the superclass field; wraps the (nullable) constructor argument.
    private final Option<SchemaProvider> schemaProvider;
    private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;

    /**
     * Creates the source and builds the default metadata fetcher, data fetcher and query runner
     * from {@code props}.
     *
     * @param props          source configuration
     * @param jsc            Spark context
     * @param spark          Spark session
     * @param schemaProvider schema provider used when loading file data; may be {@code null}
     */
    public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider) {
        this(props, jsc, spark, schemaProvider,
                new GcsObjectMetadataFetcher(props, getSourceFileFormat(props)),
                new CloudDataFetcher(props, ConfigUtils.getStringWithAltKeys(props, CloudSourceConfig.DATAFILE_FORMAT, true)),
                new QueryRunner(spark, props));
    }

    /**
     * Creates the source with explicitly supplied collaborators. This constructor is
     * package-private; presumably it exists so tests can inject fakes (TODO confirm).
     *
     * @param props                    source configuration; must contain {@code HOODIE_SRC_BASE_PATH}
     * @param jsc                      Spark context
     * @param spark                    Spark session
     * @param schemaProvider           schema provider used when loading file data; may be {@code null}
     * @param gcsObjectMetadataFetcher filters metadata rows and turns them into object metadata
     * @param gcsObjectDataFetcher     loads the data of the selected objects
     * @param queryRunner              runs the incremental/snapshot query on the metadata table
     */
    GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider, GcsObjectMetadataFetcher gcsObjectMetadataFetcher, CloudDataFetcher gcsObjectDataFetcher, QueryRunner queryRunner) {
        super(props, jsc, spark, schemaProvider);
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        this.srcPath = ConfigUtils.getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
        this.missingCheckpointStrategy = IncrSourceHelper.getMissingCheckpointStrategy(props);
        this.numInstantsPerFetch = ConfigUtils.getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH);
        this.checkIfFileExists = ConfigUtils.getBooleanWithAltKeys(props, CloudSourceConfig.ENABLE_EXISTS_CHECK);
        this.gcsObjectMetadataFetcher = gcsObjectMetadataFetcher;
        this.gcsObjectDataFetcher = gcsObjectDataFetcher;
        this.queryRunner = queryRunner;
        this.schemaProvider = Option.ofNullable(schemaProvider);
        this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
        LOG.info("srcPath: {}", this.srcPath);
        LOG.info("missingCheckpointStrategy: {}", this.missingCheckpointStrategy);
        LOG.info("numInstantsPerFetch: {}", this.numInstantsPerFetch);
        LOG.info("checkIfFileExists: {}", this.checkIfFileExists);
    }

    /**
     * Fetches the next batch of GCS object data.
     *
     * @param lastCheckpoint checkpoint produced by the previous batch, if any
     * @param sourceLimit    limit handed to {@code IncrSourceHelper} to cap the selected objects
     * @return the loaded file data (empty when nothing was selected), paired with the checkpoint
     *         string to persist for the next batch
     */
    @Override
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
        CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
        TimelineUtils.HollowCommitHandling handlingMode = IncrSourceHelper.getHollowCommitHandleMode(this.props);
        // The checkpoint's commit may be null (presumably when no checkpoint exists yet; TODO confirm
        // against CloudObjectIncrCheckpoint.fromString). Hudi's Option.of rejects null, so use
        // ofNullable. generateQueryInfo then sees an absent begin instant and applies
        // missingCheckpointStrategy. This matches how the key is wrapped below.
        QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(
                this.sparkContext, this.srcPath, this.numInstantsPerFetch,
                Option.ofNullable(cloudObjectIncrCheckpoint.getCommit()),
                this.missingCheckpointStrategy, handlingMode, HoodieRecord.COMMIT_TIME_METADATA_FIELD,
                GCS_OBJECT_KEY, GCS_OBJECT_SIZE, true,
                Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
        LOG.info("Querying GCS with:{} and queryInfo:{}", cloudObjectIncrCheckpoint, queryInfo);

        // No pending key within a commit and no new instants: nothing to read.
        if (StringUtils.isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryInfo.areStartAndEndInstantsEqual()) {
            LOG.info("Source of file names is empty. Returning empty result and endInstant: {}", queryInfo.getStartInstant());
            return Pair.of(Option.empty(), queryInfo.getStartInstant());
        }

        Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = this.queryRunner.run(queryInfo, this.snapshotLoadQuerySplitter);
        Dataset<Row> filteredSourceData = this.gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight());
        // The query runner may adjust the query range, so continue with the QueryInfo it returns.
        queryInfo = queryInfoDatasetPair.getLeft();

        LOG.info("Adjusting end checkpoint:{} based on sourceLimit :{}", queryInfo.getEndInstant(), sourceLimit);
        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkpointAndDataset =
                IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
        if (!checkpointAndDataset.getRight().isPresent()) {
            LOG.info("Empty source, returning endpoint:{}", queryInfo.getEndInstant());
            return Pair.of(Option.empty(), queryInfo.getEndInstant());
        }
        LOG.info("Adjusted end checkpoint :{}", checkpointAndDataset.getLeft());

        Option<Dataset<Row>> fileDataRows = extractData(checkpointAndDataset.getRight().get());
        return Pair.of(fileDataRows, checkpointAndDataset.getLeft().toString());
    }

    /**
     * Converts the selected metadata rows into object metadata and loads the referenced files.
     *
     * @param cloudObjectMetadataDF metadata rows selected for this batch
     * @return the file data, as returned by the data fetcher
     */
    private Option<Dataset<Row>> extractData(Dataset<Row> cloudObjectMetadataDF) {
        List<CloudObjectMetadata> cloudObjectMetadata =
                this.gcsObjectMetadataFetcher.getGcsObjectMetadata(this.sparkContext, cloudObjectMetadataDF, this.checkIfFileExists);
        LOG.info("Total number of files to process :{}", cloudObjectMetadata.size());
        return this.gcsObjectDataFetcher.getCloudObjectDataDF(this.sparkSession, cloudObjectMetadata, this.props, this.schemaProvider);
    }

    /**
     * Reads the configured source file format ({@code HoodieIncrSourceConfig.SOURCE_FILE_FORMAT}).
     *
     * @param props source configuration
     * @return the configured file format string
     */
    private static String getSourceFileFormat(TypedProperties props) {
        return ConfigUtils.getStringWithAltKeys(props, HoodieIncrSourceConfig.SOURCE_FILE_FORMAT, true);
    }
}

