/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact.changelog;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactTask;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;

public class ChangelogCompactCoordinateOperator
extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>>
implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>,
BoundedOneInput {
    private final FileStoreTable table;
    private transient long checkpointId;
    private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;

    public ChangelogCompactCoordinateOperator(FileStoreTable table) {
        this.table = table;
    }

    public void open() throws Exception {
        super.open();
        this.checkpointId = Long.MIN_VALUE;
        this.partitionChangelogs = new HashMap<BinaryRow, PartitionChangelog>();
    }

    public void processElement(StreamRecord<Committable> record) {
        PartitionChangelog partitionChangelog;
        Committable committable = (Committable)record.getValue();
        this.checkpointId = Math.max(this.checkpointId, committable.checkpointId());
        if (committable.kind() != Committable.Kind.FILE) {
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)record.getValue())));
            return;
        }
        CommitMessageImpl message = (CommitMessageImpl)committable.wrappedCommittable();
        if (message.newFilesIncrement().changelogFiles().isEmpty() && message.compactIncrement().changelogFiles().isEmpty()) {
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)record.getValue())));
            return;
        }
        BinaryRow partition = message.partition();
        Integer bucket = message.bucket();
        long targetFileSize = this.table.coreOptions().targetFileSize(false);
        for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
            this.partitionChangelogs.computeIfAbsent(partition, k -> new PartitionChangelog()).addNewChangelogFile(bucket, meta);
            partitionChangelog = this.partitionChangelogs.get(partition);
            if (partitionChangelog.totalFileSize < targetFileSize) continue;
            this.emitPartitionChanglogCompactTask(partition);
        }
        for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
            this.partitionChangelogs.computeIfAbsent(partition, k -> new PartitionChangelog()).addCompactChangelogFile(bucket, meta);
            partitionChangelog = this.partitionChangelogs.get(partition);
            if (partitionChangelog.totalFileSize < targetFileSize) continue;
            this.emitPartitionChanglogCompactTask(partition);
        }
        CommitMessageImpl newMessage = new CommitMessageImpl(message.partition(), message.bucket(), new DataIncrement(message.newFilesIncrement().newFiles(), message.newFilesIncrement().deletedFiles(), Collections.emptyList()), new CompactIncrement(message.compactIncrement().compactBefore(), message.compactIncrement().compactAfter(), Collections.emptyList()), message.indexIncrement());
        Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
        this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)newCommittable)));
    }

    public void prepareSnapshotPreBarrier(long checkpointId) {
        this.emitAllPartitionsChanglogCompactTask();
    }

    public void endInput() {
        this.emitAllPartitionsChanglogCompactTask();
    }

    private void emitPartitionChanglogCompactTask(BinaryRow partition) {
        PartitionChangelog partitionChangelog = this.partitionChangelogs.get(partition);
        this.output.collect((Object)new StreamRecord((Object)Either.Right((Object)new ChangelogCompactTask(this.checkpointId, partition, partitionChangelog.newFileChangelogFiles, partitionChangelog.compactChangelogFiles))));
        this.partitionChangelogs.remove(partition);
    }

    private void emitAllPartitionsChanglogCompactTask() {
        ArrayList<BinaryRow> partitions = new ArrayList<BinaryRow>(this.partitionChangelogs.keySet());
        for (BinaryRow partition : partitions) {
            this.emitPartitionChanglogCompactTask(partition);
        }
    }

    private static class PartitionChangelog {
        private long totalFileSize = 0L;
        private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles = new HashMap<Integer, List<DataFileMeta>>();
        private final Map<Integer, List<DataFileMeta>> compactChangelogFiles = new HashMap<Integer, List<DataFileMeta>>();

        public void addNewChangelogFile(Integer bucket, DataFileMeta file) {
            this.totalFileSize += file.fileSize();
            this.newFileChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList()).add(file);
        }

        public void addCompactChangelogFile(Integer bucket, DataFileMeta file) {
            this.totalFileSize += file.fileSize();
            this.compactChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList()).add(file);
        }
    }
}

