/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.utils.Preconditions;

/**
 * Stream operator that forwards every incoming record downstream while also buffering its value,
 * groups the buffered values by checkpoint id, and commits them through a {@link Committer}.
 *
 * <p>Buffered inputs are folded into one global committable per checkpoint id when state is
 * snapshotted (and, when streaming checkpoints are disabled, when the input ends). Committables
 * are committed when a checkpoint completes, or directly from {@link #endInput()} when streaming
 * checkpoints are disabled.
 *
 * @param <CommitT> type of the records received from upstream
 * @param <GlobalCommitT> type of the combined, per-checkpoint committable
 */
public class CommitterOperator<CommitT, GlobalCommitT>
        extends AbstractStreamOperator<CommitT>
        implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {
    private static final long serialVersionUID = 1L;

    /** Checkpoint id used for commits that are triggered by the end of the input. */
    private static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;

    /** Values received since the last call to {@link #pollInputs()}. */
    private final Deque<CommitT> inputs = new ArrayDeque<>();

    /** When true, {@link #endInput()} does not commit by itself. */
    private final boolean streamingCheckpointEnabled;

    /** When true, {@link #initializeState} rejects any parallelism other than one. */
    private final boolean forceSingleParallelism;

    /** Commit user passed to {@link StateUtils#getSingleValueFromState} when state is initialized. */
    private final String initialCommitUser;

    /** Combined committables that have not been committed yet, keyed by checkpoint id. */
    protected final NavigableMap<Long, GlobalCommitT> committablesPerCheckpoint;

    private final Committer.Factory<CommitT, GlobalCommitT> committerFactory;
    private final CommittableStateManager<GlobalCommitT> committableStateManager;

    /** Created in {@link #initializeState}; {@code null} before that. */
    protected Committer<CommitT, GlobalCommitT> committer;

    private transient long currentWatermark;
    private transient boolean endInput;
    private transient String commitUser;

    /** Optional watermark that replaces the current one in {@link #endInput()}; may be null. */
    private final Long endInputWatermark;

    /**
     * Creates an operator without an end-of-input watermark override.
     *
     * @see #CommitterOperator(boolean, boolean, boolean, String, Committer.Factory,
     *     CommittableStateManager, Long)
     */
    public CommitterOperator(
            boolean streamingCheckpointEnabled,
            boolean forceSingleParallelism,
            boolean chaining,
            String initialCommitUser,
            Committer.Factory<CommitT, GlobalCommitT> committerFactory,
            CommittableStateManager<GlobalCommitT> committableStateManager) {
        this(
                streamingCheckpointEnabled,
                forceSingleParallelism,
                chaining,
                initialCommitUser,
                committerFactory,
                committableStateManager,
                null);
    }

    /**
     * Creates the operator.
     *
     * @param streamingCheckpointEnabled if true, {@link #endInput()} leaves committing to
     *     {@link #notifyCheckpointComplete(long)}
     * @param forceSingleParallelism if true, state initialization fails unless the operator runs
     *     with exactly one parallel subtask
     * @param chaining selects {@link ChainingStrategy#ALWAYS} when true, otherwise
     *     {@link ChainingStrategy#HEAD}
     * @param initialCommitUser commit user handed to the state lookup in
     *     {@link #initializeState}
     * @param committerFactory factory for the {@link Committer}; must not be null
     * @param committableStateManager manager that snapshots and initializes the pending
     *     committables
     * @param endInputWatermark watermark to use once the input has ended, or null to keep the
     *     last observed watermark
     */
    public CommitterOperator(
            boolean streamingCheckpointEnabled,
            boolean forceSingleParallelism,
            boolean chaining,
            String initialCommitUser,
            Committer.Factory<CommitT, GlobalCommitT> committerFactory,
            CommittableStateManager<GlobalCommitT> committableStateManager,
            Long endInputWatermark) {
        this.streamingCheckpointEnabled = streamingCheckpointEnabled;
        this.forceSingleParallelism = forceSingleParallelism;
        this.initialCommitUser = initialCommitUser;
        this.committablesPerCheckpoint = new TreeMap<>();
        this.committerFactory = Preconditions.checkNotNull(committerFactory);
        this.committableStateManager = committableStateManager;
        this.endInputWatermark = endInputWatermark;
        setChainingStrategy(chaining ? ChainingStrategy.ALWAYS : ChainingStrategy.HEAD);
    }

    /**
     * Validates the parallelism, resets the transient fields, resolves the commit user from the
     * {@code "commit_user_state"} state, creates the committer and initializes the committable
     * state manager with it.
     */
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        Preconditions.checkArgument(
                !forceSingleParallelism
                        || getRuntimeContext().getNumberOfParallelSubtasks() == 1,
                "Committer Operator parallelism in paimon MUST be one.");

        currentWatermark = Long.MIN_VALUE;
        endInput = false;

        // NOTE(review): presumably returns the stored commit user on restore and
        // initialCommitUser otherwise - confirm against StateUtils.
        commitUser =
                StateUtils.getSingleValueFromState(
                        context, "commit_user_state", String.class, initialCommitUser);

        committer =
                committerFactory.create(
                        Committer.createContext(
                                commitUser,
                                getMetricGroup(),
                                streamingCheckpointEnabled,
                                context.isRestored(),
                                context.getOperatorStateStore()));

        committableStateManager.initializeState(context, committer);
    }

    /**
     * Forwards the watermark to the superclass and remembers its timestamp, except for a
     * {@code Long.MAX_VALUE} timestamp, which is never recorded.
     */
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        // This Long.MAX_VALUE is a watermark timestamp, not the end-input checkpoint id.
        if (mark.getTimestamp() != Long.MAX_VALUE) {
            currentWatermark = mark.getTimestamp();
        }
    }

    /** Combines the given inputs into one committable using the current watermark. */
    private GlobalCommitT toCommittables(long checkpoint, List<CommitT> inputs) throws Exception {
        return committer.combine(checkpoint, currentWatermark, inputs);
    }

    /** Folds the buffered inputs into per-checkpoint committables and snapshots all of them. */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        pollInputs();
        committableStateManager.snapshotState(context, committables(committablesPerCheckpoint));
    }

    /** Returns a new list holding the map's values in ascending checkpoint-id order. */
    private List<GlobalCommitT> committables(NavigableMap<Long, GlobalCommitT> map) {
        return new ArrayList<>(map.values());
    }

    /**
     * Marks the input as ended and applies the end-of-input watermark if one was configured.
     * When streaming checkpoints are disabled, everything buffered is committed right away.
     */
    public void endInput() throws Exception {
        endInput = true;
        if (endInputWatermark != null) {
            currentWatermark = endInputWatermark;
        }

        if (streamingCheckpointEnabled) {
            // Committing is left to notifyCheckpointComplete in this mode.
            return;
        }

        pollInputs();
        commitUpToCheckpoint(END_INPUT_CHECKPOINT_ID);
    }

    /**
     * Commits everything up to the completed checkpoint, or everything pending once the input
     * has ended.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        commitUpToCheckpoint(endInput ? END_INPUT_CHECKPOINT_ID : checkpointId);
    }

    /**
     * Commits all pending committables whose checkpoint id is at most {@code checkpointId} and
     * removes them from {@link #committablesPerCheckpoint}.
     */
    private void commitUpToCheckpoint(long checkpointId) throws Exception {
        // headMap is a live view: clearing it below removes the entries from the backing map.
        NavigableMap<Long, GlobalCommitT> headMap =
                committablesPerCheckpoint.headMap(checkpointId, true);
        List<GlobalCommitT> committables = committables(headMap);

        if (committables.isEmpty() && committer.forceCreatingSnapshot()) {
            // Nothing pending, but the committer asks for a commit anyway: commit an empty one.
            committables =
                    Collections.singletonList(
                            toCommittables(checkpointId, Collections.emptyList()));
        }

        if (checkpointId == END_INPUT_CHECKPOINT_ID) {
            // NOTE(review): presumably filters out committables that were already committed
            // (e.g. when endInput runs again after a restart) - confirm against Committer.
            committer.filterAndCommit(committables, false);
        } else {
            committer.commit(committables);
        }

        headMap.clear();
    }

    /** Emits the record downstream unchanged and buffers its value for a later commit. */
    public void processElement(StreamRecord<CommitT> element) {
        output.collect(element);
        inputs.add(element.getValue());
    }

    /**
     * Drops all buffered data and closes the committer if it was created. The superclass is
     * closed even when closing the committer fails.
     */
    public void close() throws Exception {
        committablesPerCheckpoint.clear();
        inputs.clear();
        try {
            if (committer != null) {
                committer.close();
            }
        } finally {
            super.close();
        }
    }

    /** Returns the commit user resolved in {@link #initializeState}; null before that. */
    public String getCommitUser() {
        return commitUser;
    }

    /**
     * Groups the buffered inputs by checkpoint id, stores one combined committable per id in
     * {@link #committablesPerCheckpoint}, and empties the buffer.
     *
     * @throws RuntimeException if a checkpoint id other than the end-input id already has a
     *     pending committable
     */
    private void pollInputs() throws Exception {
        Map<Long, List<CommitT>> grouped = committer.groupByCheckpoint(inputs);

        for (Map.Entry<Long, List<CommitT>> entry : grouped.entrySet()) {
            Long cp = entry.getKey();
            List<CommitT> committables = entry.getValue();

            if (cp != null
                    && cp == END_INPUT_CHECKPOINT_ID
                    && committablesPerCheckpoint.containsKey(cp)) {
                // End-input committables may arrive in several batches; merge them into the
                // entry that is already pending instead of treating this as a duplicate.
                GlobalCommitT merged =
                        committer.combine(
                                cp,
                                currentWatermark,
                                committablesPerCheckpoint.get(cp),
                                committables);
                committablesPerCheckpoint.put(cp, merged);
            } else if (committablesPerCheckpoint.containsKey(cp)) {
                throw new RuntimeException(
                        String.format(
                                "Repeatedly commit the same checkpoint files. \nThe previous files is %s, \nand the subsequent files is %s",
                                committablesPerCheckpoint.get(cp), committables));
            } else {
                committablesPerCheckpoint.put(cp, toCommittables(cp, committables));
            }
        }

        inputs.clear();
    }
}

