/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Properties;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers shared by the incremental Hudi sources.
 *
 * <p>Covers:
 * <ul>
 *   <li>resolving the instant range for the next incremental fetch;</li>
 *   <li>batching cloud-object rows by a cumulative size limit;</li>
 *   <li>reading the missing-checkpoint and hollow-commit settings from source properties.</li>
 * </ul>
 */
public class IncrSourceHelper {
    private static final Logger LOG = LoggerFactory.getLogger(IncrSourceHelper.class);
    /** Start timestamp used when reading from the very beginning of the source timeline. */
    public static final String DEFAULT_START_TIMESTAMP = "00000000000000";
    /** Temporary running-total column used for source-limit based batching. */
    private static final String CUMULATIVE_COLUMN_NAME = "cumulativeSize";
    /** Temporary column holding {@code orderColumn + keyColumn}, used to skip rows up to the last checkpoint. */
    private static final String COMMIT_KEY_COLUMN_NAME = "commit_key";

    /**
     * Reads the hollow-commit handling mode from the source properties.
     *
     * @param props source properties
     * @return the configured mode, or {@code BLOCK} when the property is not set
     * @throws IllegalArgumentException if the configured value is not a valid mode name
     */
    public static TimelineUtils.HollowCommitHandling getHollowCommitHandleMode(TypedProperties props) {
        String mode = props.getString(
                DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(),
                TimelineUtils.HollowCommitHandling.BLOCK.name());
        return TimelineUtils.HollowCommitHandling.valueOf(mode);
    }

