/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.sink;

import com.google.cloud.flink.bigquery.sink.BigQueryBaseSink;
import com.google.cloud.flink.bigquery.sink.BigQuerySinkConfig;
import com.google.cloud.flink.bigquery.sink.TwoPhaseCommittingStatefulSink;
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable;
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittableSerializer;
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommitter;
import com.google.cloud.flink.bigquery.sink.writer.BigQueryBufferedWriter;
import com.google.cloud.flink.bigquery.sink.writer.BigQueryWriterState;
import com.google.cloud.flink.bigquery.sink.writer.BigQueryWriterStateSerializer;
import java.util.Collection;
import java.util.Comparator;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class BigQueryExactlyOnceSink<IN>
extends BigQueryBaseSink<IN>
implements TwoPhaseCommittingStatefulSink<IN, BigQueryWriterState, BigQueryCommittable> {
    BigQueryExactlyOnceSink(BigQuerySinkConfig sinkConfig) {
        super(sinkConfig);
    }

    @Override
    public TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable> createWriter(Sink.InitContext context) {
        this.checkParallelism(context.getNumberOfParallelSubtasks());
        return new BigQueryBufferedWriter(this.tablePath, this.connectOptions, this.schemaProvider, this.serializer, context);
    }

    @Override
    public TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable> restoreWriter(Sink.InitContext context, Collection<BigQueryWriterState> recoveredState) {
        if (recoveredState == null || recoveredState.isEmpty()) {
            return this.createWriter(context);
        }
        BigQueryWriterState stateToRestore = recoveredState.stream().max(Comparator.comparingLong(state -> state.getCheckpointId())).get();
        return new BigQueryBufferedWriter(stateToRestore.getStreamName(), stateToRestore.getStreamOffset(), this.tablePath, stateToRestore.getTotalRecordsSeen(), stateToRestore.getTotalRecordsWritten(), stateToRestore.getTotalRecordsCommitted(), this.connectOptions, this.schemaProvider, this.serializer, context);
    }

    public Committer<BigQueryCommittable> createCommitter() {
        return new BigQueryCommitter(this.connectOptions);
    }

    public SimpleVersionedSerializer<BigQueryCommittable> getCommittableSerializer() {
        return new BigQueryCommittableSerializer();
    }

    public SimpleVersionedSerializer<BigQueryWriterState> getWriterStateSerializer() {
        return new BigQueryWriterStateSerializer();
    }
}

