/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T>,
Restorable<List<State>> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
    private final String commitUser;
    private final SnapshotManager snapshotManager;
    private final FileStoreScan scan;
    @Nullable
    protected IOManager ioManager;
    protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
    private ExecutorService lazyCompactExecutor;
    private boolean overwrite = false;

    protected AbstractFileStoreWrite(String commitUser, SnapshotManager snapshotManager, FileStoreScan scan) {
        this.commitUser = commitUser;
        this.snapshotManager = snapshotManager;
        this.scan = scan;
        this.writers = new HashMap<BinaryRow, Map<Integer, WriterContainer<T>>>();
    }

    @Override
    public FileStoreWrite<T> withIOManager(IOManager ioManager) {
        this.ioManager = ioManager;
        return this;
    }

    @Override
    public void withOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    @Override
    public void write(BinaryRow partition, int bucket, T data) throws Exception {
        RecordWriter<T> writer = this.getWriterWrapper((BinaryRow)partition, (int)bucket).writer;
        writer.write(data);
    }

    @Override
    public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
        this.getWriterWrapper((BinaryRow)partition, (int)bucket).writer.compact(fullCompaction);
    }

    @Override
    public void notifyNewFiles(long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {
        WriterContainer<T> writerContainer = this.getWriterWrapper(partition, bucket);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Get extra compact files for partition {}, bucket {}. Extra snapshot {}, base snapshot {}.\nFiles: {}", new Object[]{partition, bucket, snapshotId, writerContainer.baseSnapshotId, files});
        }
        if (snapshotId > writerContainer.baseSnapshotId) {
            writerContainer.writer.addNewFiles(files);
        }
    }

    @Override
    public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier) throws Exception {
        long latestCommittedIdentifier = this.writers.values().stream().map(Map::values).flatMap(Collection::stream).mapToLong(w -> w.lastModifiedCommitIdentifier).max().orElse(Long.MIN_VALUE) == Long.MIN_VALUE ? Long.MIN_VALUE : this.snapshotManager.latestSnapshotOfUser(this.commitUser).map(Snapshot::commitIdentifier).orElse(Long.MIN_VALUE);
        ArrayList<CommitMessage> result = new ArrayList<CommitMessage>();
        Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partIter = this.writers.entrySet().iterator();
        while (partIter.hasNext()) {
            Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
            BinaryRow partition = partEntry.getKey();
            Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter = partEntry.getValue().entrySet().iterator();
            while (bucketIter.hasNext()) {
                Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
                int bucket = entry.getKey();
                WriterContainer<T> writerContainer = entry.getValue();
                CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
                CommitMessageImpl committable = new CommitMessageImpl(partition, bucket, increment.newFilesIncrement(), increment.compactIncrement());
                result.add(committable);
                if (committable.isEmpty()) {
                    if (writerContainer.lastModifiedCommitIdentifier > latestCommittedIdentifier) continue;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Closing writer for partition {}, bucket {}. Writer's last modified identifier is {}, while latest committed identifier is {}", new Object[]{partition, bucket, writerContainer.lastModifiedCommitIdentifier, latestCommittedIdentifier});
                    }
                    writerContainer.writer.close();
                    bucketIter.remove();
                    continue;
                }
                writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
            }
            if (!partEntry.getValue().isEmpty()) continue;
            partIter.remove();
        }
        return result;
    }

    @Override
    public void close() throws Exception {
        for (Map<Integer, WriterContainer<T>> bucketWriters : this.writers.values()) {
            for (WriterContainer<T> writerContainer : bucketWriters.values()) {
                writerContainer.writer.close();
            }
        }
        this.writers.clear();
        if (this.lazyCompactExecutor != null) {
            this.lazyCompactExecutor.shutdownNow();
        }
    }

    @Override
    public List<State> checkpoint() {
        ArrayList<State> result = new ArrayList<State>();
        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry : this.writers.entrySet()) {
            BinaryRow partition = partitionEntry.getKey();
            for (Map.Entry<Integer, WriterContainer<T>> bucketEntry : partitionEntry.getValue().entrySet()) {
                CommitIncrement increment;
                int bucket = bucketEntry.getKey();
                WriterContainer<T> writerContainer = bucketEntry.getValue();
                try {
                    increment = writerContainer.writer.prepareCommit(false);
                }
                catch (Exception e) {
                    throw new RuntimeException("Failed to extract state from writer of partition " + partition + " bucket " + bucket, e);
                }
                Collection<DataFileMeta> dataFiles = writerContainer.writer.dataFiles();
                result.add(new State(partition, bucket, writerContainer.baseSnapshotId, writerContainer.lastModifiedCommitIdentifier, dataFiles, increment));
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted state " + ((Object)result).toString());
        }
        return result;
    }

    @Override
    public void restore(List<State> states) {
        for (State state : states) {
            RecordWriter<T> writer = this.createWriter(state.partition, state.bucket, state.dataFiles, state.commitIncrement, this.compactExecutor());
            this.notifyNewWriter(writer);
            WriterContainer<T> writerContainer = new WriterContainer<T>(writer, state.baseSnapshotId);
            writerContainer.lastModifiedCommitIdentifier = state.lastModifiedCommitIdentifier;
            this.writers.computeIfAbsent(state.partition, k -> new HashMap()).put(state.bucket, writerContainer);
        }
    }

    private WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
        Map<Integer, WriterContainer<T>> buckets = this.writers.get(partition);
        if (buckets == null) {
            buckets = new HashMap<Integer, WriterContainer<T>>();
            this.writers.put(partition.copy(), buckets);
        }
        return buckets.computeIfAbsent(bucket, k -> this.createWriterContainer(partition.copy(), bucket, this.overwrite));
    }

    @VisibleForTesting
    public WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket, boolean emptyWriter) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating writer for partition {}, bucket {}", (Object)partition, (Object)bucket);
        }
        Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
        RecordWriter<T> writer = emptyWriter ? this.createWriter(partition.copy(), bucket, Collections.emptyList(), null, this.compactExecutor()) : this.createWriter(partition.copy(), bucket, this.scanExistingFileMetas(latestSnapshotId, partition, bucket), null, this.compactExecutor());
        this.notifyNewWriter(writer);
        return new WriterContainer<T>(writer, latestSnapshotId);
    }

    private List<DataFileMeta> scanExistingFileMetas(Long snapshotId, BinaryRow partition, int bucket) {
        ArrayList<DataFileMeta> existingFileMetas = new ArrayList<DataFileMeta>();
        if (snapshotId != null) {
            this.scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files().stream().map(ManifestEntry::file).forEach(existingFileMetas::add);
        }
        return existingFileMetas;
    }

    private ExecutorService compactExecutor() {
        if (this.lazyCompactExecutor == null) {
            this.lazyCompactExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory(Thread.currentThread().getName() + "-compaction"));
        }
        return this.lazyCompactExecutor;
    }

    protected void notifyNewWriter(RecordWriter<T> writer) {
    }

    protected abstract RecordWriter<T> createWriter(BinaryRow var1, int var2, List<DataFileMeta> var3, @Nullable CommitIncrement var4, ExecutorService var5);

    public static class State {
        protected final BinaryRow partition;
        protected final int bucket;
        protected final long baseSnapshotId;
        protected final long lastModifiedCommitIdentifier;
        protected final List<DataFileMeta> dataFiles;
        protected final CommitIncrement commitIncrement;

        protected State(BinaryRow partition, int bucket, long baseSnapshotId, long lastModifiedCommitIdentifier, Collection<DataFileMeta> dataFiles, CommitIncrement commitIncrement) {
            this.partition = partition;
            this.bucket = bucket;
            this.baseSnapshotId = baseSnapshotId;
            this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
            this.dataFiles = new ArrayList<DataFileMeta>(dataFiles);
            this.commitIncrement = commitIncrement;
        }

        public String toString() {
            return String.format("{%s, %d, %d, %d, %s, %s}", this.partition, this.bucket, this.baseSnapshotId, this.lastModifiedCommitIdentifier, this.dataFiles, this.commitIncrement);
        }
    }

    @VisibleForTesting
    public static class WriterContainer<T> {
        public final RecordWriter<T> writer;
        protected final long baseSnapshotId;
        protected long lastModifiedCommitIdentifier;

        protected WriterContainer(RecordWriter<T> writer, Long baseSnapshotId) {
            this.writer = writer;
            this.baseSnapshotId = baseSnapshotId == null ? 0L : baseSnapshotId;
            this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
        }
    }
}

