/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact.changelog;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactTask;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.sink.CommitMessageImpl;

public class ChangelogCompactCoordinateOperator
extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>>
implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>,
BoundedOneInput {
    private final CoreOptions options;
    private transient long checkpointId;
    private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;

    public ChangelogCompactCoordinateOperator(CoreOptions options) {
        this.options = options;
    }

    public void open() throws Exception {
        super.open();
        this.checkpointId = Long.MIN_VALUE;
        this.partitionChangelogs = new LinkedHashMap<BinaryRow, PartitionChangelog>();
    }

    public void processElement(StreamRecord<Committable> record) {
        PartitionChangelog partitionChangelog;
        Committable committable = (Committable)record.getValue();
        this.checkpointId = Math.max(this.checkpointId, committable.checkpointId());
        if (committable.kind() != Committable.Kind.FILE) {
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)record.getValue())));
            return;
        }
        CommitMessageImpl message = (CommitMessageImpl)committable.wrappedCommittable();
        if (message.newFilesIncrement().changelogFiles().isEmpty() && message.compactIncrement().changelogFiles().isEmpty()) {
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)record.getValue())));
            return;
        }
        long targetFileSize = this.options.targetFileSize(false);
        long compactionFileSize = Math.min(this.options.compactionFileSize(false), this.options.toConfiguration().get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE).getBytes());
        BinaryRow partition = message.partition();
        Integer bucket = message.bucket();
        ArrayList<DataFileMeta> skippedNewChangelogs = new ArrayList<DataFileMeta>();
        ArrayList<DataFileMeta> skippedCompactChangelogs = new ArrayList<DataFileMeta>();
        for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
            if (meta.fileSize() >= compactionFileSize) {
                skippedNewChangelogs.add(meta);
                continue;
            }
            this.partitionChangelogs.computeIfAbsent(partition, k -> new PartitionChangelog()).addNewChangelogFile(bucket, meta);
            partitionChangelog = this.partitionChangelogs.get(partition);
            if (partitionChangelog.totalFileSize < targetFileSize) continue;
            this.emitPartitionChangelogCompactTask(partition);
        }
        for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
            if (meta.fileSize() >= compactionFileSize) {
                skippedCompactChangelogs.add(meta);
                continue;
            }
            this.partitionChangelogs.computeIfAbsent(partition, k -> new PartitionChangelog()).addCompactChangelogFile(bucket, meta);
            partitionChangelog = this.partitionChangelogs.get(partition);
            if (partitionChangelog.totalFileSize < targetFileSize) continue;
            this.emitPartitionChangelogCompactTask(partition);
        }
        CommitMessageImpl newMessage = new CommitMessageImpl(message.partition(), message.bucket(), message.totalBuckets(), new DataIncrement(message.newFilesIncrement().newFiles(), message.newFilesIncrement().deletedFiles(), skippedNewChangelogs), new CompactIncrement(message.compactIncrement().compactBefore(), message.compactIncrement().compactAfter(), skippedCompactChangelogs), message.indexIncrement());
        Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
        this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)newCommittable)));
    }

    public void prepareSnapshotPreBarrier(long checkpointId) {
        this.emitAllPartitionsChangelogCompactTask();
    }

    public void endInput() {
        this.emitAllPartitionsChangelogCompactTask();
    }

    private void emitPartitionChangelogCompactTask(BinaryRow partition) {
        int numCompactChangelogFiles;
        PartitionChangelog partitionChangelog = this.partitionChangelogs.get(partition);
        int numNewChangelogFiles = partitionChangelog.newFileChangelogFiles.values().stream().mapToInt(List::size).sum();
        if (numNewChangelogFiles + (numCompactChangelogFiles = partitionChangelog.compactChangelogFiles.values().stream().mapToInt(List::size).sum()) == 1) {
            CommitMessageImpl message;
            Map.Entry entry;
            if (numNewChangelogFiles == 1) {
                entry = partitionChangelog.newFileChangelogFiles.entrySet().iterator().next();
                message = new CommitMessageImpl(partition, (Integer)entry.getKey(), this.options.bucket(), new DataIncrement(Collections.emptyList(), Collections.emptyList(), (List)entry.getValue()), CompactIncrement.emptyIncrement());
            } else {
                entry = partitionChangelog.compactChangelogFiles.entrySet().iterator().next();
                message = new CommitMessageImpl(partition, (Integer)entry.getKey(), this.options.bucket(), DataIncrement.emptyIncrement(), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), (List)entry.getValue()));
            }
            Committable newCommittable = new Committable(this.checkpointId, Committable.Kind.FILE, message);
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)newCommittable)));
        } else {
            this.output.collect((Object)new StreamRecord((Object)Either.Right((Object)new ChangelogCompactTask(this.checkpointId, partition, this.options.bucket(), partitionChangelog.newFileChangelogFiles, partitionChangelog.compactChangelogFiles))));
        }
        this.partitionChangelogs.remove(partition);
    }

    private void emitAllPartitionsChangelogCompactTask() {
        ArrayList<BinaryRow> partitions = new ArrayList<BinaryRow>(this.partitionChangelogs.keySet());
        for (BinaryRow partition : partitions) {
            this.emitPartitionChangelogCompactTask(partition);
        }
    }

    private static class PartitionChangelog {
        private long totalFileSize = 0L;
        private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles = new HashMap<Integer, List<DataFileMeta>>();
        private final Map<Integer, List<DataFileMeta>> compactChangelogFiles = new HashMap<Integer, List<DataFileMeta>>();

        public void addNewChangelogFile(Integer bucket, DataFileMeta file) {
            this.totalFileSize += file.fileSize();
            this.newFileChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList()).add(file);
        }

        public void addCompactChangelogFile(Integer bucket, DataFileMeta file) {
            this.totalFileSize += file.fileSize();
            this.compactChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList()).add(file);
        }
    }
}

