/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.utils.SerializableSupplier;

public class RestoreCommittableStateManager<GlobalCommitT>
implements CommittableStateManager<GlobalCommitT> {
    private static final long serialVersionUID = 1L;
    private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer;
    private final boolean partitionMarkDoneRecoverFromState;
    private ListState<GlobalCommitT> streamingCommitterState;

    public RestoreCommittableStateManager(SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer, boolean partitionMarkDoneRecoverFromState) {
        this.committableSerializer = committableSerializer;
        this.partitionMarkDoneRecoverFromState = partitionMarkDoneRecoverFromState;
    }

    @Override
    public void initializeState(StateInitializationContext context, Committer<?, GlobalCommitT> committer) throws Exception {
        this.streamingCommitterState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(new ListStateDescriptor("streaming_committer_raw_states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE)), new VersionedSerializerWrapper((VersionedSerializer)this.committableSerializer.get()));
        ArrayList restored = new ArrayList();
        ((Iterable)this.streamingCommitterState.get()).forEach(restored::add);
        this.streamingCommitterState.clear();
        this.recover(restored, committer);
    }

    protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer) throws Exception {
        return committer.filterAndCommit(committables, true, this.partitionMarkDoneRecoverFromState);
    }

    @Override
    public void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables) throws Exception {
        this.streamingCommitterState.update(committables);
    }
}

