/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.func.MergeOnReadLazyInsertIterable;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.io.compact.HoodieMergeOnReadTableCompactor;
import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.RollbackExecutor;
import org.apache.hudi.table.RollbackRequest;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class HoodieMergeOnReadTable<T extends HoodieRecordPayload>
extends HoodieCopyOnWriteTable<T> {
    private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class);
    private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner;

    /**
     * Creates a merge-on-read table by delegating to the {@link HoodieCopyOnWriteTable} constructor.
     *
     * @param config write configuration handed to the superclass
     * @param jsc    Spark context handed to the superclass
     */
    public HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc) {
        super(config, jsc);
    }

    /**
     * Builds a {@link MergeOnReadUpsertPartitioner} for the given workload, remembers it on this
     * table instance and returns it.
     *
     * @param profile workload profile used to construct the partitioner; must not be {@code null}
     * @return the newly created partitioner (also stored in {@code mergeOnReadUpsertPartitioner})
     * @throws HoodieUpsertException if {@code profile} is {@code null}
     */
    @Override
    public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
        if (profile != null) {
            MergeOnReadUpsertPartitioner partitioner = new MergeOnReadUpsertPartitioner(profile);
            this.mergeOnReadUpsertPartitioner = partitioner;
            return partitioner;
        }
        throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
    }

    /**
     * Applies a batch of updates to one file group.
     *
     * <p>When the index cannot index log files and the target file id is one of the small files
     * recorded by the current upsert partitioner, the update is delegated to the superclass
     * implementation. Otherwise the records are written through a {@link HoodieAppendHandle}.
     *
     * @param commitTime instant time of the write
     * @param fileId     id of the file group being updated
     * @param recordItr  records to apply
     * @return an iterator over a single list of write statuses for the append path, or whatever the
     *         superclass returns for the small-file path
     * @throws IOException declared by the overridden method
     */
    @Override
    public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException {
        LOG.info("Merging updates for commit " + commitTime + " for file " + fileId);
        // Short-circuit keeps the original evaluation order: the partitioner is only consulted
        // when the index cannot index log files.
        boolean appendToLog = this.index.canIndexLogFiles()
            || !this.mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId);
        if (appendToLog) {
            HoodieAppendHandle<T> handle = new HoodieAppendHandle<T>(this.config, commitTime, this, fileId, recordItr);
            handle.doAppend();
            handle.close();
            List<WriteStatus> statuses = Collections.singletonList(handle.getWriteStatus());
            return Collections.singletonList(statuses).iterator();
        }
        LOG.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
        return super.handleUpdate(commitTime, fileId, recordItr);
    }

    /**
     * Handles a batch of inserts.
     *
     * <p>If the index can index log files the records are wrapped in a
     * {@link MergeOnReadLazyInsertIterable}; otherwise the superclass insert path is used.
     *
     * @param commitTime instant time of the write
     * @param idPfx      file id prefix passed through to the chosen insert path
     * @param recordItr  records to insert
     * @return iterator over the resulting write status lists
     * @throws Exception declared by the overridden method
     */
    @Override
    public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception {
        if (!this.index.canIndexLogFiles()) {
            return super.handleInsert(commitTime, idPfx, recordItr);
        }
        return new MergeOnReadLazyInsertIterable<T>(recordItr, this.config, commitTime, this, idPfx);
    }

    /**
     * Decides whether a compaction should be scheduled and, if so, generates the plan.
     *
     * <p>The number of delta commits after the last completed instant on the commit timeline
     * (or after {@code "0"} when there is none) is compared against
     * {@code config.getInlineCompactDeltaCommitMax()}. Below that threshold a freshly constructed,
     * unpopulated {@link HoodieCompactionPlan} is returned. Otherwise the plan is generated by a
     * {@link HoodieMergeOnReadTableCompactor}, which is also given the file group ids of the
     * pending compaction operations.
     *
     * @param jsc         Spark context passed to the compactor
     * @param instantTime instant time for the compaction being scheduled
     * @return the generated plan, or a new empty plan object when the threshold is not reached
     * @throws HoodieCompactionException if plan generation fails with an {@link IOException}
     */
    @Override
    public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
        LOG.info("Checking if compaction needs to be run on " + this.config.getBasePath());
        Option<HoodieInstant> lastCompletedCommit = this.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
        String sinceTs = lastCompletedCommit.isPresent() ? lastCompletedCommit.get().getTimestamp() : "0";
        int deltaCommitCount = this.getActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(sinceTs, Integer.MAX_VALUE).countInstants();

        if (this.config.getInlineCompactDeltaCommitMax() <= deltaCommitCount) {
            LOG.info("Compacting merge on read table " + this.config.getBasePath());
            HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
            try {
                return compactor.generateCompactionPlan(jsc, this, this.config, instantTime,
                    ((SyncableFileSystemView)this.getSliceView()).getPendingCompactionOperations()
                        .map(pendingOp -> ((CompactionOperation)pendingOp.getValue()).getFileGroupId())
                        .collect(Collectors.toSet()));
            } catch (IOException ioe) {
                throw new HoodieCompactionException("Could not schedule compaction " + this.config.getBasePath(), ioe);
            }
        }

        LOG.info("Not running compaction as only " + deltaCommitCount + " delta commits was found since last compaction " + sinceTs + ". Waiting for " + this.config.getInlineCompactDeltaCommitMax());
        return new HoodieCompactionPlan();
    }

    /**
     * Executes the given compaction plan through a new {@link HoodieMergeOnReadTableCompactor}.
     *
     * @param jsc                   Spark context passed to the compactor
     * @param compactionInstantTime instant time of the compaction
     * @param compactionPlan        plan to execute
     * @return the write statuses returned by the compactor
     * @throws HoodieCompactionException if the compactor fails with an {@link IOException}
     */
    @Override
    public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime, HoodieCompactionPlan compactionPlan) {
        try {
            return new HoodieMergeOnReadTableCompactor()
                .compact(jsc, compactionPlan, this, this.config, compactionInstantTime);
        } catch (IOException ioe) {
            throw new HoodieCompactionException("Could not compact " + this.config.getBasePath(), ioe);
        }
    }

    /**
     * Rolls back the given instant.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If the instant is completed it is reverted to inflight on the active timeline.</li>
     *   <li>Unless the (possibly reverted) instant is in the requested state, rollback requests are
     *       generated per partition and executed by a {@link RollbackExecutor}.</li>
     *   <li>{@code deleteInflightAndRequestedInstant} is invoked with {@code deleteInstants}.</li>
     * </ol>
     *
     * @param jsc             Spark context used to generate and execute the rollback requests
     * @param instant         instant to roll back
     * @param deleteInstants  forwarded to {@code deleteInflightAndRequestedInstant}
     * @return the rollback stats produced by the executor; an empty list when the instant was in
     *         the requested state and no rollback work was performed
     * @throws IOException if generating the rollback requests fails
     */
    @Override
    public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants) throws IOException {
        long startTime = System.currentTimeMillis();
        String commit = instant.getTimestamp();
        // Progress messages, not failures: logged at INFO so they do not show up as errors.
        LOG.info("Rolling back instant " + instant);
        if (instant.isCompleted()) {
            LOG.info("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants);
            instant = this.getActiveTimeline().revertToInflight(instant);
        }
        List<HoodieRollbackStat> allRollbackStats = new ArrayList<HoodieRollbackStat>();
        // Rollback requests are only generated and executed when the instant is not in the requested state.
        if (!instant.isRequested()) {
            LOG.info("Unpublished " + commit);
            List<RollbackRequest> rollbackRequests = this.generateRollbackRequests(jsc, instant);
            allRollbackStats = new RollbackExecutor(this.metaClient, this.config).performRollback(jsc, instant, rollbackRequests);
        }
        this.deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant);
        LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
        return allRollbackStats;
    }

    /**
     * Builds the list of rollback requests for an instant, one Spark task per slice of partitions.
     *
     * <p>For every partition path the request type depends on the action of the instant:
     * <ul>
     *   <li>{@code "commit"}: delete data and log files.</li>
     *   <li>{@code "compaction"}: delete only data files when a completed delta commit exists after
     *       the instant, otherwise delete data and log files.</li>
     *   <li>{@code "deltacommit"}: delete data and log files, plus append-rollback-block requests
     *       when the commit metadata has write stats for the partition.</li>
     * </ul>
     * Any other action yields no requests for the partition.
     *
     * @param jsc               Spark context used to parallelize over partition paths
     * @param instantToRollback instant being rolled back
     * @return all non-null rollback requests collected from every partition
     * @throws IOException if listing the partition paths fails
     */
    private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback) throws IOException {
        String commit = instantToRollback.getTimestamp();
        List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), this.config.shouldAssumeDatePartitioning());
        // Clamped to [1, max(partitions.size(), 1)]. The clamped value is passed straight to
        // parallelize: taking min(partitions.size(), ...) again would yield 0 slices for an empty
        // partition list, which Spark rejects.
        int sparkPartitions = Math.max(Math.min(partitions.size(), this.config.getRollbackParallelism()), 1);
        return jsc.parallelize(partitions, sparkPartitions).flatMap(partitionPath -> {
            HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
            List<RollbackRequest> partitionRollbackRequests = new ArrayList<RollbackRequest>();
            switch (instantToRollback.getAction()) {
                case "commit": {
                    LOG.info("Rolling back commit action. There are higher delta commits. So only rolling back this instant");
                    partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
                    break;
                }
                case "compaction": {
                    boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty();
                    if (higherDeltaCommits) {
                        LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
                        partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback));
                    } else {
                        LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and log files");
                        partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
                    }
                    break;
                }
                case "deltacommit": {
                    try {
                        // Metadata is read from the inflight form of the instant (first ctor arg is true).
                        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(this.metaClient.getCommitTimeline().getInstantDetails(new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())).get(), HoodieCommitMetadata.class);
                        partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
                        if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
                            partitionRollbackRequests.addAll(this.generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
                        }
                    } catch (IOException io) {
                        throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
                    }
                    break;
                }
                default:
                    // Unrecognized action: no rollback requests for this partition.
                    break;
            }
            return partitionRollbackRequests.iterator();
        }).filter(Objects::nonNull).collect();
    }

    /**
     * Finalizes a write by delegating unchanged to the superclass implementation.
     *
     * @param jsc       Spark context forwarded to the superclass
     * @param instantTs instant timestamp forwarded to the superclass
     * @param stats     write stats forwarded to the superclass
     * @throws HoodieIOException declared by this override; any such exception comes from the superclass call
     */
    @Override
    public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {
        super.finalizeWrite(jsc, instantTs, stats);
    }

    /**
     * Creates append-rollback-block requests for the log files touched by a delta commit in one
     * partition.
     *
     * <p>A write stat qualifies when all of the following hold:
     * <ul>
     *   <li>it is non-null and its previous commit is neither {@code null} nor the literal
     *       string {@code "null"};</li>
     *   <li>its file id has a latest file slice in the partition;</li>
     *   <li>that slice's base instant time is strictly less than the rollback instant's
     *       timestamp.</li>
     * </ul>
     * Before the strict comparison, a precondition asserts that the base instant time is less than
     * or equal to the rollback timestamp.
     *
     * @param partitionPath   partition whose write stats are inspected
     * @param rollbackInstant instant being rolled back; its action must be {@code "deltacommit"}
     * @param commitMetadata  commit metadata holding the per-partition write stats
     * @return one rollback request per qualifying write stat
     * @throws IllegalArgumentException if the instant is not a delta commit, or if a candidate's base
     *         instant time is greater than the rollback timestamp
     */
    private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, HoodieCommitMetadata commitMetadata) {
        Preconditions.checkArgument(rollbackInstant.getAction().equals("deltacommit"));
        Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
        return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
            if (wStat == null) {
                return false;
            }
            // Null is checked before the value is compared; the constant-first equals also
            // avoids dereferencing a null previous commit.
            String prevCommit = wStat.getPrevCommit();
            if (prevCommit == null || "null".equals(prevCommit)) {
                return false;
            }
            if (!fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId())) {
                return false;
            }
            String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
            // Sanity check: the slice's base instant must not be newer than the instant being rolled back.
            Preconditions.checkArgument(HoodieTimeline.compareTimestamps(baseCommitTime, rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
            return HoodieTimeline.compareTimestamps(baseCommitTime, rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
        }).map(wStat -> {
            String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
            return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), baseCommitTime, rollbackInstant);
        }).collect(Collectors.toList());
    }

    /**
     * Upsert partitioner for merge-on-read tables. It overrides small-file discovery so that, when
     * the index can index log files, file slices are sized by base file plus an estimate of the
     * parquet size of their log files.
     */
    class MergeOnReadUpsertPartitioner
    extends HoodieCopyOnWriteTable.UpsertPartitioner {
        MergeOnReadUpsertPartitioner(WorkloadProfile profile) {
            super(profile);
        }

        /**
         * Finds the small files of a partition as of the last completed commit.
         *
         * <p>If the index cannot index log files, at most one slice is chosen: the one with the
         * smallest base file among slices that have no log files and whose base file is below
         * {@code config.getParquetSmallFileLimit()}. Otherwise every latest slice whose total
         * size is below {@code config.getParquetMaxFileSize()} is chosen.
         *
         * <p>Each chosen slice is also appended to the inherited {@code smallFiles} list.
         *
         * @param partitionPath partition to inspect
         * @return the small files found; empty when there is no completed commit
         */
        @Override
        protected List<HoodieCopyOnWriteTable.SmallFile> getSmallFiles(String partitionPath) {
            List<HoodieCopyOnWriteTable.SmallFile> smallFileLocations = new ArrayList<HoodieCopyOnWriteTable.SmallFile>();
            HoodieTimeline commitTimeline = HoodieMergeOnReadTable.this.getCompletedCommitsTimeline();
            if (commitTimeline.empty()) {
                return smallFileLocations;
            }
            HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
            List<FileSlice> allSmallFileSlices = new ArrayList<FileSlice>();
            if (!HoodieMergeOnReadTable.this.index.canIndexLogFiles()) {
                // Slices without a base file are skipped explicitly so Option.get() is never
                // called on an absent base file. Long.compare returns 0 for equal sizes, as the
                // Comparator contract requires.
                Option<FileSlice> smallFileSlice = Option.fromJavaOptional(
                    HoodieMergeOnReadTable.this.getSliceView()
                        .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
                        .filter(fileSlice -> fileSlice.getLogFiles().count() < 1L
                            && fileSlice.getBaseFile().isPresent()
                            && fileSlice.getBaseFile().get().getFileSize() < (long)HoodieMergeOnReadTable.this.config.getParquetSmallFileLimit())
                        .min((left, right) -> Long.compare(left.getBaseFile().get().getFileSize(), right.getBaseFile().get().getFileSize())));
                if (smallFileSlice.isPresent()) {
                    allSmallFileSlices.add(smallFileSlice.get());
                }
            } else {
                List<FileSlice> allFileSlices = HoodieMergeOnReadTable.this.getSliceView()
                    .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
                    .collect(Collectors.toList());
                for (FileSlice fileSlice : allFileSlices) {
                    if (this.isSmallFile(fileSlice)) {
                        allSmallFileSlices.add(fileSlice);
                    }
                }
            }
            for (FileSlice smallFileSlice : allSmallFileSlices) {
                HoodieCopyOnWriteTable.SmallFile sf = new HoodieCopyOnWriteTable.SmallFile();
                if (smallFileSlice.getBaseFile().isPresent()) {
                    String filename = smallFileSlice.getBaseFile().get().getFileName();
                    sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
                } else {
                    // No base file: derive the location from the first log file of the slice.
                    HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
                    sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), FSUtils.getFileIdFromLogPath(logFile.getPath()));
                }
                sf.sizeBytes = this.getTotalFileSize(smallFileSlice);
                smallFileLocations.add(sf);
                this.smallFiles.add(sf);
            }
            return smallFileLocations;
        }

        /**
         * Returns the file ids of all small files recorded so far in the inherited
         * {@code smallFiles} list.
         */
        public List<String> getSmallFileIds() {
            return this.smallFiles.stream().map(smallFile -> ((HoodieCopyOnWriteTable.SmallFile)smallFile).location.getFileId()).collect(Collectors.toList());
        }

        /**
         * Computes the size of a slice as base file size (if a base file is present) plus the
         * estimated parquet size of its log files.
         */
        private long getTotalFileSize(FileSlice fileSlice) {
            long estimatedLogSize = this.convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
            if (!fileSlice.getBaseFile().isPresent()) {
                return estimatedLogSize;
            }
            return fileSlice.getBaseFile().get().getFileSize() + estimatedLogSize;
        }

        /** Returns true when the slice's total size is below {@code config.getParquetMaxFileSize()}. */
        private boolean isSmallFile(FileSlice fileSlice) {
            long totalSize = this.getTotalFileSize(fileSlice);
            return totalSize < HoodieMergeOnReadTable.this.config.getParquetMaxFileSize();
        }

        /**
         * Estimates the parquet size of a set of log files: the sum of all positive log file sizes
         * multiplied by {@code config.getLogFileToParquetCompressionRatio()}, truncated to a long.
         *
         * @param hoodieLogFiles log files to size; non-positive sizes are ignored
         * @return the estimated size in the same unit as {@code HoodieLogFile.getFileSize()}
         */
        @VisibleForTesting
        public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
            long totalSizeOfLogFiles = hoodieLogFiles.stream()
                .mapToLong(HoodieLogFile::getFileSize)
                .filter(size -> size > 0L)
                .sum();
            return (long)((double)totalSizeOfLogFiles * HoodieMergeOnReadTable.this.config.getLogFileToParquetCompressionRatio());
        }
    }
}

