/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T>,
Restorable<List<State<T>>> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
    private final String commitUser;
    protected final SnapshotManager snapshotManager;
    private final FileStoreScan scan;
    @Nullable
    private final IndexMaintainer.Factory<T> indexFactory;
    @Nullable
    protected IOManager ioManager;
    protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
    private ExecutorService lazyCompactExecutor;
    private boolean closeCompactExecutorWhenLeaving = true;
    private boolean ignorePreviousFiles = false;
    protected boolean isStreamingMode = false;

    protected AbstractFileStoreWrite(String commitUser, SnapshotManager snapshotManager, FileStoreScan scan, @Nullable IndexMaintainer.Factory<T> indexFactory) {
        this.commitUser = commitUser;
        this.snapshotManager = snapshotManager;
        this.scan = scan;
        this.indexFactory = indexFactory;
        this.writers = new HashMap<BinaryRow, Map<Integer, WriterContainer<T>>>();
    }

    @Override
    public FileStoreWrite<T> withIOManager(IOManager ioManager) {
        this.ioManager = ioManager;
        return this;
    }

    @Override
    public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
        return this;
    }

    @Override
    public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
        this.ignorePreviousFiles = ignorePreviousFiles;
    }

    public void withCompactExecutor(ExecutorService compactExecutor) {
        this.lazyCompactExecutor = compactExecutor;
        this.closeCompactExecutorWhenLeaving = false;
    }

    @Override
    public void write(BinaryRow partition, int bucket, T data) throws Exception {
        WriterContainer<T> container = this.getWriterWrapper(partition, bucket);
        container.writer.write(data);
        if (container.indexMaintainer != null) {
            container.indexMaintainer.notifyNewRecord(data);
        }
    }

    @Override
    public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
        this.getWriterWrapper((BinaryRow)partition, (int)bucket).writer.compact(fullCompaction);
    }

    @Override
    public void notifyNewFiles(long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {
        WriterContainer<T> writerContainer = this.getWriterWrapper(partition, bucket);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Get extra compact files for partition {}, bucket {}. Extra snapshot {}, base snapshot {}.\nFiles: {}", new Object[]{partition, bucket, snapshotId, writerContainer.baseSnapshotId, files});
        }
        if (snapshotId > writerContainer.baseSnapshotId) {
            writerContainer.writer.addNewFiles(files);
        }
    }

    @Override
    public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier) throws Exception {
        long latestCommittedIdentifier = this.writers.values().stream().map(Map::values).flatMap(Collection::stream).mapToLong(w -> w.lastModifiedCommitIdentifier).max().orElse(Long.MIN_VALUE) == Long.MIN_VALUE ? Long.MIN_VALUE : this.snapshotManager.latestSnapshotOfUser(this.commitUser).map(Snapshot::commitIdentifier).orElse(Long.MIN_VALUE);
        ArrayList<CommitMessage> result = new ArrayList<CommitMessage>();
        Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partIter = this.writers.entrySet().iterator();
        while (partIter.hasNext()) {
            Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
            BinaryRow partition = partEntry.getKey();
            Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter = partEntry.getValue().entrySet().iterator();
            while (bucketIter.hasNext()) {
                Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
                int bucket = entry.getKey();
                WriterContainer<T> writerContainer = entry.getValue();
                CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
                ArrayList<IndexFileMeta> newIndexFiles = new ArrayList();
                if (writerContainer.indexMaintainer != null) {
                    newIndexFiles = writerContainer.indexMaintainer.prepareCommit();
                }
                CommitMessageImpl committable = new CommitMessageImpl(partition, bucket, increment.newFilesIncrement(), increment.compactIncrement(), new IndexIncrement(newIndexFiles));
                result.add(committable);
                if (committable.isEmpty()) {
                    if (writerContainer.lastModifiedCommitIdentifier > latestCommittedIdentifier) continue;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Closing writer for partition {}, bucket {}. Writer's last modified identifier is {}, while latest committed identifier is {}, current commit identifier is {}.", new Object[]{partition, bucket, writerContainer.lastModifiedCommitIdentifier, latestCommittedIdentifier, commitIdentifier});
                    }
                    writerContainer.writer.close();
                    bucketIter.remove();
                    continue;
                }
                writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
            }
            if (!partEntry.getValue().isEmpty()) continue;
            partIter.remove();
        }
        return result;
    }

    @Override
    public void close() throws Exception {
        for (Map<Integer, WriterContainer<T>> bucketWriters : this.writers.values()) {
            for (WriterContainer<T> writerContainer : bucketWriters.values()) {
                writerContainer.writer.close();
            }
        }
        this.writers.clear();
        if (this.lazyCompactExecutor != null && this.closeCompactExecutorWhenLeaving) {
            this.lazyCompactExecutor.shutdownNow();
        }
    }

    @Override
    public List<State<T>> checkpoint() {
        ArrayList<State<T>> result = new ArrayList<State<T>>();
        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry : this.writers.entrySet()) {
            BinaryRow partition = partitionEntry.getKey();
            for (Map.Entry<Integer, WriterContainer<T>> bucketEntry : partitionEntry.getValue().entrySet()) {
                CommitIncrement increment;
                int bucket = bucketEntry.getKey();
                WriterContainer<T> writerContainer = bucketEntry.getValue();
                try {
                    increment = writerContainer.writer.prepareCommit(false);
                }
                catch (Exception e) {
                    throw new RuntimeException("Failed to extract state from writer of partition " + partition + " bucket " + bucket, e);
                }
                Collection<DataFileMeta> dataFiles = writerContainer.writer.dataFiles();
                result.add(new State(partition, bucket, writerContainer.baseSnapshotId, writerContainer.lastModifiedCommitIdentifier, dataFiles, writerContainer.indexMaintainer, increment));
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted state " + result);
        }
        return result;
    }

    @Override
    public void restore(List<State<T>> states) {
        for (State<T> state : states) {
            RecordWriter<T> writer = this.createWriter(state.partition, state.bucket, state.dataFiles, state.commitIncrement, this.compactExecutor());
            this.notifyNewWriter(writer);
            WriterContainer<T> writerContainer = new WriterContainer<T>(writer, state.indexMaintainer, state.baseSnapshotId);
            writerContainer.lastModifiedCommitIdentifier = state.lastModifiedCommitIdentifier;
            this.writers.computeIfAbsent(state.partition, k -> new HashMap()).put(state.bucket, writerContainer);
        }
    }

    private WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
        Map<Integer, WriterContainer<T>> buckets = this.writers.get(partition);
        if (buckets == null) {
            buckets = new HashMap<Integer, WriterContainer<T>>();
            this.writers.put(partition.copy(), buckets);
        }
        return buckets.computeIfAbsent(bucket, k -> this.createWriterContainer(partition.copy(), bucket, this.ignorePreviousFiles));
    }

    @VisibleForTesting
    public WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating writer for partition {}, bucket {}", (Object)partition, (Object)bucket);
        }
        Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
        ArrayList<DataFileMeta> restoreFiles = new ArrayList();
        if (!ignorePreviousFiles && latestSnapshotId != null) {
            restoreFiles = this.scanExistingFileMetas(latestSnapshotId, partition, bucket);
        }
        IndexMaintainer<T> indexMaintainer = this.indexFactory == null ? null : this.indexFactory.createOrRestore(latestSnapshotId, partition, bucket);
        RecordWriter<T> writer = this.createWriter(partition.copy(), bucket, restoreFiles, null, this.compactExecutor());
        this.notifyNewWriter(writer);
        return new WriterContainer<T>(writer, indexMaintainer, latestSnapshotId);
    }

    @Override
    public void isStreamingMode(boolean isStreamingMode) {
        this.isStreamingMode = isStreamingMode;
    }

    private List<DataFileMeta> scanExistingFileMetas(long snapshotId, BinaryRow partition, int bucket) {
        ArrayList<DataFileMeta> existingFileMetas = new ArrayList<DataFileMeta>();
        this.scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files().stream().map(ManifestEntry::file).forEach(existingFileMetas::add);
        return existingFileMetas;
    }

    private ExecutorService compactExecutor() {
        if (this.lazyCompactExecutor == null) {
            this.lazyCompactExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory(Thread.currentThread().getName() + "-compaction"));
        }
        return this.lazyCompactExecutor;
    }

    @VisibleForTesting
    public ExecutorService getCompactExecutor() {
        return this.lazyCompactExecutor;
    }

    protected void notifyNewWriter(RecordWriter<T> writer) {
    }

    protected abstract RecordWriter<T> createWriter(BinaryRow var1, int var2, List<DataFileMeta> var3, @Nullable CommitIncrement var4, ExecutorService var5);

    public static class State<T> {
        protected final BinaryRow partition;
        protected final int bucket;
        protected final long baseSnapshotId;
        protected final long lastModifiedCommitIdentifier;
        protected final List<DataFileMeta> dataFiles;
        @Nullable
        protected final IndexMaintainer<T> indexMaintainer;
        protected final CommitIncrement commitIncrement;

        protected State(BinaryRow partition, int bucket, long baseSnapshotId, long lastModifiedCommitIdentifier, Collection<DataFileMeta> dataFiles, @Nullable IndexMaintainer<T> indexMaintainer, CommitIncrement commitIncrement) {
            this.partition = partition;
            this.bucket = bucket;
            this.baseSnapshotId = baseSnapshotId;
            this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
            this.dataFiles = new ArrayList<DataFileMeta>(dataFiles);
            this.indexMaintainer = indexMaintainer;
            this.commitIncrement = commitIncrement;
        }

        public String toString() {
            return String.format("{%s, %d, %d, %d, %s, %s, %s}", this.partition, this.bucket, this.baseSnapshotId, this.lastModifiedCommitIdentifier, this.dataFiles, this.indexMaintainer, this.commitIncrement);
        }
    }

    @VisibleForTesting
    public static class WriterContainer<T> {
        public final RecordWriter<T> writer;
        @Nullable
        public final IndexMaintainer<T> indexMaintainer;
        protected final long baseSnapshotId;
        protected long lastModifiedCommitIdentifier;

        protected WriterContainer(RecordWriter<T> writer, @Nullable IndexMaintainer<T> indexMaintainer, Long baseSnapshotId) {
            this.writer = writer;
            this.indexMaintainer = indexMaintainer;
            this.baseSnapshotId = baseSnapshotId == null ? 0L : baseSnapshotId;
            this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
        }
    }
}