    /**
     * Computes the query type and instant range for the next fetch from a Hudi source table.
     *
     * <p>An incremental query covering at most {@code numInstantsPerFetch} instants is returned in two cases:
     * the strategy is {@code READ_LATEST}, or the begin instant is not before the start of the active commit
     * timeline. Otherwise, a snapshot query up to the latest instant of the timeline is returned.
     *
     * @param jssc                      Spark context whose Hadoop configuration is copied for the meta client
     * @param srcBasePath               base path of the source Hudi table
     * @param numInstantsPerFetch       maximum number of instants per fetch; must be positive
     * @param beginInstant              checkpoint to resume from, if any
     * @param missingCheckpointStrategy behaviour when {@code beginInstant} is empty; may be {@code null}
     * @param handlingMode              hollow-commit handling mode applied to the completed commit timeline
     * @param orderColumn               column used to order rows
     * @param keyColumn                 secondary ordering / checkpoint key column
     * @param limitColumn               column summed for source-limit based batching
     * @param sourceLimitBasedBatching  whether source-limit based batching is enabled
     * @param lastCheckpointKey         key part of the last checkpoint; when present and batching is enabled,
     *                                  the begin instant itself is included in the fetch window
     * @return the query info for the next fetch
     * @throws IllegalArgumentException if {@code numInstantsPerFetch} is not positive, or if there is neither a
     *                                  begin instant nor a missing-checkpoint strategy
     */
    public static QueryInfo generateQueryInfo(JavaSparkContext jssc, String srcBasePath, int numInstantsPerFetch,
            Option<Checkpoint> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy,
            TimelineUtils.HollowCommitHandling handlingMode, String orderColumn, String keyColumn,
            String limitColumn, boolean sourceLimitBasedBatching, Option<String> lastCheckpointKey) {
        ValidationUtils.checkArgument(numInstantsPerFetch > 0,
                "Make sure the config hoodie.streamer.source.hoodieincr.num_instants is set to a positive value");
        HoodieTableMetaClient srcMetaClient = buildMetaClient(jssc, srcBasePath);
        HoodieTimeline completedCommitTimeline =
                srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        HoodieTimeline activeCommitTimeline =
                TimelineUtils.handleHollowCommitIfNeeded(completedCommitTimeline, srcMetaClient, handlingMode);
        // With USE_TRANSITION_TIME the latest instant is identified by its completion time,
        // otherwise by its requested time.
        Function<HoodieInstant, String> timestampForLastInstant =
                instant -> handlingMode == TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
                        ? instant.getCompletionTime()
                        : instant.requestedTime();

        String beginInstantTime = resolveBeginInstantTime(beginInstant, srcMetaClient, activeCommitTimeline,
                missingCheckpointStrategy, timestampForLastInstant);
        String previousInstantTime = resolvePreviousInstantTime(activeCommitTimeline, beginInstantTime);

        if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST
                || !activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
            // When resuming with a checkpoint key under source-limit batching, the begin instant is included
            // (presumably because it may still hold rows beyond the key); otherwise only later instants are fetched.
            HoodieTimeline fetchTimeline = sourceLimitBasedBatching && lastCheckpointKey.isPresent()
                    ? activeCommitTimeline.findInstantsAfterOrEquals(beginInstantTime, numInstantsPerFetch)
                    : activeCommitTimeline.findInstantsAfter(beginInstantTime, numInstantsPerFetch);
            Option<HoodieInstant> nthInstant =
                    Option.fromJavaOptional(fetchTimeline.getInstantsAsStream().reduce((first, second) -> second));
            String endInstantTime = nthInstant.map(HoodieInstant::requestedTime).orElse(beginInstantTime);
            return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), previousInstantTime,
                    beginInstantTime, endInstantTime, orderColumn, keyColumn, limitColumn);
        }
        // The begin instant precedes the active timeline, so fall back to a snapshot read up to the latest instant.
        // isBeforeTimelineStarts(...) being true implies the timeline is non-empty here.
        HoodieInstant lastInstant = activeCommitTimeline.lastInstant().get();
        return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), previousInstantTime,
                beginInstantTime, lastInstant.requestedTime(), orderColumn, keyColumn, limitColumn);
    }

    /**
     * Builds a meta client for {@code basePath} that loads the active timeline.
     * The client uses a copy of the Spark Hadoop configuration.
     */
    private static HoodieTableMetaClient buildMetaClient(JavaSparkContext jssc, String basePath) {
        return HoodieTableMetaClient.builder()
                .setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()))
                .setBasePath(basePath)
                .setLoadActiveTimelineOnLoad(true)
                .build();
    }

    /**
     * Resolves the instant time to start the incremental pull from.
     *
     * <p>The result depends on what is available:
     * <ul>
     *   <li>If a checkpoint is given, it is translated to a V1 commit-time checkpoint.</li>
     *   <li>Otherwise the missing-checkpoint strategy decides.</li>
     *   <li>{@code READ_LATEST} starts 1 ms before the latest instant, or at {@link #DEFAULT_START_TIMESTAMP}
     *       when the timeline is empty.</li>
     *   <li>Any other non-null strategy starts at {@link #DEFAULT_START_TIMESTAMP}.</li>
     * </ul>
     *
     * @throws IllegalArgumentException if there is no checkpoint and the strategy is {@code null}
     */
    private static String resolveBeginInstantTime(Option<Checkpoint> beginInstant,
            HoodieTableMetaClient srcMetaClient, HoodieTimeline activeCommitTimeline,
            MissingCheckpointStrategy missingCheckpointStrategy,
            Function<HoodieInstant, String> timestampForLastInstant) {
        if (beginInstant.isPresent()) {
            return CheckpointUtils.convertToCheckpointV1ForCommitTime(beginInstant.get(), srcMetaClient)
                    .getCheckpointKey();
        }
        if (missingCheckpointStrategy == null) {
            throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest "
                    + "committed instant set hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
        }
        if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) {
            // Step 1 ms back so that the strictly-after lookup in generateQueryInfo picks up the latest instant.
            return activeCommitTimeline.lastInstant()
                    .map(instant -> HoodieInstantTimeGenerator.instantTimeMinusMillis(
                            timestampForLastInstant.apply(instant), 1L))
                    .orElse(DEFAULT_START_TIMESTAMP);
        }
        return DEFAULT_START_TIMESTAMP;
    }

    /**
     * Returns the instant time that immediately precedes {@code beginInstantTime} on the timeline.
     *
     * <p>The lookup falls back in order:
     * <ul>
     *   <li>the requested time of the instant just before {@code beginInstantTime}, if one exists;</li>
     *   <li>{@code beginInstantTime - 1}, when {@code beginInstantTime} is the first completed instant;</li>
     *   <li>{@link #DEFAULT_START_TIMESTAMP} otherwise.</li>
     * </ul>
     */
    private static String resolvePreviousInstantTime(HoodieTimeline activeCommitTimeline, String beginInstantTime) {
        if (beginInstantTime.equals(DEFAULT_START_TIMESTAMP)) {
            return DEFAULT_START_TIMESTAMP;
        }
        Option<HoodieInstant> previousInstant = activeCommitTimeline.findInstantBefore(beginInstantTime);
        if (previousInstant.isPresent()) {
            return previousInstant.get().requestedTime();
        }
        Option<HoodieInstant> firstCompletedInstant = activeCommitTimeline.filterCompletedInstants().firstInstant();
        if (firstCompletedInstant.isPresent()
                && firstCompletedInstant.get().requestedTime().equals(beginInstantTime)) {
            // Nothing precedes the first instant, so synthesize a timestamp just below it.
            return String.valueOf(Long.parseLong(beginInstantTime) - 1L);
        }
        return DEFAULT_START_TIMESTAMP;
    }

    /**
     * Builds an {@link IncrementalQueryAnalyzer} for the next fetch based on completion time.
     *
     * <p>The range is chosen as follows:
     * <ul>
     *   <li>With a non-empty checkpoint key, the range starts at the translated V2 checkpoint and uses
     *       {@code OPEN_CLOSED}.</li>
     *   <li>Otherwise the missing-checkpoint strategy decides, with {@code CLOSED_CLOSED}.</li>
     *   <li>{@code READ_UPTO_LATEST_COMMIT} starts at {@link #DEFAULT_START_TIMESTAMP} with limit {@code -1}.</li>
     *   <li>{@code READ_LATEST} passes a {@code null} start completion time.</li>
     * </ul>
     * A present {@code latestSourceProfile} overrides the instant limit with its source-specific context.
     *
     * @param jssc                      Spark context whose Hadoop configuration is copied for the meta client
     * @param srcPath                   base path of the source Hudi table
     * @param lastCheckpoint            checkpoint to resume from, if any; a {@code null} or empty key counts as
     *                                  missing
     * @param missingCheckpointStrategy behaviour when there is no usable checkpoint; may be {@code null}
     * @param numInstantsFromConfig     configured maximum number of instants per fetch
     * @param latestSourceProfile       optional profile whose integer context overrides the instant limit
     * @return the configured analyzer
     * @throws IllegalArgumentException if there is no usable checkpoint and no missing-checkpoint strategy
     */
    public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(JavaSparkContext jssc, String srcPath,
            Option<Checkpoint> lastCheckpoint, MissingCheckpointStrategy missingCheckpointStrategy,
            int numInstantsFromConfig, Option<SourceProfile<Integer>> latestSourceProfile) {
        HoodieTableMetaClient metaClient = buildMetaClient(jssc, srcPath);
        String startCompletionTime;
        InstantRange.RangeType rangeType;
        int numInstantsLimit = numInstantsFromConfig;

        // Treat a null key like an empty one instead of failing with an NPE.
        String lastCheckpointKey = lastCheckpoint.isPresent() ? lastCheckpoint.get().getCheckpointKey() : null;
        if (lastCheckpointKey != null && !lastCheckpointKey.isEmpty()) {
            StreamerCheckpointV2 lastStreamerCheckpointV2 =
                    CheckpointUtils.convertToCheckpointV2ForCommitTime(lastCheckpoint.get(), metaClient);
            startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey();
            // Open at the start: the checkpointed completion time itself is excluded from the range.
            rangeType = InstantRange.RangeType.OPEN_CLOSED;
        } else if (missingCheckpointStrategy != null) {
            rangeType = InstantRange.RangeType.CLOSED_CLOSED;
            switch (missingCheckpointStrategy) {
                case READ_UPTO_LATEST_COMMIT:
                    startCompletionTime = DEFAULT_START_TIMESTAMP;
                    // NOTE(review): -1 presumably means "no instant limit" to the analyzer; confirm.
                    numInstantsLimit = -1;
                    break;
                case READ_LATEST:
                    startCompletionTime = null;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown missing checkpoint strategy: " + missingCheckpointStrategy);
            }
        } else {
            throw new IllegalArgumentException("Missing start completion time for incremental pull. For reading from "
                    + "latest committed instant, set " + HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY.key()
                    + " to a valid value");
        }

        final int configuredLimit = numInstantsLimit;
        int numInstantsPerFetch = latestSourceProfile.map(sourceProfile -> {
            int numInstantsFromSourceProfile = sourceProfile.getSourceSpecificContext();
            LOG.info("Overriding numInstantsPerFetch from source profile numInstantsFromSourceProfile {} , numInstantsFromConfig {}",
                    numInstantsFromSourceProfile, configuredLimit);
            return numInstantsFromSourceProfile;
        }).orElse(configuredLimit);

        return IncrementalQueryAnalyzer.builder()
                .metaClient(metaClient)
                .startCompletionTime(startCompletionTime)
                .endCompletionTime(null)
                .rangeType(rangeType)
                .limit(numInstantsPerFetch)
                .build();
    }

    /**
     * Selects the next batch of cloud-object rows whose cumulative {@code limitColumn} fits {@code sourceLimit}.
     * It also computes the checkpoint after that batch.
     *
     * <p>Steps:
     * <ol>
     *   <li>Rows are ordered by {@code (orderColumn, keyColumn)}.</li>
     *   <li>When the current checkpoint has both a commit and a key, rows whose {@code orderColumn + keyColumn}
     *       is not greater than {@code commit + key} are skipped.</li>
     *   <li>A running sum of {@code limitColumn} keeps the rows within the limit.</li>
     *   <li>If even the first row exceeds the limit, that single row is returned.</li>
     * </ol>
     * When nothing remains, the current checkpoint is kept if the query's end instant equals its commit.
     * Otherwise the checkpoint advances to the end instant with no key.
     *
     * <p>{@code sourceData} is persisted while this method runs and is always unpersisted before it returns
     * or throws. The returned dataset is lazy and is not cached.
     *
     * @param sourceData                rows to batch
     * @param sourceLimit               upper bound on the cumulative {@code limitColumn} value
     * @param queryInfo                 query whose columns and end instant drive ordering and checkpointing
     * @param cloudObjectIncrCheckpoint current checkpoint; its commit and key may be {@code null}
     * @return the new checkpoint and, if any rows were selected, the selected rows
     */
    public static Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> filterAndGenerateCheckpointBasedOnSourceLimit(
            Dataset<Row> sourceData, long sourceLimit, QueryInfo queryInfo,
            CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) {
        if (sourceData.isEmpty()) {
            return Pair.of(checkpointForEmptyBatch(queryInfo, cloudObjectIncrCheckpoint), Option.empty());
        }
        // Persist so the several Spark actions below do not recompute the source DAG each time.
        sourceData.persist(StorageLevel.MEMORY_AND_DISK());
        try {
            Dataset<Row> orderedDf = QueryRunner.applyOrdering(sourceData, queryInfo.getOrderByColumns());

            // A null commit means there is no prior checkpoint, so nothing is skipped.
            // Option.of would throw an NPE on a null commit.
            String lastCommit = cloudObjectIncrCheckpoint.getCommit();
            String lastKey = cloudObjectIncrCheckpoint.getKey();
            if (lastCommit != null && lastKey != null) {
                String concatenatedKey = lastCommit + lastKey;
                orderedDf = orderedDf
                        .withColumn(COMMIT_KEY_COLUMN_NAME, functions.concat(
                                functions.col(queryInfo.getOrderColumn()), functions.col(queryInfo.getKeyColumn())))
                        .filter(functions.col(COMMIT_KEY_COLUMN_NAME).gt(concatenatedKey))
                        .drop(COMMIT_KEY_COLUMN_NAME);
                if (orderedDf.isEmpty()) {
                    LOG.info("Empty ordered source, returning endpoint:{}", queryInfo.getEndInstant());
                    return Pair.of(checkpointForEmptyBatch(queryInfo, cloudObjectIncrCheckpoint), Option.empty());
                }
            }

            WindowSpec windowSpec = Window.orderBy(
                    functions.col(queryInfo.getOrderColumn()), functions.col(queryInfo.getKeyColumn()));
            Dataset<Row> aggregatedData = orderedDf.withColumn(CUMULATIVE_COLUMN_NAME,
                    functions.sum(functions.col(queryInfo.getLimitColumn())).over(windowSpec));
            Dataset<Row> collectedRows = aggregatedData.filter(functions.col(CUMULATIVE_COLUMN_NAME).leq(sourceLimit));

            Row row;
            if (collectedRows.isEmpty()) {
                // Even the first object exceeds the limit; emit it alone so progress is still made.
                LOG.info("First object exceeding source limit: {} bytes", sourceLimit);
                row = aggregatedData.select(queryInfo.getOrderColumn(), queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME)
                        .first();
                collectedRows = aggregatedData.limit(1);
            } else {
                // The last row of the batch, in (orderColumn, keyColumn) order, becomes the new checkpoint.
                row = collectedRows.select(queryInfo.getOrderColumn(), queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME)
                        .orderBy(functions.col(queryInfo.getOrderColumn()).desc(),
                                functions.col(queryInfo.getKeyColumn()).desc())
                        .first();
            }
            LOG.info("Processed batch size: {} bytes", row.get(row.fieldIndex(CUMULATIVE_COLUMN_NAME)));
            return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0), row.getString(1)), Option.of(collectedRows));
        } finally {
            sourceData.unpersist();
        }
    }

    /**
     * Returns the checkpoint to use when a batch yields no rows.
     *
     * <p>If {@code queryInfo}'s end instant equals the current commit, the current checkpoint is kept, including
     * its key. Otherwise the checkpoint advances to the end instant with no key.
     */
    private static CloudObjectIncrCheckpoint checkpointForEmptyBatch(QueryInfo queryInfo,
            CloudObjectIncrCheckpoint current) {
        return queryInfo.getEndInstant().equals(current.getCommit())
                ? current
                : new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null);
    }

    /**
     * Reads the missing-checkpoint strategy from the source properties.
     *
     * @param props source properties
     * @return one of the following:
     *         <ul>
     *           <li>{@code READ_LATEST} if {@code READ_LATEST_INSTANT_ON_MISSING_CKPT} is true;</li>
     *           <li>otherwise the strategy named by {@code MISSING_CHECKPOINT_STRATEGY} if that property is set;</li>
     *           <li>otherwise {@code null}.</li>
     *         </ul>
     * @throws IllegalArgumentException if the configured strategy is not a constant name of
     *                                  {@link MissingCheckpointStrategy}
     */
    public static MissingCheckpointStrategy getMissingCheckpointStrategy(TypedProperties props) {
        if (ConfigUtils.getBooleanWithAltKeys(props, HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT)) {
            return MissingCheckpointStrategy.READ_LATEST;
        }
        if (ConfigUtils.containsConfigProperty(props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY)) {
            return MissingCheckpointStrategy.valueOf(
                    ConfigUtils.getStringWithAltKeys(props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY));
        }
        return null;
    }

    /**
     * Changes the partition count of {@code dataset} to {@code numPartitions}.
     *
     * <p>{@code repartition} is used when growing and {@code coalesce} otherwise. Spark's {@code coalesce} avoids a
     * full shuffle but can only reduce the number of partitions.
     *
     * @param dataset       dataset to resize; the parameter stays raw for caller compatibility
     * @param numPartitions target number of partitions
     * @return the resized dataset
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static Dataset<Row> coalesceOrRepartition(Dataset dataset, int numPartitions) {
        int existingNumPartitions = dataset.rdd().getNumPartitions();
        LOG.info("existing number of partitions={}, required number of partitions={}", existingNumPartitions, numPartitions);
        return existingNumPartitions < numPartitions
                ? dataset.repartition(numPartitions)
                : dataset.coalesce(numPartitions);
    }

    /**
     * Behaviour of an incremental source when it has no checkpoint to resume from.
     */
    public enum MissingCheckpointStrategy {
        /** Read from the latest commit in the Hudi source table. */
        READ_LATEST("Read from latest commit in hoodie source table"),
        /** Read everything up to the latest commit. */
        READ_UPTO_LATEST_COMMIT("Read everything upto latest commit");

        private final String description;

        MissingCheckpointStrategy(String description) {
            this.description = description;
        }

        /** @return human-readable description of this strategy */
        public String getDescription() {
            return this.description;
        }

        // NOTE(review): not referenced in the visible code. It may be looked up reflectively by
        // enum/config tooling; confirm before removing.
        private static MissingCheckpointStrategy nullEnum() {
            return null;
        }

        /** Formats the strategy as {@code NAME (description)}. */
        @Override
        public String toString() {
            return String.format("%s (%s)", this.name(), this.description);
        }
    }
}

