/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.postpone;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;

public class PostponeBucketCommittableRewriter {
    private final FileStoreTable table;
    private final FileStorePathFactory pathFactory;
    private final Map<BinaryRow, Map<Integer, BucketFiles>> buckets;

    public PostponeBucketCommittableRewriter(FileStoreTable table) {
        this.table = table;
        this.pathFactory = table.store().pathFactory();
        this.buckets = new HashMap<BinaryRow, Map<Integer, BucketFiles>>();
    }

    public void add(CommitMessageImpl message) {
        this.buckets.computeIfAbsent(message.partition(), p -> new HashMap()).computeIfAbsent(message.bucket(), b -> new BucketFiles(this.pathFactory.createDataFilePathFactory(message.partition(), message.bucket()), this.table.fileIO())).update(message);
    }

    public List<Committable> emitAll(long checkpointId) {
        ArrayList<Committable> result = new ArrayList<Committable>();
        for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry : this.buckets.entrySet()) {
            for (Map.Entry<Integer, BucketFiles> bucketEntry : partitionEntry.getValue().entrySet()) {
                BucketFiles bucketFiles = bucketEntry.getValue();
                Committable committable = new Committable(checkpointId, Committable.Kind.FILE, bucketFiles.makeMessage(partitionEntry.getKey(), bucketEntry.getKey()));
                result.add(committable);
            }
        }
        this.buckets.clear();
        return result;
    }

    private static class BucketFiles {
        private final DataFilePathFactory pathFactory;
        private final FileIO fileIO;
        @Nullable
        private Integer totalBuckets;
        private final Map<String, DataFileMeta> newFiles;
        private final List<DataFileMeta> compactBefore;
        private final List<DataFileMeta> compactAfter;
        private final List<DataFileMeta> changelogFiles;
        private final List<IndexFileMeta> newIndexFiles;
        private final List<IndexFileMeta> deletedIndexFiles;

        private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
            this.pathFactory = pathFactory;
            this.fileIO = fileIO;
            this.newFiles = new LinkedHashMap<String, DataFileMeta>();
            this.compactBefore = new ArrayList<DataFileMeta>();
            this.compactAfter = new ArrayList<DataFileMeta>();
            this.changelogFiles = new ArrayList<DataFileMeta>();
            this.newIndexFiles = new ArrayList<IndexFileMeta>();
            this.deletedIndexFiles = new ArrayList<IndexFileMeta>();
        }

        private void update(CommitMessageImpl message) {
            this.totalBuckets = message.totalBuckets();
            for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
                this.newFiles.put(file.fileName(), file);
            }
            HashMap<String, Path> toDelete = new HashMap<String, Path>();
            for (DataFileMeta file : message.compactIncrement().compactBefore()) {
                if (this.newFiles.containsKey(file.fileName())) {
                    toDelete.put(file.fileName(), this.pathFactory.toPath(file));
                    this.newFiles.remove(file.fileName());
                    continue;
                }
                this.compactBefore.add(file);
            }
            for (DataFileMeta file : message.compactIncrement().compactAfter()) {
                this.compactAfter.add(file);
                toDelete.remove(file.fileName());
            }
            this.changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
            this.changelogFiles.addAll(message.compactIncrement().changelogFiles());
            this.newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
            this.deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
            toDelete.forEach((fileName, path) -> this.fileIO.deleteQuietly((Path)path));
        }

        private CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
            ArrayList<DataFileMeta> realCompactAfter = new ArrayList<DataFileMeta>(this.newFiles.values());
            realCompactAfter.addAll(this.compactAfter);
            return new CommitMessageImpl(partition, bucket, this.totalBuckets, DataIncrement.emptyIncrement(), new CompactIncrement(this.compactBefore, realCompactAfter, this.changelogFiles, this.newIndexFiles, this.deletedIndexFiles));
        }
    }
}

