/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.postpone;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;

public class RewritePostponeBucketCommittableOperator
extends BoundedOneInputOperator<Committable, Committable> {
    private static final long serialVersionUID = 1L;
    private final FileStoreTable table;
    private transient FileStorePathFactory pathFactory;
    private transient Map<BinaryRow, Map<Integer, BucketFiles>> bucketFiles;

    public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
        this.table = table;
    }

    public void open() throws Exception {
        this.pathFactory = this.table.store().pathFactory();
        this.bucketFiles = new HashMap<BinaryRow, Map<Integer, BucketFiles>>();
    }

    public void processElement(StreamRecord<Committable> element) throws Exception {
        Committable committable = (Committable)element.getValue();
        if (committable.kind() != Committable.Kind.FILE) {
            this.output.collect(element);
        }
        CommitMessageImpl message = (CommitMessageImpl)committable.wrappedCommittable();
        this.bucketFiles.computeIfAbsent(message.partition(), p -> new HashMap()).computeIfAbsent(message.bucket(), b -> new BucketFiles(this.pathFactory.createDataFilePathFactory(message.partition(), message.bucket()), this.table.fileIO())).update(message);
    }

    public void endInput() throws Exception {
        this.emitAll(Long.MAX_VALUE);
    }

    protected void emitAll(long checkpointId) {
        for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry : this.bucketFiles.entrySet()) {
            for (Map.Entry<Integer, BucketFiles> bucketEntry : partitionEntry.getValue().entrySet()) {
                BucketFiles bucketFiles = bucketEntry.getValue();
                CommitMessageImpl message = new CommitMessageImpl(partitionEntry.getKey(), bucketEntry.getKey(), bucketFiles.totalBuckets, DataIncrement.emptyIncrement(), bucketFiles.makeIncrement());
                this.output.collect((Object)new StreamRecord((Object)new Committable(checkpointId, Committable.Kind.FILE, message)));
            }
        }
        this.bucketFiles.clear();
    }

    private static class BucketFiles {
        private final DataFilePathFactory pathFactory;
        private final FileIO fileIO;
        @Nullable
        private Integer totalBuckets;
        private final Map<String, DataFileMeta> newFiles;
        private final List<DataFileMeta> compactBefore;
        private final List<DataFileMeta> compactAfter;
        private final List<DataFileMeta> changelogFiles;

        private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
            this.pathFactory = pathFactory;
            this.fileIO = fileIO;
            this.newFiles = new LinkedHashMap<String, DataFileMeta>();
            this.compactBefore = new ArrayList<DataFileMeta>();
            this.compactAfter = new ArrayList<DataFileMeta>();
            this.changelogFiles = new ArrayList<DataFileMeta>();
        }

        private void update(CommitMessageImpl message) {
            this.totalBuckets = message.totalBuckets();
            for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
                this.newFiles.put(file.fileName(), file);
            }
            HashMap<String, Path> toDelete = new HashMap<String, Path>();
            for (DataFileMeta file : message.compactIncrement().compactBefore()) {
                if (this.newFiles.containsKey(file.fileName())) {
                    toDelete.put(file.fileName(), this.pathFactory.toPath(file));
                    this.newFiles.remove(file.fileName());
                    continue;
                }
                this.compactBefore.add(file);
            }
            for (DataFileMeta file : message.compactIncrement().compactAfter()) {
                this.compactAfter.add(file);
                toDelete.remove(file.fileName());
            }
            this.changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
            this.changelogFiles.addAll(message.compactIncrement().changelogFiles());
            toDelete.forEach((fileName, path) -> this.fileIO.deleteQuietly((Path)path));
        }

        private CompactIncrement makeIncrement() {
            ArrayList<DataFileMeta> realCompactAfter = new ArrayList<DataFileMeta>(this.newFiles.values());
            realCompactAfter.addAll(this.compactAfter);
            return new CompactIncrement(this.compactBefore, realCompactAfter, this.changelogFiles);
        }
    }
}

