/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.mergetree.ContainsLevels;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
import org.apache.paimon.mergetree.compact.FirstRowMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
import org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.MemoryFileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeyValueFileStoreWrite
extends MemoryFileStoreWrite<KeyValue> {
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreWrite.class);
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
    private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
    private final Supplier<RecordEqualiser> valueEqualiserSupplier;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final CoreOptions options;
    private final FileIO fileIO;
    private final RowType keyType;
    private final RowType valueType;

    public KeyValueFileStoreWrite(FileIO fileIO, SchemaManager schemaManager, long schemaId, String commitUser, RowType keyType, RowType valueType, Supplier<Comparator<InternalRow>> keyComparatorSupplier, Supplier<RecordEqualiser> valueEqualiserSupplier, MergeFunctionFactory<KeyValue> mfFactory, FileStorePathFactory pathFactory, Map<String, FileStorePathFactory> format2PathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @Nullable IndexMaintainer.Factory<KeyValue> indexFactory, CoreOptions options, KeyValueFieldsExtractor extractor, String tableName) {
        super(commitUser, snapshotManager, scan, options, indexFactory, tableName, pathFactory);
        this.fileIO = fileIO;
        this.keyType = keyType;
        this.valueType = valueType;
        this.readerFactoryBuilder = KeyValueFileReaderFactory.builder(fileIO, schemaManager, schemaId, keyType, valueType, FileFormatDiscover.of(options), pathFactory, extractor, options);
        this.writerFactoryBuilder = KeyValueFileWriterFactory.builder(fileIO, schemaId, keyType, valueType, options.fileFormat(), format2PathFactory, options.targetFileSize());
        this.keyComparatorSupplier = keyComparatorSupplier;
        this.valueEqualiserSupplier = valueEqualiserSupplier;
        this.mfFactory = mfFactory;
        this.options = options;
    }

    protected MergeTreeWriter createWriter(BinaryRow partition, int bucket, List<DataFileMeta> restoreFiles, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating merge tree writer for partition {} bucket {} from restored files {}", new Object[]{partition, bucket, restoreFiles});
        }
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket, this.options);
        Comparator<InternalRow> keyComparator = this.keyComparatorSupplier.get();
        Levels levels = new Levels(keyComparator, restoreFiles, this.options.numLevels());
        UniversalCompaction universalCompaction = new UniversalCompaction(this.options.maxSizeAmplificationPercent(), this.options.sortedRunSizeRatio(), this.options.numSortedRunCompactionTrigger(), this.options.optimizedCompactionInterval());
        CompactStrategy compactStrategy = this.options.changelogProducer() == CoreOptions.ChangelogProducer.LOOKUP ? new ForceUpLevel0Compaction(universalCompaction) : universalCompaction;
        CompactManager compactManager = this.createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels);
        return new MergeTreeWriter(this.bufferSpillable(), this.options.localSortMaxNumFileHandles(), this.ioManager, compactManager, DataFileMeta.getMaxSequenceNumber(restoreFiles), keyComparator, this.mfFactory.create(), writerFactory, this.options.commitForceCompact(), this.options.changelogProducer(), restoreIncrement, this.getWriterMetrics(partition, bucket));
    }

    @VisibleForTesting
    public boolean bufferSpillable() {
        return this.options.writeBufferSpillable(this.fileIO.isObjectStore(), this.isStreamingMode);
    }

    private CompactManager createCompactManager(BinaryRow partition, int bucket, CompactStrategy compactStrategy, ExecutorService compactExecutor, Levels levels) {
        if (this.options.writeOnly()) {
            return new NoopCompactManager();
        }
        Comparator<InternalRow> keyComparator = this.keyComparatorSupplier.get();
        MergeTreeCompactRewriter rewriter = this.createRewriter(partition, bucket, keyComparator, levels);
        return new MergeTreeCompactManager(compactExecutor, levels, compactStrategy, keyComparator, this.options.compactionFileSize(), this.options.numSortedRunStopTrigger(), rewriter, this.getCompactionMetrics(partition, bucket));
    }

    private MergeTreeCompactRewriter createRewriter(BinaryRow partition, int bucket, Comparator<InternalRow> keyComparator, Levels levels) {
        KeyValueFileReaderFactory readerFactory = this.readerFactoryBuilder.build(partition, bucket);
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket, this.options);
        MergeSorter mergeSorter = new MergeSorter(this.options, this.keyType, this.valueType, this.ioManager);
        int maxLevel = this.options.numLevels() - 1;
        CoreOptions.MergeEngine mergeEngine = this.options.mergeEngine();
        switch (this.options.changelogProducer()) {
            case FULL_COMPACTION: {
                return new FullChangelogMergeTreeCompactRewriter(maxLevel, mergeEngine, readerFactory, writerFactory, keyComparator, this.mfFactory, mergeSorter, this.valueEqualiserSupplier.get(), this.options.changelogRowDeduplicate());
            }
            case LOOKUP: {
                if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
                    KeyValueFileReaderFactory keyOnlyReader = this.readerFactoryBuilder.copyWithoutProjection().withValueProjection(new int[0][]).build(partition, bucket);
                    ContainsLevels containsLevels = this.createContainsLevels(levels, keyOnlyReader);
                    return new FirstRowMergeTreeCompactRewriter(maxLevel, mergeEngine, containsLevels, readerFactory, writerFactory, keyComparator, this.mfFactory, mergeSorter, this.valueEqualiserSupplier.get(), this.options.changelogRowDeduplicate());
                }
                LookupLevels lookupLevels = this.createLookupLevels(levels, readerFactory);
                return new LookupMergeTreeCompactRewriter(maxLevel, mergeEngine, lookupLevels, readerFactory, writerFactory, keyComparator, this.mfFactory, mergeSorter, this.valueEqualiserSupplier.get(), this.options.changelogRowDeduplicate());
            }
        }
        return new MergeTreeCompactRewriter(readerFactory, writerFactory, keyComparator, this.mfFactory, mergeSorter);
    }

    private LookupLevels createLookupLevels(Levels levels, KeyValueFileReaderFactory readerFactory) {
        if (this.ioManager == null) {
            throw new RuntimeException("Can not use lookup, there is no temp disk directory to use.");
        }
        Options options = this.options.toConfiguration();
        return new LookupLevels(levels, this.keyComparatorSupplier.get(), this.keyType, this.valueType, file -> readerFactory.createRecordReader(file.schemaId(), file.fileName(), file.fileSize(), file.level()), () -> this.ioManager.createChannel().getPathFile(), new HashLookupStoreFactory(this.cacheManager, this.options.cachePageSize(), options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR).floatValue()), options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), LookupStoreFactory.bfGenerator(options));
    }

    private ContainsLevels createContainsLevels(Levels levels, KeyValueFileReaderFactory readerFactory) {
        if (this.ioManager == null) {
            throw new RuntimeException("Can not use lookup, there is no temp disk directory to use.");
        }
        Options options = this.options.toConfiguration();
        return new ContainsLevels(levels, this.keyComparatorSupplier.get(), this.keyType, file -> readerFactory.createRecordReader(file.schemaId(), file.fileName(), file.fileSize(), file.level()), () -> this.ioManager.createChannel().getPathFile(), new HashLookupStoreFactory(this.cacheManager, this.options.cachePageSize(), options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR).floatValue()), options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), LookupStoreFactory.bfGenerator(options));
    }
}

