/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact.changelog;

import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactTask;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ThreadPoolUtils;

public class ChangelogCompactWorkerOperator
extends AbstractStreamOperator<Committable>
implements OneInputStreamOperator<Either<Committable, ChangelogCompactTask>, Committable> {
    private final FileStoreTable table;
    private transient ExecutorService executor;
    private transient MemorySize bufferSize;

    public ChangelogCompactWorkerOperator(FileStoreTable table) {
        this.table = table;
    }

    public void open() throws Exception {
        Options options = new Options(this.table.options());
        int numThreads = options.getOptional(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM).orElse(Runtime.getRuntime().availableProcessors());
        this.executor = ThreadPoolUtils.createCachedThreadPool(numThreads, "changelog-compact-async-read-bytes");
        this.bufferSize = options.get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE);
        LOG.info("Creating {} threads and a buffer of {} bytes for changelog compaction.", (Object)numThreads, (Object)this.bufferSize.getBytes());
    }

    public void processElement(StreamRecord<Either<Committable, ChangelogCompactTask>> record) throws Exception {
        if (((Either)record.getValue()).isLeft()) {
            this.output.collect((Object)new StreamRecord(((Either)record.getValue()).left()));
        } else {
            ChangelogCompactTask task = (ChangelogCompactTask)((Either)record.getValue()).right();
            List<Committable> committables = task.doCompact(this.table, this.executor, this.bufferSize);
            committables.forEach(committable -> this.output.collect((Object)new StreamRecord(committable)));
        }
    }
}

