/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
    protected final SnapshotManager snapshotManager;
    private final FileStoreScan scan;
    private final int writerNumberMax;
    @Nullable
    private final IndexMaintainer.Factory<T> indexFactory;
    @Nullable
    private final DeletionVectorsMaintainer.Factory dvMaintainerFactory;
    private final int totalBuckets;
    private final RowType partitionType;
    @Nullable
    protected IOManager ioManager;
    protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
    private ExecutorService lazyCompactExecutor;
    private boolean closeCompactExecutorWhenLeaving = true;
    private boolean ignorePreviousFiles = false;
    protected boolean isStreamingMode = false;
    protected CompactionMetrics compactionMetrics = null;
    protected final String tableName;
    private boolean isInsertOnly;
    private boolean legacyPartitionName;

    protected AbstractFileStoreWrite(SnapshotManager snapshotManager, FileStoreScan scan, @Nullable IndexMaintainer.Factory<T> indexFactory, @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName, CoreOptions options, int totalBuckets, RowType partitionType, int writerNumberMax, boolean legacyPartitionName) {
        this.snapshotManager = snapshotManager;
        this.scan = scan;
        if (options.manifestDeleteFileDropStats() && this.scan != null) {
            this.scan.dropStats();
        }
        this.indexFactory = indexFactory;
        this.dvMaintainerFactory = dvMaintainerFactory;
        this.totalBuckets = totalBuckets;
        this.partitionType = partitionType;
        this.writers = new HashMap<BinaryRow, Map<Integer, WriterContainer<T>>>();
        this.tableName = tableName;
        this.writerNumberMax = writerNumberMax;
        this.legacyPartitionName = legacyPartitionName;
    }

    @Override
    public FileStoreWrite<T> withIOManager(IOManager ioManager) {
        this.ioManager = ioManager;
        return this;
    }

    @Override
    public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
        return this;
    }

    @Override
    public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
        this.ignorePreviousFiles = ignorePreviousFiles;
    }

    @Override
    public void withCompactExecutor(ExecutorService compactExecutor) {
        this.lazyCompactExecutor = compactExecutor;
        this.closeCompactExecutorWhenLeaving = false;
    }

    @Override
    public void withInsertOnly(boolean insertOnly) {
        this.isInsertOnly = insertOnly;
        for (Map<Integer, WriterContainer<T>> containerMap : this.writers.values()) {
            for (WriterContainer<T> container : containerMap.values()) {
                container.writer.withInsertOnly(insertOnly);
            }
        }
    }

    @Override
    public void write(BinaryRow partition, int bucket, T data) throws Exception {
        WriterContainer<T> container = this.getWriterWrapper(partition, bucket);
        container.writer.write(data);
        if (container.indexMaintainer != null) {
            container.indexMaintainer.notifyNewRecord(data);
        }
    }

    @Override
    public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
        this.getWriterWrapper((BinaryRow)partition, (int)bucket).writer.compact(fullCompaction);
    }

    @Override
    public void notifyNewFiles(long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {
        WriterContainer<T> writerContainer = this.getWriterWrapper(partition, bucket);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Get extra compact files for partition {}, bucket {}. Extra snapshot {}, base snapshot {}.\nFiles: {}", new Object[]{partition, bucket, snapshotId, writerContainer.baseSnapshotId, files});
        }
        if (snapshotId > writerContainer.baseSnapshotId) {
            writerContainer.writer.addNewFiles(files);
        }
    }

    @Override
    public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier) throws Exception {
        Function<WriterContainer, Boolean> writerCleanChecker = this.writers.values().stream().map(Map::values).flatMap(Collection::stream).mapToLong(w -> w.lastModifiedCommitIdentifier).max().orElse(Long.MIN_VALUE) == Long.MIN_VALUE ? writerContainer -> false : this.createWriterCleanChecker();
        ArrayList<CommitMessage> result = new ArrayList<CommitMessage>();
        Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partIter = this.writers.entrySet().iterator();
        while (partIter.hasNext()) {
            Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
            BinaryRow partition = partEntry.getKey();
            Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter = partEntry.getValue().entrySet().iterator();
            while (bucketIter.hasNext()) {
                CompactDeletionFile compactDeletionFile;
                Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
                int bucket = entry.getKey();
                WriterContainer<T> writerContainer2 = entry.getValue();
                CommitIncrement increment = writerContainer2.writer.prepareCommit(waitCompaction);
                ArrayList<IndexFileMeta> newIndexFiles = new ArrayList<IndexFileMeta>();
                if (writerContainer2.indexMaintainer != null) {
                    newIndexFiles.addAll(writerContainer2.indexMaintainer.prepareCommit());
                }
                if ((compactDeletionFile = increment.compactDeletionFile()) != null) {
                    compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
                }
                CommitMessageImpl committable = new CommitMessageImpl(partition, bucket, increment.newFilesIncrement(), increment.compactIncrement(), new IndexIncrement(newIndexFiles));
                result.add(committable);
                if (committable.isEmpty()) {
                    if (!writerCleanChecker.apply(writerContainer2).booleanValue()) continue;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Closing writer for partition {}, bucket {}. Writer's last modified identifier is {}, while current commit identifier is {}.", new Object[]{partition, bucket, writerContainer2.lastModifiedCommitIdentifier, commitIdentifier});
                    }
                    writerContainer2.writer.close();
                    bucketIter.remove();
                    continue;
                }
                writerContainer2.lastModifiedCommitIdentifier = commitIdentifier;
            }
            if (!partEntry.getValue().isEmpty()) continue;
            partIter.remove();
        }
        return result;
    }

    protected abstract Function<WriterContainer<T>, Boolean> createWriterCleanChecker();

    protected static <T> Function<WriterContainer<T>, Boolean> createConflictAwareWriterCleanChecker(String commitUser, SnapshotManager snapshotManager) {
        long latestCommittedIdentifier = snapshotManager.latestSnapshotOfUser(commitUser).map(Snapshot::commitIdentifier).orElse(Long.MIN_VALUE);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Latest committed identifier is {}", (Object)latestCommittedIdentifier);
        }
        return writerContainer -> writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier && !writerContainer.writer.isCompacting();
    }

    protected static <T> Function<WriterContainer<T>, Boolean> createNoConflictAwareWriterCleanChecker() {
        return writerContainer -> true;
    }

    @Override
    public void close() throws Exception {
        for (Map<Integer, WriterContainer<T>> bucketWriters : this.writers.values()) {
            for (WriterContainer<T> writerContainer : bucketWriters.values()) {
                writerContainer.writer.close();
            }
        }
        this.writers.clear();
        if (this.lazyCompactExecutor != null && this.closeCompactExecutorWhenLeaving) {
            this.lazyCompactExecutor.shutdownNow();
        }
        if (this.compactionMetrics != null) {
            this.compactionMetrics.close();
        }
    }

    @Override
    public List<FileStoreWrite.State<T>> checkpoint() {
        ArrayList<FileStoreWrite.State<T>> result = new ArrayList<FileStoreWrite.State<T>>();
        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry : this.writers.entrySet()) {
            BinaryRow partition = partitionEntry.getKey();
            for (Map.Entry<Integer, WriterContainer<T>> bucketEntry : partitionEntry.getValue().entrySet()) {
                CommitIncrement increment;
                int bucket = bucketEntry.getKey();
                WriterContainer<T> writerContainer = bucketEntry.getValue();
                try {
                    increment = writerContainer.writer.prepareCommit(false);
                }
                catch (Exception e) {
                    throw new RuntimeException("Failed to extract state from writer of partition " + partition + " bucket " + bucket, e);
                }
                Collection<DataFileMeta> dataFiles = writerContainer.writer.dataFiles();
                result.add(new FileStoreWrite.State(partition, bucket, writerContainer.baseSnapshotId, writerContainer.lastModifiedCommitIdentifier, dataFiles, writerContainer.writer.maxSequenceNumber(), writerContainer.indexMaintainer, writerContainer.deletionVectorsMaintainer, increment));
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted state " + result);
        }
        return result;
    }

    @Override
    public void restore(List<FileStoreWrite.State<T>> states) {
        for (FileStoreWrite.State<T> state : states) {
            RecordWriter<T> writer = this.createWriter(state.baseSnapshotId, state.partition, state.bucket, state.dataFiles, state.maxSequenceNumber, state.commitIncrement, this.compactExecutor(), state.deletionVectorsMaintainer);
            this.notifyNewWriter(writer);
            WriterContainer<T> writerContainer = new WriterContainer<T>(writer, state.indexMaintainer, state.deletionVectorsMaintainer, state.baseSnapshotId);
            writerContainer.lastModifiedCommitIdentifier = state.lastModifiedCommitIdentifier;
            this.writers.computeIfAbsent(state.partition, k -> new HashMap()).put(state.bucket, writerContainer);
        }
    }

    public Map<BinaryRow, List<Integer>> getActiveBuckets() {
        HashMap<BinaryRow, List<Integer>> result = new HashMap<BinaryRow, List<Integer>>();
        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitions : this.writers.entrySet()) {
            result.put(partitions.getKey(), new ArrayList<Integer>(partitions.getValue().keySet()));
        }
        return result;
    }

    protected WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
        Map<Integer, WriterContainer<T>> buckets = this.writers.get(partition);
        if (buckets == null) {
            buckets = new HashMap<Integer, WriterContainer<T>>();
            this.writers.put(partition.copy(), buckets);
        }
        return buckets.computeIfAbsent(bucket, k -> this.createWriterContainer(partition.copy(), bucket, this.ignorePreviousFiles));
    }

    private long writerNumber() {
        return this.writers.values().stream().mapToLong(e -> e.values().size()).sum();
    }

    @VisibleForTesting
    public WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
        IndexMaintainer<T> indexMaintainer;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating writer for partition {}, bucket {}", (Object)partition, (Object)bucket);
        }
        if (!this.isStreamingMode && this.writerNumber() >= (long)this.writerNumberMax) {
            try {
                this.forceBufferSpill();
            }
            catch (Exception e) {
                throw new RuntimeException("Error happens while force buffer spill", e);
            }
        }
        Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
        ArrayList<DataFileMeta> restoreFiles = new ArrayList();
        if (!ignorePreviousFiles && latestSnapshotId != null) {
            restoreFiles = this.scanExistingFileMetas(latestSnapshotId, partition, bucket);
        }
        IndexMaintainer<T> indexMaintainer2 = this.indexFactory == null ? null : (indexMaintainer = this.indexFactory.createOrRestore(ignorePreviousFiles ? null : latestSnapshotId, partition, bucket));
        DeletionVectorsMaintainer deletionVectorsMaintainer = this.dvMaintainerFactory == null ? null : this.dvMaintainerFactory.createOrRestore(ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
        RecordWriter<T> writer = this.createWriter(latestSnapshotId, partition.copy(), bucket, restoreFiles, DataFileMeta.getMaxSequenceNumber(restoreFiles), null, this.compactExecutor(), deletionVectorsMaintainer);
        writer.withInsertOnly(this.isInsertOnly);
        this.notifyNewWriter(writer);
        return new WriterContainer<T>(writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId);
    }

    @Override
    public void withExecutionMode(boolean isStreamingMode) {
        this.isStreamingMode = isStreamingMode;
    }

    @Override
    public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
        this.compactionMetrics = new CompactionMetrics(metricRegistry, this.tableName);
        return this;
    }

    private List<DataFileMeta> scanExistingFileMetas(long snapshotId, BinaryRow partition, int bucket) {
        ArrayList<DataFileMeta> existingFileMetas = new ArrayList<DataFileMeta>();
        List<ManifestEntry> files = this.scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files();
        for (ManifestEntry entry : files) {
            if (entry.totalBuckets() != this.totalBuckets) {
                String partInfo = this.partitionType.getFieldCount() > 0 ? "partition " + FileStorePathFactory.getPartitionComputer(this.partitionType, CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(), this.legacyPartitionName).generatePartValues(partition) : "table";
                throw new RuntimeException(String.format("Try to write %s with a new bucket num %d, but the previous bucket num is %d. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", partInfo, this.totalBuckets, entry.totalBuckets()));
            }
            existingFileMetas.add(entry.file());
        }
        return existingFileMetas;
    }

    private ExecutorService compactExecutor() {
        if (this.lazyCompactExecutor == null) {
            this.lazyCompactExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory(Thread.currentThread().getName() + "-compaction"));
        }
        return this.lazyCompactExecutor;
    }

    @VisibleForTesting
    public ExecutorService getCompactExecutor() {
        return this.lazyCompactExecutor;
    }

    protected void notifyNewWriter(RecordWriter<T> writer) {
    }

    protected abstract RecordWriter<T> createWriter(@Nullable Long var1, BinaryRow var2, int var3, List<DataFileMeta> var4, long var5, @Nullable CommitIncrement var7, ExecutorService var8, @Nullable DeletionVectorsMaintainer var9);

    protected void forceBufferSpill() throws Exception {
    }

    @VisibleForTesting
    Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
        return this.writers;
    }

    @VisibleForTesting
    public static class WriterContainer<T> {
        public final RecordWriter<T> writer;
        @Nullable
        public final IndexMaintainer<T> indexMaintainer;
        @Nullable
        public final DeletionVectorsMaintainer deletionVectorsMaintainer;
        protected final long baseSnapshotId;
        protected long lastModifiedCommitIdentifier;

        protected WriterContainer(RecordWriter<T> writer, @Nullable IndexMaintainer<T> indexMaintainer, @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer, Long baseSnapshotId) {
            this.writer = writer;
            this.indexMaintainer = indexMaintainer;
            this.deletionVectorsMaintainer = deletionVectorsMaintainer;
            this.baseSnapshotId = baseSnapshotId == null ? 0L : baseSnapshotId;
            this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
        }
    }
}

