/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.HoodieCompactor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;

/**
 * Spark-based {@link HoodieCompactor} for merge-on-read tables.
 *
 * <p>{@link #generateCompactionPlan} scans the latest file slices of the selected partitions and
 * builds a {@link HoodieCompactionPlan}; {@link #compact} executes such a plan by running one Spark
 * task per compaction operation, merging the log (delta) files of a file slice into its base file.
 *
 * @param <T> payload type of the records stored in the table
 */
public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
        implements HoodieCompactor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(HoodieSparkMergeOnReadTableCompactor.class);
    // Accumulators are (re)created and registered on every generateCompactionPlan call.
    private AccumulatorV2<Long, Long> totalLogFiles;
    private AccumulatorV2<Long, Long> totalFileSlices;

    /**
     * Executes the given compaction plan, one Spark partition per compaction operation.
     *
     * @param context               engine context; must wrap a Spark context
     * @param compactionPlan        plan to execute; may be {@code null} or have no operations
     * @param hoodieTable           table being compacted (only its meta client is used here)
     * @param config                write config used to build the table handle and the log scanner
     * @param compactionInstantTime instant time under which the compacted files are written
     * @return the write statuses of all compacted file slices; an empty RDD when the plan is
     *         {@code null} or contains no operations
     * @throws IOException declared by the {@link HoodieCompactor} contract
     */
    @Override
    public JavaRDD<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
        if (compactionPlan == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
            return jsc.emptyRDD();
        }
        HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
        // Compaction writes base files, so the per-operation work goes through a copy-on-write table handle.
        HoodieSparkCopyOnWriteTable table = new HoodieSparkCopyOnWriteTable(config, context, metaClient);
        List<CompactionOperation> operations = compactionPlan.getOperations().stream()
                .map(CompactionOperation::convertFromAvroRecordInstance)
                .collect(Collectors.toList());
        LOG.info("Compactor compacting " + operations + " files");
        context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices");
        return jsc.parallelize(operations, operations.size())
                .map(op -> this.compact(table, metaClient, config, op, compactionInstantTime))
                .flatMap(List::iterator);
    }

    /**
     * Compacts a single file slice: merges the records of its log files and writes them either as an
     * update of the existing base file or, when there is no base file, as an insert.
     *
     * @return the write statuses produced for this operation, each enriched with compaction
     *         statistics; an empty list when the log files contain no records
     */
    private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient, HoodieWriteConfig config, CompactionOperation operation, String instantTime) throws IOException {
        HoodieWrapperFileSystem fs = metaClient.getFs();
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
        LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + " for commit " + instantTime);
        // Timestamp of the last completed commit/rollback/deltacommit; passed to the scanner as its
        // latest instant time. NOTE: get() fails if the timeline has no such completed instant.
        String maxInstantTime = metaClient.getActiveTimeline()
                .getTimelineOfActions(CollectionUtils.createSet("commit", "rollback", "deltacommit"))
                .filterCompletedInstants()
                .lastInstant()
                .get()
                .getTimestamp();
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config.getProps());
        LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
        // Resolve each delta file name to a full path under the operation's partition.
        List<String> logFiles = operation.getDeltaFileNames().stream()
                .map(name -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), name).toString())
                .collect(Collectors.toList());
        // NOTE(review): the scanner is never closed in this method — confirm whether this version of
        // HoodieMergedLogRecordScanner exposes a close() that should be called once the merge is done.
        HoodieMergedLogRecordScanner scanner = ((HoodieMergedLogRecordScanner.Builder) HoodieMergedLogRecordScanner.newBuilder()
                .withFileSystem(fs)
                .withBasePath(metaClient.getBasePath())
                .withLogFilePaths(logFiles))
                .withReaderSchema(readerSchema)
                .withLatestInstantTime(maxInstantTime)
                .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
                .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
                .withReverseReader(config.getCompactionReverseLogReadEnabled())
                .withBufferSize(config.getMaxDFSStreamBufferSize())
                .withSpillableMapBasePath(config.getSpillableMapBasePath())
                .build();
        if (!scanner.iterator().hasNext()) {
            // Nothing to merge for this file slice.
            return new ArrayList<WriteStatus>();
        }
        Option<HoodieBaseFile> oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
        // The table handle is used as a raw type, hence the unchecked assignment of its results.
        Iterator<List<WriteStatus>> result;
        if (oldDataFileOpt.isPresent()) {
            result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(), operation.getFileId(), scanner.getRecords(), oldDataFileOpt.get());
        } else {
            result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(), scanner.getRecords());
        }
        // Flatten the batches and attach the scanner/operation statistics to every write status.
        List<WriteStatus> statuses = new ArrayList<>();
        while (result.hasNext()) {
            for (WriteStatus status : result.next()) {
                status.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
                status.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
                status.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
                status.getStat().setPartitionPath(operation.getPartitionPath());
                status.getStat().setTotalLogSizeCompacted(operation.getMetrics().get("TOTAL_LOG_FILES_SIZE").longValue());
                status.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
                status.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
                status.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
                HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
                runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
                status.getStat().setRuntimeStats(runtimeStats);
                statuses.add(status);
            }
        }
        return statuses;
    }

    /**
     * Builds a compaction plan from the latest file slices of the partitions selected by the
     * configured compaction strategy, skipping file groups that already have a pending compaction
     * or clustering and file slices that have no log files.
     *
     * @param context                               engine context; must wrap a Spark context
     * @param hoodieTable                           table to plan for; must be of type MERGE_ON_READ
     * @param config                                write config providing the compaction strategy
     * @param compactionCommitTime                  instant time of the compaction being planned (only logged here)
     * @param fgIdsInPendingCompactionAndClustering file groups that must not be selected
     * @return the generated plan, or {@code null} when the strategy selects no partitions
     * @throws IOException declared by the {@link HoodieCompactor} contract
     * @throws IllegalArgumentException presumably raised by {@code ValidationUtils.checkArgument}
     *         when the table is not MERGE_ON_READ or the plan selects an excluded file group
     */
    @Override
    public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable, HoodieWriteConfig config, String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering) throws IOException {
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
        this.totalLogFiles = new LongAccumulator();
        this.totalFileSlices = new LongAccumulator();
        jsc.sc().register(this.totalLogFiles);
        jsc.sc().register(this.totalFileSlices);
        ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not " + hoodieTable.getMetaClient().getTableType().name());
        HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
        LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
        List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
        partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
        if (partitionPaths.isEmpty()) {
            // No partition was picked by the strategy: there is no plan to return.
            return null;
        }
        TableFileSystemView.SliceView fileSystemView = hoodieTable.getSliceView();
        LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
        context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
        List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView.getLatestFileSlices(partitionPath)
                .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()))
                .map(slice -> {
                    List<HoodieLogFile> logFiles = slice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
                    // The accumulators take Long values; widen the int size explicitly.
                    this.totalLogFiles.add((long) logFiles.size());
                    this.totalFileSlices.add(1L);
                    Option<HoodieBaseFile> dataFile = slice.getBaseFile();
                    return new CompactionOperation(dataFile, partitionPath, logFiles, config.getCompactionStrategy().captureMetrics(config, slice));
                })
                // A slice without log files has nothing to compact.
                .filter(op -> !op.getDeltaFileNames().isEmpty()), partitionPaths.size())
                .stream()
                .map(CompactionUtils::buildHoodieCompactionOperation)
                .collect(Collectors.toList());
        LOG.info("Total of " + operations.size() + " compactions are retrieved");
        LOG.info("Total number of latest files slices " + this.totalFileSlices.value());
        LOG.info("Total number of log files " + this.totalLogFiles.value());
        LOG.info("Total number of file slices " + this.totalFileSlices.value());
        HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(Collectors.toList()));
        // Guard against a strategy that selects a file group which was explicitly excluded.
        ValidationUtils.checkArgument(compactionPlan.getOperations().stream().noneMatch(op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering + ", Selected workload :" + compactionPlan);
        if (compactionPlan.getOperations().isEmpty()) {
            LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
        }
        return compactionPlan;
    }
}

