/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
import org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.LookupCompaction;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.MemoryFileStoreWrite;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeyValueFileStoreWrite
extends MemoryFileStoreWrite<KeyValue> {
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreWrite.class);
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
    private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final CoreOptions options;
    private final FileIO fileIO;
    private final RowType keyType;
    private final RowType valueType;

    public KeyValueFileStoreWrite(FileIO fileIO, SchemaManager schemaManager, long schemaId, String commitUser, RowType keyType, RowType valueType, Supplier<Comparator<InternalRow>> keyComparatorSupplier, MergeFunctionFactory<KeyValue> mfFactory, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, KeyValueFieldsExtractor extractor) {
        super(commitUser, snapshotManager, scan, options);
        this.fileIO = fileIO;
        this.keyType = keyType;
        this.valueType = valueType;
        this.readerFactoryBuilder = KeyValueFileReaderFactory.builder(fileIO, schemaManager, schemaId, keyType, valueType, FileFormatDiscover.of(options), pathFactory, extractor);
        this.writerFactoryBuilder = KeyValueFileWriterFactory.builder(fileIO, schemaId, keyType, valueType, options.fileFormat(), pathFactory, options.targetFileSize());
        this.keyComparatorSupplier = keyComparatorSupplier;
        this.mfFactory = mfFactory;
        this.options = options;
    }

    protected MergeTreeWriter createWriter(BinaryRow partition, int bucket, List<DataFileMeta> restoreFiles, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating merge tree writer for partition {} bucket {} from restored files {}", new Object[]{partition, bucket, restoreFiles});
        }
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket, this.options.fileCompressionPerLevel(), this.options.fileCompression());
        Comparator<InternalRow> keyComparator = this.keyComparatorSupplier.get();
        Levels levels = new Levels(keyComparator, restoreFiles, this.options.numLevels());
        UniversalCompaction universalCompaction = new UniversalCompaction(this.options.maxSizeAmplificationPercent(), this.options.sortedRunSizeRatio(), this.options.numSortedRunCompactionTrigger(), this.options.maxSortedRunNum());
        CompactStrategy compactStrategy = this.options.changelogProducer() == CoreOptions.ChangelogProducer.LOOKUP ? new LookupCompaction(universalCompaction) : universalCompaction;
        CompactManager compactManager = this.createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels);
        return new MergeTreeWriter(this.bufferSpillable(), this.options.localSortMaxNumFileHandles(), this.ioManager, compactManager, DataFileMeta.getMaxSequenceNumber(restoreFiles), keyComparator, this.mfFactory.create(), writerFactory, this.options.commitForceCompact(), this.options.changelogProducer(), restoreIncrement);
    }

    private boolean bufferSpillable() {
        return this.options.writeBufferSpillable(this.fileIO.isObjectStore());
    }

    private CompactManager createCompactManager(BinaryRow partition, int bucket, CompactStrategy compactStrategy, ExecutorService compactExecutor, Levels levels) {
        if (this.options.writeOnly()) {
            return new NoopCompactManager();
        }
        Comparator<InternalRow> keyComparator = this.keyComparatorSupplier.get();
        MergeTreeCompactRewriter rewriter = this.createRewriter(partition, bucket, keyComparator, levels);
        return new MergeTreeCompactManager(compactExecutor, levels, compactStrategy, keyComparator, this.options.targetFileSize(), this.options.numSortedRunStopTrigger(), rewriter);
    }

    private MergeTreeCompactRewriter createRewriter(BinaryRow partition, int bucket, Comparator<InternalRow> keyComparator, Levels levels) {
        KeyValueFileReaderFactory readerFactory = this.readerFactoryBuilder.build(partition, bucket);
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket, this.options.fileCompressionPerLevel(), this.options.fileCompression());
        switch (this.options.changelogProducer()) {
            case FULL_COMPACTION: {
                return new FullChangelogMergeTreeCompactRewriter(this.options.numLevels() - 1, readerFactory, writerFactory, keyComparator, this.mfFactory);
            }
            case LOOKUP: {
                LookupLevels lookupLevels = this.createLookupLevels(levels, readerFactory);
                return new LookupMergeTreeCompactRewriter(lookupLevels, readerFactory, writerFactory, keyComparator, this.mfFactory);
            }
        }
        return new MergeTreeCompactRewriter(readerFactory, writerFactory, keyComparator, this.mfFactory);
    }

    private LookupLevels createLookupLevels(Levels levels, KeyValueFileReaderFactory readerFactory) {
        if (this.ioManager == null) {
            throw new RuntimeException("Can not use lookup, there is no temp disk directory to use.");
        }
        return new LookupLevels(levels, this.keyComparatorSupplier.get(), this.keyType, this.valueType, file -> readerFactory.createRecordReader(file.schemaId(), file.fileName(), file.level()), () -> this.ioManager.createChannel().getPathFile(), new HashLookupStoreFactory(this.cacheManager, this.options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR).floatValue()), this.options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), this.options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
    }
}

