/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.cluster;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RewriteIncrementalClusterCommittableOperator
extends BoundedOneInputOperator<Committable, Committable> {
    protected static final Logger LOG = LoggerFactory.getLogger(RewriteIncrementalClusterCommittableOperator.class);
    private static final long serialVersionUID = 1L;
    private final FileStoreTable table;
    private final Map<BinaryRow, Integer> outputLevels;
    private transient Map<BinaryRow, List<DataFileMeta>> partitionFiles;

    public RewriteIncrementalClusterCommittableOperator(FileStoreTable table, Map<BinaryRow, Integer> outputLevels) {
        this.table = table;
        this.outputLevels = outputLevels;
    }

    public void open() throws Exception {
        this.partitionFiles = new HashMap<BinaryRow, List<DataFileMeta>>();
    }

    public void processElement(StreamRecord<Committable> element) throws Exception {
        CommitMessageImpl message;
        Committable committable = (Committable)element.getValue();
        if (committable.kind() != Committable.Kind.FILE) {
            this.output.collect(element);
        }
        Preconditions.checkArgument((message = (CommitMessageImpl)committable.wrappedCommittable()).bucket() == 0);
        BinaryRow partition = message.partition();
        this.partitionFiles.computeIfAbsent(partition, file -> new ArrayList()).addAll(message.newFilesIncrement().newFiles());
    }

    public void endInput() throws Exception {
        this.emitAll(Long.MAX_VALUE);
    }

    protected void emitAll(long checkpointId) {
        for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionEntry : this.partitionFiles.entrySet()) {
            BinaryRow partition = partitionEntry.getKey();
            List<DataFileMeta> clusterAfter = IncrementalClusterManager.upgrade(partitionEntry.getValue(), this.outputLevels.get(partition));
            LOG.info("Partition {}: upgrade file level to {}", (Object)partition, (Object)this.outputLevels.get(partition));
            CompactIncrement compactIncrement = new CompactIncrement(Collections.emptyList(), clusterAfter, Collections.emptyList());
            CommitMessageImpl clusterMessage = new CommitMessageImpl(partition, 0, this.table.coreOptions().bucket(), DataIncrement.emptyIncrement(), compactIncrement);
            this.output.collect((Object)new StreamRecord((Object)new Committable(checkpointId, Committable.Kind.FILE, clusterMessage)));
        }
        this.partitionFiles.clear();
    }
}

