/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.postpone;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.avro.AvroSchemaConverter;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.MemoryFileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.postpone.PostponeBucketWriter;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostponeBucketFileStoreWrite
extends MemoryFileStoreWrite<KeyValue> {
    private static final Logger LOG = LoggerFactory.getLogger(PostponeBucketFileStoreWrite.class);
    private final CoreOptions options;
    private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
    private final FileIO fileIO;
    private final FileStorePathFactory pathFactory;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private boolean forceBufferSpill = false;

    public PostponeBucketFileStoreWrite(FileIO fileIO, FileStorePathFactory pathFactory, TableSchema schema, String commitUser, RowType partitionType, RowType keyType, RowType valueType, MergeFunctionFactory<KeyValue> mfFactory, BiFunction<CoreOptions, String, FileStorePathFactory> formatPathFactory, KeyValueFileReaderFactory.Builder readerFactoryBuilder, SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, String tableName, @Nullable Integer writeId) {
        super(snapshotManager, scan, options, partitionType, null, null, tableName);
        this.fileIO = fileIO;
        this.pathFactory = pathFactory;
        this.mfFactory = mfFactory;
        this.readerFactoryBuilder = readerFactoryBuilder;
        Options newOptions = new Options(options.toMap());
        try {
            AvroSchemaConverter.convertToSchema(schema.logicalRowType(), new HashMap<String, String>());
            newOptions.set(CoreOptions.FILE_FORMAT, "avro");
            newOptions.set(CoreOptions.METADATA_STATS_MODE, "none");
        }
        catch (Exception exception) {
            // empty catch block
        }
        newOptions.set(CoreOptions.DATA_FILE_PREFIX, String.format("%s-u-%s-s-%d-w-", options.dataFilePrefix(), commitUser, writeId == null ? ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE) : writeId.intValue()));
        this.options = new CoreOptions(newOptions);
        FileFormat fileFormat = FileFormat.fileFormat(this.options);
        this.writerFactoryBuilder = KeyValueFileWriterFactory.builder(fileIO, schema.id(), keyType, valueType, fileFormat, FileStorePathFactory.createFormatPathFactories(this.options, formatPathFactory), this.options.targetFileSize(true));
        this.withIgnorePreviousFiles(true);
    }

    public PostponeBucketFileStoreWrite withIOManager(IOManager ioManager) {
        super.withIOManager(ioManager);
        if (this.mfFactory instanceof LookupMergeFunction.Factory) {
            ((LookupMergeFunction.Factory)this.mfFactory).withIOManager(ioManager);
        }
        return this;
    }

    @Override
    protected void forceBufferSpill() throws Exception {
        if (this.ioManager == null) {
            return;
        }
        if (this.forceBufferSpill) {
            return;
        }
        this.forceBufferSpill = true;
        LOG.info("Force buffer spill for postpone file store write, writer number is: {}", (Object)this.writers.size());
        for (Map bucketWriters : this.writers.values()) {
            for (AbstractFileStoreWrite.WriterContainer writerContainer : bucketWriters.values()) {
                ((PostponeBucketWriter)writerContainer.writer).toBufferedWriter();
            }
        }
    }

    @Override
    public void withIgnorePreviousFiles(boolean ignorePrevious) {
        super.withIgnorePreviousFiles(true);
    }

    protected PostponeBucketWriter createWriter(BinaryRow partition, int bucket, List<DataFileMeta> restoreFiles, long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, @Nullable BucketedDvMaintainer deletionVectorsMaintainer) {
        Preconditions.checkArgument(bucket == -2);
        Preconditions.checkArgument(restoreFiles.isEmpty(), "Postpone bucket writers should not restore previous files. This is unexpected.");
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket, this.options);
        return new PostponeBucketWriter(this.fileIO, this.pathFactory.createDataFilePathFactory(partition, bucket), this.options.spillCompressOptions(), this.options.writeBufferSpillDiskSize(), this.ioManager, this.mfFactory.create(), writerFactory, files -> this.newFileRead(partition, bucket, (List<DataFileMeta>)files), this.forceBufferSpill, this.forceBufferSpill, restoreIncrement);
    }

    private RecordReaderIterator<KeyValue> newFileRead(BinaryRow partition, int bucket, List<DataFileMeta> files) throws IOException {
        KeyValueFileReaderFactory readerFactory = this.readerFactoryBuilder.build(partition, bucket, name -> Optional.empty());
        ArrayList suppliers = new ArrayList();
        for (DataFileMeta file : files) {
            suppliers.add(() -> readerFactory.createRecordReader(file));
        }
        return new RecordReaderIterator<KeyValue>(ConcatRecordReader.create(suppliers));
    }

    @Override
    protected Function<AbstractFileStoreWrite.WriterContainer<KeyValue>, Boolean> createWriterCleanChecker() {
        return PostponeBucketFileStoreWrite.createNoConflictAwareWriterCleanChecker();
    }

    public static int getWriteId(String fileName) {
        try {
            String[] parts = fileName.split("-s-");
            return Integer.parseInt(parts[1].substring(0, parts[1].indexOf(45)));
        }
        catch (Exception e) {
            throw new RuntimeException("Data file name " + fileName + " does not match the pattern. This is unexpected.", e);
        }
    }
}

