/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.utils.Preconditions;

public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOperator<CommitT>
implements OneInputStreamOperator<CommitT, CommitT>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final Deque<CommitT> inputs = new ArrayDeque<CommitT>();
    private final boolean streamingCheckpointEnabled;
    private final String initialCommitUser;
    protected final NavigableMap<Long, GlobalCommitT> committablesPerCheckpoint;
    private final Committer.Factory<CommitT, GlobalCommitT> committerFactory;
    private final CommittableStateManager<GlobalCommitT> committableStateManager;
    protected Committer<CommitT, GlobalCommitT> committer;
    private transient long currentWatermark;
    private transient boolean endInput;
    private transient String commitUser;

    public CommitterOperator(boolean streamingCheckpointEnabled, String initialCommitUser, Committer.Factory<CommitT, GlobalCommitT> committerFactory, CommittableStateManager<GlobalCommitT> committableStateManager) {
        this.streamingCheckpointEnabled = streamingCheckpointEnabled;
        this.initialCommitUser = initialCommitUser;
        this.committablesPerCheckpoint = new TreeMap<Long, GlobalCommitT>();
        this.committerFactory = Preconditions.checkNotNull(committerFactory);
        this.committableStateManager = committableStateManager;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.currentWatermark = Long.MIN_VALUE;
        this.endInput = false;
        this.commitUser = StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, this.initialCommitUser);
        this.committer = this.committerFactory.create(this.commitUser, this.getMetricGroup());
        this.committableStateManager.initializeState(context, this.committer);
    }

    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        if (mark.getTimestamp() != Long.MAX_VALUE) {
            this.currentWatermark = mark.getTimestamp();
        }
    }

    private GlobalCommitT toCommittables(long checkpoint, List<CommitT> inputs) throws Exception {
        return this.committer.combine(checkpoint, this.currentWatermark, inputs);
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.pollInputs();
        this.committableStateManager.snapshotState(context, this.committables(this.committablesPerCheckpoint));
    }

    private List<GlobalCommitT> committables(NavigableMap<Long, GlobalCommitT> map) {
        return new ArrayList(map.values());
    }

    public void endInput() throws Exception {
        this.endInput = true;
        if (this.streamingCheckpointEnabled) {
            return;
        }
        this.pollInputs();
        this.commitUpToCheckpoint(Long.MAX_VALUE);
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        this.commitUpToCheckpoint(this.endInput ? Long.MAX_VALUE : checkpointId);
    }

    private void commitUpToCheckpoint(long checkpointId) throws Exception {
        NavigableMap<Long, GlobalCommitT> headMap = this.committablesPerCheckpoint.headMap(checkpointId, true);
        List<GlobalCommitT> committables = this.committables(headMap);
        this.committer.commit(committables);
        headMap.clear();
        if (committables.isEmpty() && this.committer.forceCreatingSnapshot()) {
            GlobalCommitT commit = this.toCommittables(checkpointId, Collections.emptyList());
            this.committer.commit(Collections.singletonList(commit));
        }
    }

    public void processElement(StreamRecord<CommitT> element) {
        this.output.collect(element);
        this.inputs.add(element.getValue());
    }

    public void close() throws Exception {
        this.committablesPerCheckpoint.clear();
        this.inputs.clear();
        if (this.committer != null) {
            this.committer.close();
        }
        super.close();
    }

    public String getCommitUser() {
        return this.commitUser;
    }

    private void pollInputs() throws Exception {
        Map<Long, List<CommitT>> grouped = this.committer.groupByCheckpoint(this.inputs);
        for (Map.Entry<Long, List<CommitT>> entry : grouped.entrySet()) {
            Long cp = entry.getKey();
            List<CommitT> committables = entry.getValue();
            if (cp != null && cp == Long.MAX_VALUE && this.committablesPerCheckpoint.containsKey(cp)) {
                GlobalCommitT commitT = this.committer.combine(cp, this.currentWatermark, this.committablesPerCheckpoint.get(cp), committables);
                this.committablesPerCheckpoint.put(cp, commitT);
                continue;
            }
            if (this.committablesPerCheckpoint.containsKey(cp)) {
                throw new RuntimeException(String.format("Repeatedly commit the same checkpoint files. \nThe previous files is %s, \nand the subsequent files is %s", this.committablesPerCheckpoint.get(cp), committables));
            }
            this.committablesPerCheckpoint.put(cp, this.toCommittables(cp, committables));
        }
        this.inputs.clear();
    }
}

