/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.types.Either;
import org.apache.paimon.append.AppendCompactCoordinator;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ThreadUtils;

public class AppendBypassCoordinateOperator<CommitT>
extends AbstractStreamOperator<Either<CommitT, AppendCompactTask>>
implements OneInputStreamOperator<CommitT, Either<CommitT, AppendCompactTask>>,
ProcessingTimeService.ProcessingTimeCallback {
    private static final long MAX_PENDING_TASKS = 5000L;
    private final FileStoreTable table;
    private transient ScheduledExecutorService executorService;
    private transient LinkedBlockingQueue<AppendCompactTask> compactTasks;

    public AppendBypassCoordinateOperator(StreamOperatorParameters<Either<CommitT, AppendCompactTask>> parameters, FileStoreTable table, ProcessingTimeService processingTimeService) {
        this.table = table;
        this.processingTimeService = processingTimeService;
        this.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
    }

    public void open() throws Exception {
        super.open();
        Preconditions.checkArgument(RuntimeContextUtils.getNumberOfParallelSubtasks((RuntimeContext)this.getRuntimeContext()) == 1, "Compaction Coordinator parallelism in paimon MUST be one.");
        long intervalMs = this.table.coreOptions().continuousDiscoveryInterval().toMillis();
        this.compactTasks = new LinkedBlockingQueue();
        AppendCompactCoordinator coordinator = new AppendCompactCoordinator(this.table, true, null);
        this.executorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.newDaemonThreadFactory("Compaction Coordinator"));
        this.executorService.scheduleWithFixedDelay(() -> this.asyncPlan(coordinator), 0L, intervalMs, TimeUnit.MILLISECONDS);
        this.getProcessingTimeService().scheduleWithFixedDelay((ProcessingTimeService.ProcessingTimeCallback)this, 0L, intervalMs);
    }

    private void asyncPlan(AppendCompactCoordinator coordinator) {
        while ((long)this.compactTasks.size() < 5000L) {
            List<AppendCompactTask> tasks = coordinator.run();
            this.compactTasks.addAll(tasks);
            if (!tasks.isEmpty()) continue;
            break;
        }
    }

    public void onProcessingTime(long time) {
        AppendCompactTask task;
        while ((task = this.compactTasks.poll()) != null) {
            this.output.collect((Object)new StreamRecord((Object)Either.Right((Object)task)));
        }
        return;
    }

    public void processElement(StreamRecord<CommitT> record) throws Exception {
        this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)record.getValue())));
    }

    public void close() throws Exception {
        ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, this.executorService);
        super.close();
    }
}

