/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.EngineBroadcastManager;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.action.compact.CompactionExecutionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Engine-agnostic driver that executes a {@link HoodieCompactionPlan}.
 *
 * <p>For every {@link CompactionOperation} in the plan it either delegates to the file-group-reader
 * based path of the {@link HoodieCompactionHandler}, or scans the operation's log files with a
 * {@link HoodieMergedLogRecordScanner} and hands the merged records to a
 * {@link CompactionExecutionHelper} that writes the new base file.
 *
 * <p>The class is {@link Serializable} because the lambdas passed to {@link HoodieData#map} in
 * {@link #compact(HoodieEngineContext, WriteOperationType, HoodieCompactionPlan, HoodieTable,
 * HoodieWriteConfig, String, HoodieCompactionHandler)} call instance methods and therefore
 * capture {@code this}.
 */
public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieCompactor.class);

    /**
     * Engine-specific hook; this class provides no implementation.
     *
     * <p>NOTE(review): the parameter names were lost in decompilation; presumably the table, the
     * timeline to validate against, the write operation type and the compaction instant time.
     * Confirm against the engine implementations.
     */
    public abstract void preCompact(HoodieTable var1, HoodieTimeline var2, WriteOperationType var3, String var4);

    /**
     * Engine-specific hook; this class provides no implementation.
     *
     * <p>NOTE(review): the parameter names were lost in decompilation; presumably the write
     * statuses, the engine context, the write config and the instant time. Confirm against the
     * engine implementations.
     */
    public abstract void maybePersist(HoodieData<WriteStatus> var1, HoodieEngineContext var2, HoodieWriteConfig var3, String var4);

    /**
     * Returns the broadcast manager used by the file-group-reader based compaction path.
     *
     * <p>The default implementation returns an empty option. The file-group-reader overload of
     * {@code compact} calls {@code get()} on this option without a presence check, so subclasses
     * whose engine context reports file-group-reader support need to override this method.
     *
     * @param context    engine context (unused by the default implementation)
     * @param metaClient table meta client (unused by the default implementation)
     * @return an empty option by default
     */
    public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context, HoodieTableMetaClient metaClient) {
        return Option.empty();
    }

    /**
     * Executes the given compaction plan and returns the write statuses of all operations.
     *
     * <p>Steps, in order: transition the requested compaction instant to inflight, reload the
     * active timeline, best-effort refresh of the write schema from the table (only when no
     * internal schema is configured), then distribute the plan's operations through
     * {@code context.parallelize(...)}.
     *
     * @param context               engine context used to parallelize the operations
     * @param operationType         write operation type; the file-group-reader path is only taken
     *                              for {@link WriteOperationType#COMPACT}
     * @param compactionPlan        plan to execute; {@code null} or a plan without operations
     *                              yields empty data and no timeline transition
     * @param table                 table being compacted
     * @param config                write config; its schema may be overwritten with the table schema
     * @param compactionInstantTime instant time of the compaction
     * @param compactionHandler     handler that performs the actual file writes
     * @return the write statuses produced by all compaction operations
     */
    public HoodieData<WriteStatus> compact(HoodieEngineContext context, WriteOperationType operationType, HoodieCompactionPlan compactionPlan, HoodieTable table, HoodieWriteConfig config, String compactionInstantTime, HoodieCompactionHandler compactionHandler) {
        if (compactionPlan == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
            return context.emptyHoodieData();
        }
        CompactionExecutionHelper executionHelper = this.getCompactionExecutionStrategy(compactionPlan);
        executionHelper.transitionRequestedToInflight(table, compactionInstantTime);
        table.getMetaClient().reloadActiveTimeline();
        HoodieTableMetaClient metaClient = table.getMetaClient();
        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
        try {
            if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {
                Schema tableSchema = schemaResolver.getTableAvroSchema(false);
                config.setSchema(tableSchema.toString());
            }
        } catch (Exception e) {
            // Deliberately best-effort: compaction proceeds with whatever schema the config
            // already carries. The failure is logged instead of being silently dropped.
            LOG.warn("Could not resolve the table schema before compaction; keeping the schema already set in the write config", e);
        }
        List<CompactionOperation> operations = compactionPlan.getOperations().stream()
                .map(CompactionOperation::convertFromAvroRecordInstance)
                .collect(Collectors.toList());
        LOG.info("Compactor compacting {} files", operations);
        String maxInstantTime = this.getMaxInstantTime(metaClient);
        context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
        TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
        Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
        boolean useFileGroupReaderBasedCompaction = context.supportsFileGroupReader()
                && !metaClient.isMetadataTable()
                && config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
                && operationType == WriteOperationType.COMPACT
                && !this.hasBootstrapFile(operations)
                && config.populateMetaFields();
        if (useFileGroupReaderBasedCompaction) {
            Option<EngineBroadcastManager> broadcastManagerOpt = this.getEngineBroadcastManager(context, metaClient);
            broadcastManagerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);
            return context.parallelize(operations)
                    .map(operation -> this.compact(compactionHandler, metaClient, config, operation, compactionInstantTime, broadcastManagerOpt))
                    .flatMap(List::iterator);
        }
        return context.parallelize(operations)
                .map(operation -> this.compact(compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper))
                .flatMap(List::iterator);
    }

    /**
     * Compacts a single operation with no instant range and a default
     * {@link CompactionExecutionHelper}.
     *
     * @return the write statuses produced for this operation
     * @throws IOException declared by the delegate overload
     */
    public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler, HoodieTableMetaClient metaClient, HoodieWriteConfig config, CompactionOperation operation, String instantTime, String maxInstantTime, TaskContextSupplier taskContextSupplier) throws IOException {
        return this.compact(compactionHandler, metaClient, config, operation, instantTime, maxInstantTime, Option.empty(), taskContextSupplier, new CompactionExecutionHelper());
    }

    /**
     * Compacts a single operation by scanning its log files and writing the merged result.
     *
     * <p>The log scanner is always closed before this method returns or throws, and before the
     * iterator returned by the execution helper is consumed. The scanner's counters are read
     * after it is closed in order to populate each {@link HoodieWriteStat}.
     *
     * @param compactionHandler   handler that writes the compacted file; when an internal schema
     *                            is configured it is cast to {@link HoodieTable}
     * @param metaClient          table meta client (storage and base path)
     * @param config              write config supplying schema, memory and scanner settings
     * @param operation           the file slice to compact
     * @param instantTime         compaction instant time
     * @param maxInstantTime      passed to the helper to pick the scanner's latest instant time
     * @param instantRange        optional instant range forwarded to the scanner
     * @param taskContextSupplier used to compute the memory budget of the scanner
     * @param executionHelper     strategy that performs the write and tunes the scan
     * @return the write statuses for this operation; an empty list when the scanner yields no
     *         records and the operation has no base file
     * @throws IOException declared for the helper and scanner calls
     */
    public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler, HoodieTableMetaClient metaClient, HoodieWriteConfig config, CompactionOperation operation, String instantTime, String maxInstantTime, Option<InstantRange> instantRange, TaskContextSupplier taskContextSupplier, CompactionExecutionHelper executionHelper) throws IOException {
        Schema readerSchema;
        HoodieStorage storage = metaClient.getStorage();
        Option<InternalSchema> internalSchemaOption = Option.empty();
        if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
            readerSchema = new Schema.Parser().parse(config.getSchema());
            internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
            ((HoodieTable) compactionHandler).getConfig().setDefault(config);
        } else {
            readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
        }
        LOG.info("Compaction operation started for base file: {} and delta files: {} for commit {}",
                operation.getDataFileName(), operation.getDeltaFileNames(), instantTime);
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
        LOG.info("MaxMemoryPerCompaction => {}", maxMemoryPerCompaction);
        List<String> logFiles = operation.getDeltaFileNames().stream()
                .map(p -> new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
                .collect(Collectors.toList());
        // The two Builder casts are kept from the original: the declared return types of
        // withLogFilePaths/withInstantRange are not visible here, and the casts are harmless
        // if those methods already return HoodieMergedLogRecordScanner.Builder.
        HoodieMergedLogRecordScanner scanner = ((HoodieMergedLogRecordScanner.Builder) ((HoodieMergedLogRecordScanner.Builder) HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(storage)
                .withBasePath(metaClient.getBasePath())
                .withLogFilePaths(logFiles))
                .withReaderSchema(readerSchema)
                .withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime, maxInstantTime))
                .withInstantRange(instantRange))
                .withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
                .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
                .withReverseReader(config.getCompactionReverseLogReadEnabled())
                .withBufferSize(config.getMaxDFSStreamBufferSize())
                .withSpillableMapBasePath(config.getSpillableMapBasePath())
                .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
                .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                .withOperationField(config.allowOperationMetadataField())
                .withPartition(operation.getPartitionPath())
                .withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
                .withRecordMerger(config.getRecordMerger())
                .withTableMetaClient(metaClient)
                .build();
        Iterator<List<WriteStatus>> result;
        try {
            Option<HoodieBaseFile> oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath().toString(), operation.getPartitionPath());
            if (!scanner.iterator().hasNext() && !oldDataFileOpt.isPresent()) {
                // Nothing to merge and no base file to carry forward.
                return new ArrayList<WriteStatus>();
            }
            result = executionHelper.writeFileAndGetWriteStats(compactionHandler, operation, instantTime, scanner, oldDataFileOpt);
        } finally {
            // Close on every path (including exceptions) and, as before, prior to draining the
            // result iterator below.
            scanner.close();
        }
        List<WriteStatus> writeStatuses = new ArrayList<>();
        while (result.hasNext()) {
            for (WriteStatus writeStatus : result.next()) {
                recordScanStats(writeStatus, scanner, operation);
                writeStatuses.add(writeStatus);
            }
        }
        return writeStatuses;
    }

    /**
     * Copies the scanner's counters and the operation's metadata onto the stat of one write status.
     */
    private static void recordScanStats(WriteStatus writeStatus, HoodieMergedLogRecordScanner scanner, CompactionOperation operation) {
        HoodieWriteStat stat = writeStatus.getStat();
        stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
        stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
        stat.setTotalLogRecords(scanner.getTotalLogRecords());
        stat.setPartitionPath(operation.getPartitionPath());
        stat.setTotalLogSizeCompacted(operation.getMetrics().get("TOTAL_LOG_FILES_SIZE").longValue());
        stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
        stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
        stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
        // Replace the runtime stats with a fresh instance that adds the scan time while
        // carrying over the create/upsert times already recorded, if any.
        HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
        runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
        if (stat.getRuntimeStats() != null) {
            runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
            runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
        }
        stat.setRuntimeStats(runtimeStats);
    }

    /**
     * Compacts a single operation through the handler's file-group-reader based path.
     *
     * <p>{@code broadcastManagerOpt} is dereferenced with {@code get()} without a presence check,
     * as are the reader context and storage config it supplies.
     *
     * @return the write statuses returned by the handler
     * @throws IOException declared for the handler call
     */
    public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler, HoodieTableMetaClient metaClient, HoodieWriteConfig writeConfig, CompactionOperation operation, String instantTime, Option<EngineBroadcastManager> broadcastManagerOpt) throws IOException {
        EngineBroadcastManager broadcastManager = broadcastManagerOpt.get();
        return compactionHandler.compactUsingFileGroupReader(instantTime, operation, writeConfig,
                broadcastManager.retrieveFileGroupReaderContext(metaClient.getBasePath()).get(),
                broadcastManager.retrieveStorageConfig().get());
    }

    /**
     * Returns the requested time of the last completed instant among the {@code commit},
     * {@code rollback} and {@code deltacommit} actions of the active timeline.
     *
     * <p>The last-instant option is dereferenced with {@code get()} without a presence check.
     */
    public String getMaxInstantTime(HoodieTableMetaClient metaClient) {
        return metaClient.getActiveTimeline()
                .getTimelineOfActions(CollectionUtils.createSet("commit", "rollback", "deltacommit"))
                .filterCompletedInstants()
                .lastInstant()
                .get()
                .requestedTime();
    }

    /**
     * Resolves the execution helper for a plan: the class named by the plan's strategy when one
     * is set, otherwise a default {@link CompactionExecutionHelper}.
     */
    public CompactionExecutionHelper getCompactionExecutionStrategy(HoodieCompactionPlan compactionPlan) {
        if (compactionPlan.getStrategy() == null || StringUtils.isNullOrEmpty(compactionPlan.getStrategy().getCompactorClassName())) {
            return new CompactionExecutionHelper();
        }
        return (CompactionExecutionHelper) ReflectionUtils.loadClass(compactionPlan.getStrategy().getCompactorClassName());
    }

    /** Returns {@code true} when at least one operation carries a bootstrap file path. */
    private boolean hasBootstrapFile(List<CompactionOperation> operationList) {
        return operationList.stream().anyMatch(operation -> operation.getBootstrapFilePath().isPresent());
    }
}

