/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.util.Collections;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental source that reads S3 event metadata records from an upstream Hudi table and
 * delegates loading of the referenced S3 objects to a {@link CloudDataFetcher}.
 *
 * <p>The upstream table's base path is read from {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH},
 * which is a required property. Checkpoints are parsed with {@link CloudObjectIncrCheckpoint},
 * which exposes a commit instant ({@code getCommit()}) and an optional object key ({@code getKey()}).
 */
public class S3EventsHoodieIncrSource extends HoodieIncrSource {
    private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);

    /** Object-key column of the S3 event metadata, passed to {@link IncrSourceHelper#generateQueryInfo}. */
    private static final String S3_OBJECT_KEY_COLUMN = "s3.object.key";
    /** Object-size column of the S3 event metadata, passed to {@link IncrSourceHelper#generateQueryInfo}. */
    private static final String S3_OBJECT_SIZE_COLUMN = "s3.object.size";

    /** Base path of the upstream Hudi table holding the S3 event metadata. */
    private final String srcPath;
    /** Value of {@link HoodieIncrSourceConfig#NUM_INSTANTS_PER_FETCH}, forwarded to query planning. */
    private final int numInstantsPerFetch;
    /** Strategy resolved from the properties and forwarded to query planning. */
    private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
    private final QueryRunner queryRunner;
    private final CloudDataFetcher cloudDataFetcher;
    /** Schema provider taken from the stream context; empty when the context supplies none. */
    private final Option<SchemaProvider> schemaProvider;
    private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;

    /**
     * Creates the source with a default {@link QueryRunner} and {@link CloudDataFetcher}, wrapping
     * {@code schemaProvider} in a {@link DefaultStreamContext}.
     *
     * @param props source configuration; must contain {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}
     * @param sparkContext Spark context used for query planning and fetching
     * @param sparkSession Spark session used by the query runner and fetcher
     * @param schemaProvider schema provider for the produced rows; may be null
     * @param metrics ingestion metrics handed to the {@link CloudDataFetcher}
     */
    public S3EventsHoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                                    SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
        this(props, sparkContext, sparkSession,
                new QueryRunner(sparkSession, props),
                new CloudDataFetcher(props, sparkContext, sparkSession, metrics),
                new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    /**
     * Creates the source with a default {@link QueryRunner} and {@link CloudDataFetcher} for the
     * given stream context.
     *
     * @param props source configuration; must contain {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}
     * @param sparkContext Spark context used for query planning and fetching
     * @param sparkSession Spark session used by the query runner and fetcher
     * @param metrics ingestion metrics handed to the {@link CloudDataFetcher}
     * @param streamContext supplies the (possibly null) schema provider
     */
    public S3EventsHoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                                    HoodieIngestionMetrics metrics, StreamContext streamContext) {
        this(props, sparkContext, sparkSession,
                new QueryRunner(sparkSession, props),
                new CloudDataFetcher(props, sparkContext, sparkSession, metrics),
                streamContext);
    }

    /**
     * Creates the source with caller-supplied {@link QueryRunner} and {@link CloudDataFetcher}
     * instances.
     *
     * @param props source configuration; must contain {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}
     * @param sparkContext Spark context used for query planning
     * @param sparkSession Spark session passed to the superclass
     * @param queryRunner runs the planned incremental query against the metadata table
     * @param cloudDataFetcher turns the query result into the output batch
     * @param streamContext supplies the (possibly null) schema provider
     */
    S3EventsHoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                             QueryRunner queryRunner, CloudDataFetcher cloudDataFetcher, StreamContext streamContext) {
        super(props, sparkContext, sparkSession, streamContext);
        // Fail fast when the upstream metadata table path is not configured.
        ConfigUtils.checkRequiredConfigProperties(props,
                Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        this.srcPath = ConfigUtils.getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
        this.numInstantsPerFetch = ConfigUtils.getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH);
        this.missingCheckpointStrategy = IncrSourceHelper.getMissingCheckpointStrategy(props);
        this.queryRunner = queryRunner;
        this.cloudDataFetcher = cloudDataFetcher;
        this.schemaProvider = Option.ofNullable(streamContext.getSchemaProvider());
        this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
    }

    /**
     * Plans and fetches the next batch of S3 objects after {@code lastCheckpoint}.
     *
     * @param lastCheckpoint checkpoint from the previous batch, if any; parsed with
     *     {@link CloudObjectIncrCheckpoint#fromString}
     * @param sourceLimit limit forwarded to {@link CloudDataFetcher#fetchPartitionedSource}
     * @return {@code (empty, endInstant)} when the checkpoint has no object key and the planned
     *     start and end instants are equal; otherwise the result of
     *     {@link CloudDataFetcher#fetchPartitionedSource}
     */
    @Override
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
        CloudObjectIncrCheckpoint checkpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
        TimelineUtils.HollowCommitHandling handlingMode = IncrSourceHelper.getHollowCommitHandleMode(this.props);
        // The commit is wrapped with ofNullable, like the key below: Hudi's Option.of rejects null,
        // and a checkpoint without a commit should reach generateQueryInfo as an empty Option
        // (presumably so missingCheckpointStrategy can apply) instead of failing here.
        QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(
                this.sparkContext,
                this.srcPath,
                this.numInstantsPerFetch,
                Option.ofNullable(checkpoint.getCommit()),
                this.missingCheckpointStrategy,
                handlingMode,
                HoodieRecord.COMMIT_TIME_METADATA_FIELD,
                S3_OBJECT_KEY_COLUMN,
                S3_OBJECT_SIZE_COLUMN,
                true,
                Option.ofNullable(checkpoint.getKey()));
        LOG.info("Querying S3 with:{}, queryInfo:{}", checkpoint, queryInfo);
        // Short-circuit only when the checkpoint carries no object key.
        // NOTE(review): a present key presumably marks a partially consumed commit that must still be read.
        if (StringUtils.isNullOrEmpty(checkpoint.getKey()) && queryInfo.areStartAndEndInstantsEqual()) {
            LOG.warn("Already caught up. No new data to process");
            return Pair.of(Option.empty(), queryInfo.getEndInstant());
        }
        return this.cloudDataFetcher.fetchPartitionedSource(
                CloudObjectsSelectorCommon.Type.S3,
                checkpoint,
                this.sourceProfileSupplier,
                this.queryRunner.run(queryInfo, this.snapshotLoadQuerySplitter),
                this.schemaProvider,
                sourceLimit);
    }
}

