/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io.compact;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.io.storage.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableFileSystemView;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.compact.HoodieCompactor;
import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;

/**
 * {@link HoodieCompactor} implementation for {@code MERGE_ON_READ} tables.
 *
 * <p>{@link #generateCompactionPlan} scans the latest file slices of every candidate partition and
 * asks the configured compaction strategy to turn them into a {@link HoodieCompactionPlan};
 * {@link #compact(JavaSparkContext, HoodieCompactionPlan, HoodieTable, HoodieWriteConfig, String)}
 * executes such a plan on Spark, merging each file slice's log files through a
 * {@link HoodieCopyOnWriteTable}.
 */
public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
    private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTableCompactor.class);

    // Both accumulators are replaced with fresh instances and registered with the SparkContext on
    // every generateCompactionPlan call; they are null until that method has run at least once.
    private AccumulatorV2<Long, Long> totalLogFiles;
    private AccumulatorV2<Long, Long> totalFileSlices;

    /**
     * Executes every operation of the given compaction plan on Spark.
     *
     * @param jsc                   Spark context used to distribute the operations
     * @param compactionPlan        plan to execute; may be {@code null} or empty
     * @param hoodieTable           table being compacted; only its meta client is read here
     * @param config                write config (schema, memory limits, log-reading options)
     * @param compactionInstantTime instant time passed to the update/insert handlers
     * @return the write statuses of all operations, or an empty RDD when the plan is
     *         {@code null} or contains no operations
     * @throws IOException declared by the interface; see the per-operation helper
     */
    @Override
    public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieCompactionPlan compactionPlan, HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
        if (compactionPlan == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
            return jsc.emptyRDD();
        }
        HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
        // The merge itself is delegated to the copy-on-write update/insert handlers.
        HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
        List<CompactionOperation> operations = compactionPlan.getOperations().stream()
                .map(CompactionOperation::convertFromAvroRecordInstance)
                .collect(Collectors.toList());
        LOG.info("Compactor compacting " + operations + " files");
        // Slice count equals the number of operations (non-zero thanks to the guard above).
        return jsc.parallelize(operations, operations.size())
                .map(operation -> compact(table, metaClient, config, operation, compactionInstantTime))
                .flatMap(List::iterator);
    }

    /**
     * Compacts a single file slice: merges its log files and writes the result through the
     * copy-on-write table's update handler (when a base file exists) or insert handler (otherwise).
     *
     * @param hoodieCopyOnWriteTable table whose {@code handleUpdate}/{@code handleInsert} do the write
     * @param metaClient             source of the file system, base path and active timeline
     * @param config                 write config (schema, memory limits, log-reading options)
     * @param operation              the file slice to compact
     * @param commitTime             instant time passed to the handlers
     * @return the write statuses produced by the handler, each annotated with log-scan statistics;
     *         an empty list when the merged log scanner yields no records
     * @throws IOException if reading the log files or writing the merged file fails
     */
    private List<WriteStatus> compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient, HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException {
        HoodieWrapperFileSystem fs = metaClient.getFs();
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
        LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + " for commit " + commitTime);

        // Timestamp of the last completed commit/rollback/deltacommit instant is handed to the scanner.
        // NOTE(review): get() is unchecked and will throw if no such completed instant exists.
        String maxInstantTime = metaClient.getActiveTimeline()
                .getTimelineOfActions(Sets.newHashSet("commit", "rollback", "deltacommit"))
                .filterCompletedInstants()
                .lastInstant()
                .get()
                .getTimestamp();
        LOG.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction());

        // Resolve the delta file names against the partition directory to get full paths.
        List<String> logFiles = operation.getDeltaFileNames().stream()
                .map(name -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), name).toString())
                .collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles, readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(), config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(), config.getSpillableMapBasePath());
        if (!scanner.iterator().hasNext()) {
            // Nothing came out of the logs, so there is nothing to write.
            return Lists.newArrayList();
        }

        Option<HoodieBaseFile> oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
        Iterator<List<WriteStatus>> result;
        if (oldDataFileOpt.isPresent()) {
            result = hoodieCopyOnWriteTable.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(), oldDataFileOpt.get());
        } else {
            result = hoodieCopyOnWriteTable.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(), scanner.iterator());
        }

        // Drain the handler's iterator and stamp each status with the scan statistics.
        List<WriteStatus> statuses = Lists.newArrayList();
        while (result.hasNext()) {
            for (WriteStatus status : result.next()) {
                HoodieWriteStat stat = status.getStat();
                stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
                stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
                stat.setTotalLogRecords(scanner.getTotalLogRecords());
                stat.setPartitionPath(operation.getPartitionPath());
                // NOTE(review): throws NullPointerException if the strategy's metrics lack this key.
                stat.setTotalLogSizeCompacted(operation.getMetrics().get("TOTAL_LOG_FILES_SIZE").longValue());
                stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
                stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
                stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
                HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
                runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
                stat.setRuntimeStats(runtimeStats);
                statuses.add(status);
            }
        }
        return statuses;
    }

    /**
     * Builds a compaction plan from the latest file slices of the table.
     *
     * <p>Partitions are listed from the file system and filtered by the configured compaction
     * strategy. For each remaining partition, the latest file slices whose file group is not in
     * {@code fgIdsInPendingCompactions} and that have at least one log file become candidate
     * operations; the strategy then selects the final plan from those candidates.
     *
     * @param jsc                       Spark context used to scan partitions and register accumulators
     * @param hoodieTable               table to compact; must be of type {@code MERGE_ON_READ}
     * @param config                    write config providing the compaction strategy
     * @param compactionCommitTime      instant time of the compaction (only logged here)
     * @param fgIdsInPendingCompactions file groups that already have a pending compaction
     * @return the plan produced by the strategy (possibly with no operations), or {@code null}
     *         when no partition is left after the strategy's partition filter
     * @throws IllegalArgumentException if the table is not {@code MERGE_ON_READ}, or if the
     *         strategy selected a file group that already has a pending compaction
     * @throws IOException if listing partitions fails
     */
    @Override
    public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactions) throws IOException {
        HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
        // Validate before touching the SparkContext so a wrong table type registers nothing.
        Preconditions.checkArgument(metaClient.getTableType() == HoodieTableType.MERGE_ON_READ, "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not " + metaClient.getTableType().name());

        this.totalLogFiles = new LongAccumulator();
        this.totalFileSlices = new LongAccumulator();
        jsc.sc().register(this.totalLogFiles);
        jsc.sc().register(this.totalFileSlices);

        LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
        List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
        List<String> partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, allPartitionPaths);
        if (partitionPaths.isEmpty()) {
            // No partition could be picked: there is no plan to return.
            return null;
        }

        TableFileSystemView.SliceView fileSystemView = hoodieTable.getSliceView();
        LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
        List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
                .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView.getLatestFileSlices(partitionPath)
                        .filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId()))
                        .map(slice -> {
                            List<HoodieLogFile> logFiles = slice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
                            this.totalLogFiles.add((long) logFiles.size());
                            this.totalFileSlices.add(1L);
                            Option<HoodieBaseFile> dataFile = slice.getBaseFile();
                            return new CompactionOperation(dataFile, partitionPath, logFiles, config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
                        })
                        // A slice without log files has nothing to merge.
                        .filter(candidate -> !candidate.getDeltaFileNames().isEmpty())
                        .collect(Collectors.toList())
                        .iterator())
                .collect()
                .stream()
                .map(CompactionUtils::buildHoodieCompactionOperation)
                .collect(Collectors.toList());
        LOG.info("Total of " + operations.size() + " compactions are retrieved");
        LOG.info("Total number of latest files slices " + this.totalFileSlices.value());
        LOG.info("Total number of log files " + this.totalLogFiles.value());

        HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(Collectors.toList()));
        // Guard against strategies that select a file group which already has a pending compaction.
        Preconditions.checkArgument(compactionPlan.getOperations().stream().noneMatch(op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions + ", Selected workload :" + compactionPlan);
        if (compactionPlan.getOperations().isEmpty()) {
            LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
        }
        return compactionPlan;
    }
}

