/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareBucketNewFilesCompactionCoordinator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;

public class UnawareBucketNewFilesCompactionCoordinatorOperator
extends AbstractStreamOperator<Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>
implements OneInputStreamOperator<Committable, Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>,
BoundedOneInput {
    private final long targetFileSize;
    private final long compactionFileSize;
    private final int totalBuckets;
    private transient UnawareBucketNewFilesCompactionCoordinator coordinator;
    private transient long checkpointId;

    public UnawareBucketNewFilesCompactionCoordinatorOperator(CoreOptions options) {
        this.targetFileSize = options.targetFileSize(false);
        this.compactionFileSize = options.compactionFileSize(false);
        this.totalBuckets = options.bucket();
    }

    public void open() throws Exception {
        super.open();
        this.coordinator = new UnawareBucketNewFilesCompactionCoordinator(this.targetFileSize);
        this.checkpointId = Long.MIN_VALUE;
    }

    public void processElement(StreamRecord<Committable> record) throws Exception {
        Committable committable = (Committable)record.getValue();
        this.checkpointId = Math.max(this.checkpointId, committable.checkpointId());
        if (committable.kind() != Committable.Kind.FILE) {
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)committable)));
            return;
        }
        CommitMessageImpl message = (CommitMessageImpl)committable.wrappedCommittable();
        if (message.newFilesIncrement().newFiles().isEmpty()) {
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)committable)));
            return;
        }
        BinaryRow partition = message.partition();
        ArrayList<DataFileMeta> skippedFiles = new ArrayList<DataFileMeta>();
        for (DataFileMeta meta : message.newFilesIncrement().newFiles()) {
            if (meta.fileSize() >= this.compactionFileSize) {
                skippedFiles.add(meta);
                continue;
            }
            Optional<Pair<BinaryRow, List<DataFileMeta>>> optionalPair = this.coordinator.addFile(partition, meta);
            if (!optionalPair.isPresent()) continue;
            Pair<BinaryRow, List<DataFileMeta>> p = optionalPair.get();
            Preconditions.checkArgument(!p.getValue().isEmpty());
            if (p.getValue().size() > 1) {
                this.output.collect((Object)new StreamRecord((Object)Either.Right((Object)Tuple2.of((Object)this.checkpointId, (Object)new UnawareAppendCompactionTask(p.getKey(), p.getValue())))));
                continue;
            }
            skippedFiles.add(p.getValue().get(0));
        }
        CommitMessageImpl newMessage = new CommitMessageImpl(message.partition(), message.bucket(), message.totalBuckets(), new DataIncrement(skippedFiles, message.newFilesIncrement().deletedFiles(), message.newFilesIncrement().changelogFiles()), message.compactIncrement(), message.indexIncrement());
        if (!newMessage.isEmpty()) {
            Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)newCommittable)));
        }
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.emitAll();
    }

    public void endInput() throws Exception {
        this.emitAll();
    }

    private void emitAll() {
        for (Pair<BinaryRow, List<DataFileMeta>> p : this.coordinator.emitAll()) {
            Preconditions.checkArgument(!p.getValue().isEmpty());
            if (p.getValue().size() > 1) {
                this.output.collect((Object)new StreamRecord((Object)Either.Right((Object)Tuple2.of((Object)this.checkpointId, (Object)new UnawareAppendCompactionTask(p.getKey(), p.getValue())))));
                continue;
            }
            CommitMessageImpl message = new CommitMessageImpl(p.getKey(), 0, this.totalBuckets, new DataIncrement(Collections.singletonList(p.getValue().get(0)), Collections.emptyList(), Collections.emptyList()), CompactIncrement.emptyIncrement());
            this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)new Committable(this.checkpointId, Committable.Kind.FILE, message))));
        }
    }
}

