/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineLayout;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Row source that pulls data out of an upstream Hudi table located at
 * {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}.
 *
 * <p>Depending on {@code writeTableVersion}, batches are bounded either by instant completion time
 * (checkpoint V2) or by instant requested time (checkpoint V1). Each batch is read either with an
 * incremental query or, when no usable instant range exists, with a snapshot query filtered on
 * {@link HoodieRecord#COMMIT_TIME_METADATA_FIELD}.
 */
public class HoodieIncrSource
extends RowSource {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieIncrSource.class);
    /** Property keys that are copied from the source properties into every Hudi read as reader options. */
    public static final Set<String> HOODIE_INCR_SOURCE_READ_OPT_KEYS = CollectionUtils.createImmutableSet(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());
    private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
    private final Option<HoodieIngestionMetrics> metricsOption;
    private final Map<String, String> readOpts = new HashMap<>();

    public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, StreamContext streamContext) {
        this(props, sparkContext, sparkSession, (HoodieIngestionMetrics) null, streamContext);
    }

    /**
     * @param metricsOption ingestion metrics sink; may be {@code null}, in which case no source metrics are reported
     */
    public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metricsOption, StreamContext streamContext) {
        super(props, sparkContext, sparkSession, streamContext);
        // Forward only the whitelisted reader options; everything else in props is source configuration.
        for (Object key : props.keySet()) {
            String keyString = key.toString();
            if (HOODIE_INCR_SOURCE_READ_OPT_KEYS.contains(keyString)) {
                this.readOpts.put(keyString, props.getString(keyString));
            }
        }
        this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
        this.metricsOption = Option.ofNullable(metricsOption);
    }

    /** Checkpoints are used as-is; no translation between checkpoint formats is performed here. */
    @Override
    protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoint) {
        return lastCheckpoint;
    }

    /**
     * Fetches the next batch, dispatching on whether the target checkpoint format for
     * {@code writeTableVersion} is V2 (completion time) or V1 (requested time).
     *
     * @param lastCheckpoint checkpoint of the previous batch, if any
     * @param sourceLimit    not used by this source
     * @return the batch (empty when caught up) paired with the checkpoint to persist
     */
    @Override
    public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        if (CheckpointUtils.targetCheckpointV2(this.writeTableVersion)) {
            return this.fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
        }
        return this.fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
    }

    /**
     * Checkpoint-V2 path: the batch range is computed by {@link IncrementalQueryAnalyzer} on instant
     * completion times, and the returned checkpoint is a {@link StreamerCheckpointV2} holding the end
     * completion time.
     */
    private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnCompletionTime(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        ConfigUtils.checkRequiredConfigProperties(this.props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        String srcPath = ConfigUtils.getStringWithAltKeys(this.props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
        IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = this.resolveMissingCheckpointStrategy();
        IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer(this.sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy,
                ConfigUtils.getIntWithAltKeys(this.props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH), this.getLatestSourceProfile());
        IncrementalQueryAnalyzer.QueryContext queryContext = analyzer.analyze();
        // Nothing to do if there are no instants, or the newest completion time equals the start we already consumed.
        if (queryContext.isEmpty()
                || queryContext.getMaxCompletionTime().equals(analyzer.getStartCompletionTime().orElse(null))) {
            LOG.info("Already caught up. No new data to process");
            return Pair.of(Option.empty(), lastCheckpoint.orElse(null));
        }
        String endCompletionTime = queryContext.getMaxCompletionTime();
        Option<InstantRange> instantRange = queryContext.getInstantRange();
        DataFrameReader reader = this.newHudiReader();
        // Only evaluated for READ_UPTO_LATEST_COMMIT (short-circuit), for which the analyzer is expected to
        // provide a start completion time — NOTE(review): relies on IncrSourceHelper behavior not visible here.
        boolean shouldFullScan = missingCheckpointStrategy == IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT
                && queryContext.getActiveTimeline().isBeforeTimelineStartsByCompletionTime(analyzer.getStartCompletionTime().get());
        Dataset<Row> source;
        if (instantRange.isEmpty() || shouldFullScan) {
            Dataset<Row> snapshot = reader.options(this.readOpts)
                    .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                    .load(srcPath);
            List<String> instantTimeList = queryContext.getInstantTimeList();
            if (this.snapshotLoadQuerySplitter.isPresent()) {
                Option<SnapshotLoadQuerySplitter.CheckpointWithPredicates> split =
                        this.snapshotLoadQuerySplitter.get().getNextCheckpointWithPredicates(snapshot, queryContext);
                if (split.isPresent()) {
                    // The splitter narrows the batch: a smaller end completion time plus an optional extra predicate.
                    String splitEndCompletionTime = split.get().endCompletionTime;
                    endCompletionTime = splitEndCompletionTime;
                    if (split.get().predicateFilter != null) {
                        snapshot = snapshot.filter(split.get().predicateFilter);
                    }
                    instantTimeList = queryContext.getInstants().stream()
                            .filter(instant -> InstantComparison.compareTimestamps(instant.getCompletionTime(), InstantComparison.LESSER_THAN_OR_EQUALS, splitEndCompletionTime))
                            .map(HoodieInstant::requestedTime)
                            .collect(Collectors.toList());
                }
            }
            // Restrict the snapshot to records written by the selected instants.
            source = snapshot.filter(String.format("%s IN ('%s')", HoodieRecord.COMMIT_TIME_METADATA_FIELD, String.join("','", instantTimeList)));
        } else {
            TimelineLayout layout = TimelineLayout.fromVersion(queryContext.getActiveTimeline().getTimelineLayoutVersion());
            // queryContext is non-empty here, so at least one instant exists.
            String inclusiveStartCompletionTime = queryContext.getInstants().stream()
                    .min(layout.getInstantComparator().completionTimeOrderedComparator())
                    .map(HoodieInstant::getCompletionTime)
                    .orElseThrow(() -> new IllegalStateException("No instants found in a non-empty incremental query context"));
            source = reader.options(this.readOpts)
                    .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
                    .option(DataSourceReadOptions.INCREMENTAL_READ_TABLE_VERSION().key(), (long) HoodieTableVersion.EIGHT.versionCode())
                    .option(DataSourceReadOptions.START_COMMIT().key(), inclusiveStartCompletionTime)
                    .option(DataSourceReadOptions.END_COMMIT().key(), endCompletionTime)
                    .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), this.incrementalFallbackToFullTableScan())
                    .load(srcPath);
        }
        return Pair.of(Option.of(this.dropMetaColumnsAndRepartition(source)), new StreamerCheckpointV2(endCompletionTime));
    }

    /**
     * Checkpoint-V1 path: the batch range is computed on instant requested times via
     * {@link IncrSourceHelper#generateQueryInfo}, and the returned checkpoint is a
     * {@link StreamerCheckpointV1} holding the end instant.
     */
    private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnRequestedTime(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        ConfigUtils.checkRequiredConfigProperties(this.props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        String srcPath = ConfigUtils.getStringWithAltKeys(this.props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
        IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = this.resolveMissingCheckpointStrategy();
        // A checkpoint with an empty key is treated the same as no checkpoint.
        Option<Checkpoint> beginInstant = lastCheckpoint.isPresent() && !lastCheckpoint.get().getCheckpointKey().isEmpty()
                ? lastCheckpoint
                : Option.empty();
        int numInstantsFromConfig = ConfigUtils.getIntWithAltKeys(this.props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH);
        // A source profile, when supplied, overrides the configured number of instants per fetch.
        int numInstantsPerFetch = this.getLatestSourceProfile().map(sourceProfile -> {
            int numInstantsFromSourceProfile = sourceProfile.getSourceSpecificContext();
            LOG.info("Overriding numInstantsPerFetch from source profile numInstantsFromSourceProfile {} , numInstantsFromConfig {}", numInstantsFromSourceProfile, numInstantsFromConfig);
            return numInstantsFromSourceProfile;
        }).orElse(numInstantsFromConfig);
        TimelineUtils.HollowCommitHandling handlingMode = IncrSourceHelper.getHollowCommitHandleMode(this.props);
        QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(this.sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy, handlingMode,
                HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, null, false, Option.empty());
        if (queryInfo.areStartAndEndInstantsEqual()) {
            LOG.info("Already caught up. No new data to process");
            return Pair.of(Option.empty(), new StreamerCheckpointV1(queryInfo.getEndInstant()));
        }
        DataFrameReader reader = this.newHudiReader();
        Dataset<Row> source;
        if (queryInfo.isIncremental()) {
            source = reader.options(this.readOpts)
                    .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
                    .option(DataSourceReadOptions.INCREMENTAL_READ_TABLE_VERSION().key(), (long) HoodieTableVersion.SIX.versionCode())
                    .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getStartInstant())
                    .option(DataSourceReadOptions.END_COMMIT().key(), queryInfo.getEndInstant())
                    .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), this.incrementalFallbackToFullTableScan())
                    .option(DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(), handlingMode.name())
                    .load(srcPath);
        } else {
            Dataset<Row> snapshot = reader.options(this.readOpts)
                    .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                    .load(srcPath);
            if (this.snapshotLoadQuerySplitter.isPresent()) {
                queryInfo = this.snapshotLoadQuerySplitter.get().getNextCheckpoint(snapshot, queryInfo, this.sourceProfileSupplier);
            }
            // Keep records whose commit time lies in (start, end].
            Dataset<Row> bounded = snapshot
                    .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryInfo.getStartInstant()))
                    .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryInfo.getEndInstant()));
            source = queryInfo.getPredicateFilter().map(predicate -> bounded.filter(predicate)).orElse(bounded);
        }
        return Pair.of(Option.of(this.dropMetaColumnsAndRepartition(source)), new StreamerCheckpointV1(queryInfo.getEndInstant()));
    }

    /**
     * Resolves the strategy used when no checkpoint is available: the explicitly configured
     * {@link HoodieIncrSourceConfig#MISSING_CHECKPOINT_STRATEGY} (or {@code null} if unset), overridden by
     * {@code READ_LATEST} when {@link HoodieIncrSourceConfig#READ_LATEST_INSTANT_ON_MISSING_CKPT} is true.
     */
    private IncrSourceHelper.MissingCheckpointStrategy resolveMissingCheckpointStrategy() {
        // Parse the configured value first so an invalid name still fails fast even when overridden below.
        IncrSourceHelper.MissingCheckpointStrategy configured = ConfigUtils.containsConfigProperty(this.props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY)
                ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(ConfigUtils.getStringWithAltKeys(this.props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY))
                : null;
        if (ConfigUtils.getBooleanWithAltKeys(this.props, HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT)) {
            return IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
        }
        return configured;
    }

    /**
     * Creates a {@code hudi}-format reader pre-populated with the user-supplied
     * {@link HoodieIncrSourceConfig#HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS}, if any.
     */
    private DataFrameReader newHudiReader() {
        DataFrameReader reader = this.sparkSession.read().format("hudi");
        String datasourceOpts = ConfigUtils.getStringWithAltKeys((Properties) this.props, HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS, true);
        if (StringUtils.isNullOrEmpty(datasourceOpts)) {
            return reader;
        }
        return reader.options(parseDatasourceOptions(datasourceOpts));
    }

    /**
     * Parses a comma-separated {@code key=value} list. Each entry is split on its first {@code '='}
     * so values may themselves contain {@code '='}; keys and values are trimmed and blank entries
     * (e.g. from a trailing comma) are skipped.
     *
     * @throws IllegalArgumentException if an entry has no {@code '='}, has an empty key, or repeats a key
     */
    private static Map<String, String> parseDatasourceOptions(String datasourceOpts) {
        Map<String, String> options = new HashMap<>();
        for (String entry : datasourceOpts.split(",")) {
            String trimmedEntry = entry.trim();
            if (trimmedEntry.isEmpty()) {
                continue;
            }
            int separator = trimmedEntry.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Invalid entry '" + trimmedEntry + "' in "
                        + HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS.key() + "; expected key=value");
            }
            String key = trimmedEntry.substring(0, separator).trim();
            String value = trimmedEntry.substring(separator + 1).trim();
            if (options.put(key, value) != null) {
                throw new IllegalArgumentException("Duplicate key '" + key + "' in "
                        + HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS.key());
            }
        }
        return options;
    }

    /** Value of the incremental "fallback to full table scan" option, taken from props or its default. */
    private String incrementalFallbackToFullTableScan() {
        return this.props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
                (String) DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue());
    }

    /**
     * Drops Hudi meta columns from the batch and, when a source profile is available, reports its
     * byte/parallelism hints to the metrics sink and coalesces/repartitions to its partition count.
     *
     * <p>All meta columns are dropped when {@link HoodieIncrSourceConfig#HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE}
     * is set or the configured record merger uses the SPARK record type; otherwise the partition-path
     * meta column is kept.
     */
    private Dataset<Row> dropMetaColumnsAndRepartition(Dataset<Row> source) {
        HoodieRecord.HoodieRecordType recordType = UtilHelpers.createRecordMerger(this.props).getRecordType();
        boolean dropAllMetaFields = ConfigUtils.getBooleanWithAltKeys(this.props, HoodieIncrSourceConfig.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE)
                || recordType == HoodieRecord.HoodieRecordType.SPARK;
        String[] colsToDrop = HoodieRecord.HOODIE_META_COLUMNS.stream()
                .filter(col -> dropAllMetaFields || !col.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
                .toArray(String[]::new);
        Dataset<Row> withoutMetaColumns = source.drop(colsToDrop);
        return this.getLatestSourceProfile().map(sourceProfile -> {
            this.metricsOption.ifPresent(metrics -> {
                metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(sourceProfile.getMaxSourceBytes());
                metrics.updateStreamerSourceParallelism(sourceProfile.getSourcePartitions());
            });
            return IncrSourceHelper.coalesceOrRepartition(withoutMetaColumns, sourceProfile.getSourcePartitions());
        }).orElse(withoutMetaColumns);
    }

    /** Latest profile from the configured {@code sourceProfileSupplier}, if one is configured. */
    private Option<SourceProfile<Integer>> getLatestSourceProfile() {
        return this.sourceProfileSupplier.map(SourceProfileSupplier::getSourceProfile);
    }

    /**
     * Legacy string constants kept for backward compatibility; use {@link HoodieIncrSourceConfig} instead.
     */
    public static class Config {
        @Deprecated
        public static final String HOODIE_SRC_BASE_PATH = HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key();
        @Deprecated
        static final String NUM_INSTANTS_PER_FETCH = HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH.key();
        @Deprecated
        static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH.defaultValue();
        @Deprecated
        static final String HOODIE_SRC_PARTITION_FIELDS = HoodieIncrSourceConfig.HOODIE_SRC_PARTITION_FIELDS.key();
        @Deprecated
        static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS = HoodieIncrSourceConfig.HOODIE_SRC_PARTITION_EXTRACTORCLASS.key();
        @Deprecated
        static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS = HoodieIncrSourceConfig.HOODIE_SRC_PARTITION_EXTRACTORCLASS.defaultValue();
        @Deprecated
        public static final String READ_LATEST_INSTANT_ON_MISSING_CKPT = HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT.key();
        @Deprecated
        public static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT.defaultValue();
        @Deprecated
        public static final String MISSING_CHECKPOINT_STRATEGY = HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY.key();
        @Deprecated
        static final String SOURCE_FILE_FORMAT = HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.key();
        @Deprecated
        static final String DEFAULT_SOURCE_FILE_FORMAT = HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.defaultValue();
        @Deprecated
        static final String HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = HoodieIncrSourceConfig.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE.key();
        @Deprecated
        public static final Boolean DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = HoodieIncrSourceConfig.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE.defaultValue();
    }
}

