/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O>
implements Serializable {
    private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);

    public abstract void preCompact(HoodieTable var1, HoodieTimeline var2, String var3);

    public abstract void maybePersist(HoodieData<WriteStatus> var1, HoodieWriteConfig var2);

    public HoodieData<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, HoodieTable table, HoodieWriteConfig config, String compactionInstantTime, HoodieCompactionHandler compactionHandler) {
        if (compactionPlan == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
            return context.emptyHoodieData();
        }
        HoodieActiveTimeline timeline = table.getActiveTimeline();
        HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
        timeline.transitionCompactionRequestedToInflight(instant);
        table.getMetaClient().reloadActiveTimeline();
        HoodieTableMetaClient metaClient = table.getMetaClient();
        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
        try {
            if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {
                Schema readerSchema = schemaResolver.getTableAvroSchema(false);
                config.setSchema(readerSchema.toString());
            }
        }
        catch (Exception readerSchema) {
            // empty catch block
        }
        List operations = compactionPlan.getOperations().stream().map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
        LOG.info((Object)("Compactor compacting " + operations + " files"));
        context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices");
        TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
        return context.parallelize(operations).map(operation -> this.compact(compactionHandler, metaClient, config, (CompactionOperation)operation, compactionInstantTime, taskContextSupplier)).flatMap(List::iterator);
    }

    public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler, HoodieTableMetaClient metaClient, HoodieWriteConfig config, CompactionOperation operation, String instantTime, TaskContextSupplier taskContextSupplier) throws IOException {
        Schema readerSchema;
        HoodieWrapperFileSystem fs = metaClient.getFs();
        Option<Object> internalSchemaOption = Option.empty();
        if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
            readerSchema = new Schema.Parser().parse(config.getSchema());
            internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
            ((HoodieTable)((Object)compactionHandler)).getConfig().setDefault(config);
        } else {
            readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
        }
        LOG.info((Object)("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + " for commit " + instantTime));
        String maxInstantTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet("commit", "rollback", "deltacommit")).filterCompletedInstants().lastInstant().get().getTimestamp();
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
        LOG.info((Object)("MaxMemoryPerCompaction => " + maxMemoryPerCompaction));
        List logFiles = operation.getDeltaFileNames().stream().map(p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()).collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = ((HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder().withFileSystem(fs).withBasePath(metaClient.getBasePath()).withLogFilePaths(logFiles)).withReaderSchema(readerSchema).withLatestInstantTime(maxInstantTime).withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema())).withMaxMemorySizeInBytes(maxMemoryPerCompaction).withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()).withReverseReader(config.getCompactionReverseLogReadEnabled()).withBufferSize(config.getMaxDFSStreamBufferSize()).withSpillableMapBasePath(config.getSpillableMapBasePath()).withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()).withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()).withOperationField(config.allowOperationMetadataField()).withPartition(operation.getPartitionPath()).build();
        Option<HoodieBaseFile> oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
        if (!scanner.iterator().hasNext() && !oldDataFileOpt.isPresent()) {
            scanner.close();
            return new ArrayList<WriteStatus>();
        }
        Iterator<List<WriteStatus>> result = oldDataFileOpt.isPresent() ? compactionHandler.handleUpdate(instantTime, operation.getPartitionPath(), operation.getFileId(), scanner.getRecords(), oldDataFileOpt.get()) : compactionHandler.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(), scanner.getRecords());
        scanner.close();
        Iterable resultIterable = () -> result;
        return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
            s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
            s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
            s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
            s.getStat().setPartitionPath(operation.getPartitionPath());
            s.getStat().setTotalLogSizeCompacted(operation.getMetrics().get("TOTAL_LOG_FILES_SIZE").longValue());
            s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
            s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
            s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
            HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
            runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
            s.getStat().setRuntimeStats(runtimeStats);
        }).collect(Collectors.toList());
    }

    HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config, String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering) throws IOException {
        HoodieAccumulator totalLogFiles = context.newAccumulator();
        HoodieAccumulator totalFileSlices = context.newAccumulator();
        ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, "Can only compact table of type " + (Object)((Object)HoodieTableType.MERGE_ON_READ) + " and not " + hoodieTable.getMetaClient().getTableType().name());
        HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
        LOG.info((Object)("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime));
        List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
        partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
        if (partitionPaths.isEmpty()) {
            return null;
        }
        TableFileSystemView.SliceView fileSystemView = hoodieTable.getSliceView();
        LOG.info((Object)("Compaction looking for files to compact in " + partitionPaths + " partitions"));
        context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
        List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView.getLatestFileSlices((String)partitionPath).filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())).map(s -> {
            List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
            totalLogFiles.add(logFiles.size());
            totalFileSlices.add(1L);
            Option<HoodieBaseFile> dataFile = s.getBaseFile();
            return new CompactionOperation(dataFile, (String)partitionPath, logFiles, config.getCompactionStrategy().captureMetrics(config, (FileSlice)s));
        }).filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(Collectors.toList());
        LOG.info((Object)("Total of " + operations.size() + " compactions are retrieved"));
        LOG.info((Object)("Total number of latest files slices " + totalFileSlices.value()));
        LOG.info((Object)("Total number of log files " + totalLogFiles.value()));
        LOG.info((Object)("Total number of file slices " + totalFileSlices.value()));
        HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(Collectors.toList()));
        ValidationUtils.checkArgument(compactionPlan.getOperations().stream().noneMatch(op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering + ", Selected workload :" + (Object)((Object)compactionPlan));
        if (compactionPlan.getOperations().isEmpty()) {
            LOG.warn((Object)("After filtering, Nothing to compact for " + metaClient.getBasePath()));
        }
        return compactionPlan;
    }
}

