/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact.changelog;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.sink.CommitMessageImpl;

public class ChangelogCompactSortOperator
extends AbstractStreamOperator<Committable>
implements OneInputStreamOperator<Committable, Committable>,
BoundedOneInput {
    private transient Map<BinaryRow, Map<Integer, List<DataFileMeta>>> newFileChangelogFiles;
    private transient Map<BinaryRow, Map<Integer, List<DataFileMeta>>> compactChangelogFiles;
    private transient Map<BinaryRow, Integer> numBuckets;

    public void open() {
        this.newFileChangelogFiles = new LinkedHashMap<BinaryRow, Map<Integer, List<DataFileMeta>>>();
        this.compactChangelogFiles = new LinkedHashMap<BinaryRow, Map<Integer, List<DataFileMeta>>>();
        this.numBuckets = new LinkedHashMap<BinaryRow, Integer>();
    }

    public void processElement(StreamRecord<Committable> record) throws Exception {
        Committable committable = (Committable)record.getValue();
        if (committable.kind() != Committable.Kind.FILE) {
            this.output.collect(record);
            return;
        }
        CommitMessageImpl message = (CommitMessageImpl)committable.wrappedCommittable();
        if (message.newFilesIncrement().changelogFiles().isEmpty() && message.compactIncrement().changelogFiles().isEmpty()) {
            this.output.collect(record);
            return;
        }
        this.numBuckets.put(message.partition(), message.totalBuckets());
        BiConsumer<DataFileMeta, Map> addChangelog = (meta, changelogFiles) -> changelogFiles.computeIfAbsent(message.partition(), p -> new TreeMap()).computeIfAbsent(message.bucket(), b -> new ArrayList()).add(meta);
        for (DataFileMeta meta2 : message.newFilesIncrement().changelogFiles()) {
            addChangelog.accept(meta2, this.newFileChangelogFiles);
        }
        for (DataFileMeta meta2 : message.compactIncrement().changelogFiles()) {
            addChangelog.accept(meta2, this.compactChangelogFiles);
        }
        CommitMessageImpl newMessage = new CommitMessageImpl(message.partition(), message.bucket(), message.totalBuckets(), new DataIncrement(message.newFilesIncrement().newFiles(), message.newFilesIncrement().deletedFiles(), Collections.emptyList(), message.newFilesIncrement().newIndexFiles(), message.newFilesIncrement().deletedIndexFiles()), new CompactIncrement(message.compactIncrement().compactBefore(), message.compactIncrement().compactAfter(), Collections.emptyList(), message.compactIncrement().newIndexFiles(), message.compactIncrement().deletedIndexFiles()));
        if (!newMessage.isEmpty()) {
            Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
            this.output.collect((Object)new StreamRecord((Object)newCommittable));
        }
    }

    public void prepareSnapshotPreBarrier(long checkpointId) {
        this.emitAll(checkpointId);
    }

    public void endInput() {
        this.emitAll(Long.MAX_VALUE);
    }

    private void emitAll(long checkpointId) {
        LinkedHashMap<BinaryRow, Set<Integer>> activeBuckets = new LinkedHashMap<BinaryRow, Set<Integer>>();
        this.collectActiveBuckets(this.newFileChangelogFiles, activeBuckets);
        this.collectActiveBuckets(this.compactChangelogFiles, activeBuckets);
        for (Map.Entry entry : activeBuckets.entrySet()) {
            BinaryRow partition = (BinaryRow)entry.getKey();
            Iterator iterator = ((Set)entry.getValue()).iterator();
            while (iterator.hasNext()) {
                int bucket = (Integer)iterator.next();
                CommitMessageImpl newMessage = new CommitMessageImpl(partition, bucket, this.numBuckets.get(partition), new DataIncrement(Collections.emptyList(), Collections.emptyList(), this.sortedChangelogs(this.newFileChangelogFiles, partition, bucket)), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), this.sortedChangelogs(this.compactChangelogFiles, partition, bucket)));
                Committable newCommittable = new Committable(checkpointId, Committable.Kind.FILE, newMessage);
                this.output.collect((Object)new StreamRecord((Object)newCommittable));
            }
        }
        this.newFileChangelogFiles.clear();
        this.compactChangelogFiles.clear();
        this.numBuckets.clear();
    }

    private void collectActiveBuckets(Map<BinaryRow, Map<Integer, List<DataFileMeta>>> from, Map<BinaryRow, Set<Integer>> activeBuckets) {
        for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry : from.entrySet()) {
            activeBuckets.computeIfAbsent(entry.getKey(), k -> new TreeSet()).addAll(entry.getValue().keySet());
        }
    }

    private List<DataFileMeta> sortedChangelogs(Map<BinaryRow, Map<Integer, List<DataFileMeta>>> from, BinaryRow partition, int bucket) {
        ArrayList<DataFileMeta> result = new ArrayList<DataFileMeta>();
        if (from.containsKey(partition) && from.get(partition).containsKey(bucket)) {
            result.addAll((Collection<DataFileMeta>)from.get(partition).get(bucket));
        }
        result.sort(Comparator.comparingLong(DataFileMeta::creationTimeEpochMillis));
        return result;
    }
}

