/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.memory.FlinkMemorySegmentPool;
import org.apache.paimon.flink.memory.MemorySegmentAllocator;
import org.apache.paimon.flink.utils.ManagedMemoryUtils;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.Options;

public abstract class PrepareCommitOperator<IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    @Nullable
    protected transient MemorySegmentPool memoryPool;
    @Nullable
    private transient MemorySegmentAllocator memoryAllocator;
    protected final Options options;
    private boolean endOfInput = false;

    public PrepareCommitOperator(StreamOperatorParameters<OUT> parameters, Options options) {
        this.options = options;
        this.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        if (this.options.get(FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY).booleanValue()) {
            MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
            this.memoryAllocator = new MemorySegmentAllocator(containingTask, memoryManager);
            this.memoryPool = new FlinkMemorySegmentPool(ManagedMemoryUtils.computeManagedMemory(this), memoryManager.getPageSize(), this.memoryAllocator);
        }
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (!this.endOfInput) {
            this.emitCommittables(false, checkpointId);
        }
    }

    public void endInput() throws Exception {
        this.endOfInput = true;
        this.emitCommittables(true, Long.MAX_VALUE);
    }

    public void close() throws Exception {
        super.close();
        if (this.memoryAllocator != null) {
            this.memoryAllocator.release();
        }
    }

    private void emitCommittables(boolean waitCompaction, long checkpointId) throws IOException {
        this.prepareCommit(waitCompaction, checkpointId).forEach(committable -> this.output.collect((Object)new StreamRecord(committable)));
    }

    protected abstract List<OUT> prepareCommit(boolean var1, long var2) throws IOException;

    protected static abstract class Factory<IN, OUT>
    extends AbstractStreamOperatorFactory<OUT>
    implements OneInputStreamOperatorFactory<IN, OUT> {
        protected final Options options;

        protected Factory(Options options) {
            this.options = options;
            this.chainingStrategy = ChainingStrategy.ALWAYS;
        }
    }
}

