/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Serializable;

/**
 * Builds the {@link MergeOnReadInputSplit}s that an incremental reader should consume next.
 *
 * <p>Instances are created through {@link #builder()}. Each call to
 * {@link #inputSplits(HoodieTableMetaClient, org.apache.hadoop.conf.Configuration, String)}
 * reloads the active timeline, picks the completed instants that fall inside the requested
 * window, resolves the files written by those instants and turns every merged file slice into
 * one input split.
 *
 * <p>NOTE(review): {@code Serializable} resolves to {@code scala.Serializable} through this
 * file's imports; consider whether {@code java.io.Serializable} was intended.
 */
public class IncrementalInputSplits
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
    /** Flink configuration the read options are looked up from. */
    private final Configuration conf;
    /** Base path of the table. */
    private final Path path;
    /** Value handed through unchanged to every generated split. */
    private final long maxCompactionMemoryInBytes;
    /** Partitions to restrict the read to, or {@code null} for no restriction. */
    private final Set<String> requiredPartitions;
    /** Whether instants with action {@code "commit"} are dropped, see {@link #maySkipCompaction(Stream)}. */
    private final boolean skipCompaction;

    private IncrementalInputSplits(Configuration conf, Path path, long maxCompactionMemoryInBytes, @Nullable Set<String> requiredPartitions, boolean skipCompaction) {
        this.conf = conf;
        this.path = path;
        this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
        this.requiredPartitions = requiredPartitions;
        this.skipCompaction = skipCompaction;
    }

    /**
     * Returns a new {@link Builder}.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Computes the input splits without a previously issued instant.
     *
     * <p>Equivalent to calling the three-argument overload with a {@code null} issued instant.
     *
     * @param metaClient the table meta client
     * @param hadoopConf the hadoop configuration
     * @return the splits together with the instant they were read up to, or {@link Result#EMPTY}
     */
    public Result inputSplits(HoodieTableMetaClient metaClient, org.apache.hadoop.conf.Configuration hadoopConf) {
        return this.inputSplits(metaClient, hadoopConf, null);
    }

    /**
     * Computes the input splits for the instants that follow {@code issuedInstant}.
     *
     * <p>The active timeline is reloaded first. {@link Result#EMPTY} is returned when the commit
     * timeline is empty, when no instant passes the range filter, when no partition is left to
     * read, or when no file is found.
     *
     * @param metaClient    the table meta client
     * @param hadoopConf    the hadoop configuration
     * @param issuedInstant the instant that was consumed last, or {@code null} when nothing has
     *                      been consumed yet
     * @return the splits together with the timestamp of the latest selected instant
     */
    public Result inputSplits(HoodieTableMetaClient metaClient, org.apache.hadoop.conf.Configuration hadoopConf, String issuedInstant) {
        metaClient.reloadActiveTimeline();
        final HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
        if (commitTimeline.empty()) {
            LOG.warn("No splits found for the table under path " + this.path);
            return Result.EMPTY;
        }

        final List<HoodieInstant> instants = this.filterInstantsWithRange(commitTimeline, issuedInstant);
        if (instants.isEmpty()) {
            LOG.info("No new instant found for the table under path " + this.path + ", skip reading");
            return Result.EMPTY;
        }
        // The last instant that passed the filter becomes the upper bound of this read.
        final String endInstant = instants.get(instants.size() - 1).getTimestamp();
        final InstantRange instantRange = this.resolveInstantRange(issuedInstant, endInstant);

        final String tableName = this.conf.getString(FlinkOptions.TABLE_NAME);
        Set<String> writePartitions;
        final FileStatus[] fileStatuses;
        if (instantRange == null) {
            // No range (start commit "earliest"): list the files through the file index.
            final FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(this.path.toUri()), this.conf);
            if (this.requiredPartitions != null) {
                fileIndex.setPartitionPaths(this.requiredPartitions);
            }
            writePartitions = new HashSet<String>(fileIndex.getOrBuildPartitionPaths());
            if (writePartitions.isEmpty()) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileStatuses = fileIndex.getFilesInPartitions();
        } else {
            // Bounded range: resolve partitions and files from the commit metadata of the instants.
            final List<HoodieCommitMetadata> activeMetadataList = instants.stream()
                .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, commitTimeline))
                .collect(Collectors.toList());
            final List<HoodieCommitMetadata> archivedMetadataList = this.getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
            final List<HoodieCommitMetadata> metadataList;
            if (archivedMetadataList.isEmpty()) {
                metadataList = activeMetadataList;
            } else {
                LOG.warn("\n--------------------------------------------------------------------------------\n---------- caution: the reader has fall behind too much from the writer,\n---------- tweak 'read.tasks' option to add parallelism of read tasks.\n--------------------------------------------------------------------------------");
                // Archived metadata goes first, followed by the metadata of the active instants.
                metadataList = IncrementalInputSplits.mergeList(archivedMetadataList, activeMetadataList);
            }
            writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
            if (this.requiredPartitions != null) {
                writePartitions = writePartitions.stream().filter(this.requiredPartitions::contains).collect(Collectors.toSet());
            }
            if (writePartitions.isEmpty()) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileStatuses = WriteProfiles.getWritePathsOfInstants(this.path, hadoopConf, metadataList, metaClient.getTableType());
        }
        if (fileStatuses.length == 0) {
            LOG.warn("No files found for reading in user provided path.");
            return Result.EMPTY;
        }

        final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
        // Running counter that supplies the first constructor argument of each split.
        final AtomicInteger cnt = new AtomicInteger(0);
        final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        final List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
            .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
                .map(fileSlice -> {
                    final Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                        .sorted(HoodieLogFile.getLogFileComparator())
                        .map(logFile -> logFile.getPath().toString())
                        .collect(Collectors.toList()));
                    // A file slice without a base file yields a null base path.
                    final String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
                    return new MergeOnReadInputSplit(cnt.getAndIncrement(), basePath, logPaths, endInstant, metaClient.getBasePath(), this.maxCompactionMemoryInBytes, mergeType, instantRange);
                })
                .collect(Collectors.toList()))
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
        return Result.instance(inputSplits, endInstant);
    }

    /**
     * Decides which instant range the generated splits are restricted to.
     *
     * <ul>
     *   <li>With an issued instant: the range {@code (issuedInstant, endInstant]}.</li>
     *   <li>Without an issued instant but with a configured start commit: {@code null} when the
     *       start commit is {@code "earliest"} (case-insensitive), otherwise
     *       {@code [startCommit, endInstant]}.</li>
     *   <li>Otherwise: {@code [endInstant, endInstant]}.</li>
     * </ul>
     *
     * @param issuedInstant the instant that was consumed last, may be {@code null}
     * @param endInstant    timestamp of the latest instant selected for this read
     * @return the range, or {@code null} when no range restriction applies
     */
    @Nullable
    private InstantRange resolveInstantRange(@Nullable String issuedInstant, String endInstant) {
        if (issuedInstant != null) {
            return InstantRange.getInstance(issuedInstant, endInstant, InstantRange.RangeType.OPEN_CLOSE);
        }
        if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
            final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
            if (startCommit.equalsIgnoreCase("earliest")) {
                return null;
            }
            return InstantRange.getInstance(startCommit, endInstant, InstantRange.RangeType.CLOSE_CLOSE);
        }
        return InstantRange.getInstance(endInstant, endInstant, InstantRange.RangeType.CLOSE_CLOSE);
    }

    /**
     * Collects the commit metadata of archived instants that belong to the requested range.
     *
     * <p>The archived timeline is consulted only when {@code instantRange} is {@code null} or its
     * start instant lies before the start of {@code commitTimeline}. With a range, only archived
     * instants at or after the range start are kept; without one, all completed archived commits
     * are used. The result is additionally passed through {@link #maySkipCompaction(Stream)}.
     *
     * @param metaClient     the table meta client
     * @param instantRange   the requested range, may be {@code null}
     * @param commitTimeline the active commit timeline
     * @param tableName      the table name
     * @return the archived commit metadata, or an empty list when nothing applies
     */
    private List<HoodieCommitMetadata> getArchivedMetadata(HoodieTableMetaClient metaClient, InstantRange instantRange, HoodieTimeline commitTimeline, String tableName) {
        if (instantRange != null && !commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
            return Collections.emptyList();
        }
        final HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
        final HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
        if (archivedCompleteTimeline.empty()) {
            return Collections.emptyList();
        }

        final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
        Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
        if (instantRange != null) {
            final String rangeStart = instantRange.getStartInstant();
            // Instant details are loaded before the metadata is read from the archived timeline below.
            archivedTimeline.loadInstantDetailsInMemory(rangeStart, endTs);
            instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, rangeStart));
        } else {
            final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
            archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
        }
        return this.maySkipCompaction(instantStream)
            .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, archivedTimeline))
            .collect(Collectors.toList());
    }

    /**
     * Selects the completed instants of {@code commitTimeline} that fall inside the read window.
     *
     * <p>With an issued instant, only instants strictly greater than it are kept; the configured
     * start and end commits are not consulted on that path. Without one, the configured start
     * commit (unless it is {@code "earliest"}) and end commit act as inclusive bounds. In both
     * cases the result is passed through {@link #maySkipCompaction(Stream)}.
     *
     * @param commitTimeline the active commit timeline
     * @param issuedInstant  the instant that was consumed last, may be {@code null}
     * @return the matching instants in timeline order
     */
    private List<HoodieInstant> filterInstantsWithRange(HoodieTimeline commitTimeline, String issuedInstant) {
        final HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
        if (issuedInstant != null) {
            return this.maySkipCompaction(completedTimeline.getInstants())
                .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.GREATER_THAN, issuedInstant))
                .collect(Collectors.toList());
        }

        Stream<HoodieInstant> instantStream = completedTimeline.getInstants();
        if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
            final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
            if (!startCommit.equalsIgnoreCase("earliest")) {
                instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, startCommit));
            }
        }
        if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
            final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
            instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.LESSER_THAN_OR_EQUALS, endCommit));
        }
        return this.maySkipCompaction(instantStream).collect(Collectors.toList());
    }

    /**
     * Drops instants whose action equals {@code "commit"} when {@code skipCompaction} is set;
     * otherwise returns the stream unchanged.
     *
     * <p>NOTE(review): presumably {@code "commit"} instants are the compaction results on a
     * merge-on-read table - confirm against the timeline action constants.
     */
    private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) {
        if (!this.skipCompaction) {
            return instants;
        }
        return instants.filter(instant -> !instant.getAction().equals("commit"));
    }

    /**
     * Returns a new list holding the elements of {@code list1} followed by those of {@code list2}.
     */
    private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
        final List<T> merged = new ArrayList<>(list1.size() + list2.size());
        merged.addAll(list1);
        merged.addAll(list2);
        return merged;
    }

    /**
     * Builder for {@link IncrementalInputSplits}. {@code conf} and {@code path} are mandatory.
     */
    public static class Builder {
        private Configuration conf;
        private Path path;
        private long maxCompactionMemoryInBytes;
        private Set<String> requiredPartitions;
        private boolean skipCompaction = false;

        /** Sets the Flink configuration (required). */
        public Builder conf(Configuration conf) {
            this.conf = conf;
            return this;
        }

        /** Sets the table base path (required). */
        public Builder path(Path path) {
            this.path = path;
            return this;
        }

        /** Sets the value passed to every generated split as its max compaction memory. */
        public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
            this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
            return this;
        }

        /** Restricts the read to the given partitions; {@code null} means no restriction. */
        public Builder requiredPartitions(@Nullable Set<String> requiredPartitions) {
            this.requiredPartitions = requiredPartitions;
            return this;
        }

        /** Sets whether instants with action {@code "commit"} are skipped; defaults to {@code false}. */
        public Builder skipCompaction(boolean skipCompaction) {
            this.skipCompaction = skipCompaction;
            return this;
        }

        /**
         * Creates the {@link IncrementalInputSplits}.
         *
         * @throws NullPointerException if {@code conf} or {@code path} has not been set
         */
        public IncrementalInputSplits build() {
            return new IncrementalInputSplits(
                Objects.requireNonNull(this.conf, "conf"),
                Objects.requireNonNull(this.path, "path"),
                this.maxCompactionMemoryInBytes,
                this.requiredPartitions,
                this.skipCompaction);
        }
    }

    /**
     * Outcome of one split computation: the splits plus the instant they were read up to.
     */
    public static class Result {
        private final List<MergeOnReadInputSplit> inputSplits;
        private final String endInstant;
        /** Shared result with no splits and an empty-string end instant. */
        public static final Result EMPTY = Result.instance(Collections.emptyList(), "");

        /** Returns whether this result carries no splits. */
        public boolean isEmpty() {
            return this.inputSplits.isEmpty();
        }

        /** Returns the splits exactly as they were passed to {@link #instance(List, String)}. */
        public List<MergeOnReadInputSplit> getInputSplits() {
            return this.inputSplits;
        }

        /** Returns the end instant; the empty string for {@link #EMPTY}. */
        public String getEndInstant() {
            return this.endInstant;
        }

        private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant) {
            this.inputSplits = inputSplits;
            this.endInstant = endInstant;
        }

        /**
         * Creates a result.
         *
         * @param inputSplits the splits to expose
         * @param endInstant  the instant the splits were read up to
         */
        public static Result instance(List<MergeOnReadInputSplit> inputSplits, String endInstant) {
            return new Result(inputSplits, endInstant);
        }
    }
}

