/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.sink.writer;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.sink.TwoPhaseCommittingStatefulSink;
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import com.google.cloud.flink.bigquery.sink.throttle.Throttler;
import com.google.cloud.flink.bigquery.sink.throttle.WriteStreamCreationThrottler;
import com.google.cloud.flink.bigquery.sink.writer.BaseWriter;
import com.google.cloud.flink.bigquery.sink.writer.BigQueryWriterState;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.StringUtils;

/**
 * Sink writer that appends rows to a BigQuery write stream of type
 * {@link WriteStream.Type#BUFFERED} and emits a {@link BigQueryCommittable} describing the
 * appended range when a commit is prepared.
 *
 * <p>The writer tracks two views of its position: the live one ({@code streamName} /
 * {@code streamOffset}) and the one captured by the most recent snapshot
 * ({@code streamNameInState} / {@code streamOffsetInState}). Comparing the two is how the writer
 * decides whether anything was appended since the last snapshot, and whether the next append is
 * the first one on a stream restored from state.
 *
 * @param <IN> type of the records accepted by {@link #write}
 */
public class BigQueryBufferedWriter<IN>
extends BaseWriter<IN>
implements TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable> {
    /** Applied before every creation of a new write stream. */
    private final Throttler writeStreamCreationThrottler;
    /** Stream name as of the last snapshot (or as restored from state); never null. */
    private String streamNameInState;
    /** Offset at which the next append request will be written. */
    private long streamOffset;
    /** Value of {@link #streamOffset} as of the last snapshot (or as restored from state). */
    private long streamOffsetInState;
    /** Number of rows added to the append request that is currently being built. */
    private long appendRequestRowCount;
    /** Rows acknowledged by BigQuery since the last checkpoint; reset on first write after it. */
    Counter numberOfRecordsBufferedByBigQuerySinceCheckpoint;
    /** Running total of records considered committed; caught up to totalRecordsWritten lazily. */
    long totalRecordsCommitted;
    /** True until the first {@link #write} call following construction or a snapshot. */
    private boolean isFirstWriteAfterCheckpoint;

    /**
     * Creates a writer with no restored state: empty stream name and all counters at zero.
     *
     * @param tablePath destination table path, forwarded to {@link BaseWriter}
     * @param connectOptions BigQuery connection options, forwarded to {@link BaseWriter}
     * @param schemaProvider schema provider, forwarded to {@link BaseWriter}
     * @param serializer record serializer, forwarded to {@link BaseWriter}
     * @param context sink init context supplying the subtask id and metric group
     */
    public BigQueryBufferedWriter(String tablePath, BigQueryConnectOptions connectOptions, BigQuerySchemaProvider schemaProvider, BigQueryProtoSerializer serializer, Sink.InitContext context) {
        this("", 0L, tablePath, 0L, 0L, 0L, connectOptions, schemaProvider, serializer, context);
    }

    /**
     * Creates a writer, optionally resuming from previously snapshotted state.
     *
     * @param streamName write stream name from state; null or blank is normalized to {@code ""}
     * @param streamOffset offset from state at which the next append should be written
     * @param tablePath destination table path, forwarded to {@link BaseWriter}
     * @param totalRecordsSeen restored count of records seen by the writer
     * @param totalRecordsWritten restored count of records written
     * @param totalRecordsCommitted restored count of records committed
     * @param connectOptions BigQuery connection options, forwarded to {@link BaseWriter}
     * @param schemaProvider schema provider, forwarded to {@link BaseWriter}
     * @param serializer record serializer, forwarded to {@link BaseWriter}
     * @param context sink init context supplying the subtask id and metric group
     */
    public BigQueryBufferedWriter(String streamName, long streamOffset, String tablePath, long totalRecordsSeen, long totalRecordsWritten, long totalRecordsCommitted, BigQueryConnectOptions connectOptions, BigQuerySchemaProvider schemaProvider, BigQueryProtoSerializer serializer, Sink.InitContext context) {
        super(context.getSubtaskId(), tablePath, connectOptions, schemaProvider, serializer);
        // Normalize so that later equals() comparisons never have to deal with null.
        String restoredStreamName = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName;
        this.streamNameInState = restoredStreamName;
        this.streamName = restoredStreamName;
        this.streamOffsetInState = streamOffset;
        this.streamOffset = streamOffset;
        this.totalRecordsSeen = totalRecordsSeen;
        this.totalRecordsWritten = totalRecordsWritten;
        this.totalRecordsCommitted = totalRecordsCommitted;
        this.writeStreamCreationThrottler = new WriteStreamCreationThrottler(this.subtaskId);
        this.appendRequestRowCount = 0L;
        this.isFirstWriteAfterCheckpoint = true;
        this.initializeExactlyOnceMetrics(context);
    }

    /**
     * Serializes {@code element} and adds it to the pending append request. If the serialized row
     * does not fit, previously sent appends are validated and the pending request is sent first.
     *
     * <p>A record that fails serialization is logged and dropped; it is still counted as seen.
     *
     * @param element record to write
     * @param context sink writer context (unused here)
     */
    public void write(IN element, SinkWriter.Context context) {
        if (this.isFirstWriteAfterCheckpoint) {
            this.preWriteOpsAfterCommit();
        }
        ++this.totalRecordsSeen;
        this.numberOfRecordsSeenByWriter.inc();
        this.numberOfRecordsSeenByWriterSinceCheckpoint.inc();
        try {
            ByteString protoRow = this.getProtoRow(element);
            if (!this.fitsInAppendRequest(protoRow)) {
                this.validateAppendResponses(false);
                this.append();
            }
            this.addToAppendRequest(protoRow);
            ++this.appendRequestRowCount;
        }
        catch (BigQuerySerializationException e) {
            this.logger.error(String.format("Unable to serialize record %s. Dropping it!", element), e);
        }
    }

    /**
     * Bookkeeping done on the first write after a checkpoint: counts everything written so far as
     * committed, publishes the delta to the written-records metric, and zeroes the two
     * "since checkpoint" counters.
     */
    private void preWriteOpsAfterCommit() {
        this.isFirstWriteAfterCheckpoint = false;
        long numberOfRecordsWrittenInLastCommit = this.totalRecordsWritten - this.totalRecordsCommitted;
        this.totalRecordsCommitted = this.totalRecordsWritten;
        this.numberOfRecordsWrittenToBigQuery.inc(numberOfRecordsWrittenInLastCommit);
        // Counter has no reset(); decrementing by the current count brings it back to zero.
        this.numberOfRecordsBufferedByBigQuerySinceCheckpoint.dec(this.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
        this.numberOfRecordsSeenByWriterSinceCheckpoint.dec(this.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
    }

    /**
     * Sends {@code protoRows} at the current stream offset.
     *
     * <p>Three cases are distinguished: the first append on a stream restored from state (handled
     * by {@link #performFirstAppendOnRestoredStream}), no stream yet (a BUFFERED stream and its
     * writer are created after throttling), and a regular append on the already-open stream.
     *
     * @param protoRows serialized rows to append
     */
    @Override
    void sendAppendRequest(ProtoRows protoRows) {
        long rowCount = protoRows.getSerializedRowsCount();
        boolean hasStream = !StringUtils.isNullOrWhitespaceOnly(this.streamName);
        boolean positionUnchangedSinceState = this.streamOffset == this.streamOffsetInState && this.streamName.equals(this.streamNameInState);
        if (positionUnchangedSinceState && hasStream) {
            this.performFirstAppendOnRestoredStream(protoRows, rowCount);
            return;
        }
        if (!hasStream) {
            this.logger.info("Throttling creation of BigQuery write stream in subtask {}", this.subtaskId);
            this.writeStreamCreationThrottler.throttle();
            this.createWriteStream(WriteStream.Type.BUFFERED);
            this.createStreamWriter(false);
        }
        ApiFuture<AppendRowsResponse> future = this.streamWriter.append(protoRows, this.streamOffset);
        this.postAppendOps(future, rowCount);
    }

    /**
     * Waits for one append response and verifies it: the response must carry no error and must
     * report the offset that was expected when the request was sent. On success the written
     * counters are advanced by the number of rows in that request.
     *
     * <p>A failure whose cause is exactly {@code OffsetAlreadyExists} is logged and ignored; any
     * other failure (including interruption) is reported through
     * {@code logAndThrowFatalException}.
     *
     * @param appendInfo future, expected offset and row count of the append being validated
     */
    @Override
    void validateAppendResponse(BaseWriter.AppendInfo appendInfo) {
        ApiFuture<AppendRowsResponse> appendResponseFuture = appendInfo.getFuture();
        long expectedOffset = appendInfo.getExpectedOffset();
        long recordsAppended = appendInfo.getRecordsAppended();
        try {
            AppendRowsResponse response = appendResponseFuture.get();
            if (response.hasError()) {
                this.logAndThrowFatalException(response.getError().getMessage());
            }
            long offset = response.getAppendResult().getOffset().getValue();
            if (offset != expectedOffset) {
                this.logAndThrowFatalException(String.format("Inconsistent offset in BigQuery API response. Found %d, expected %d", offset, expectedOffset));
            }
            this.totalRecordsWritten += recordsAppended;
            this.numberOfRecordsBufferedByBigQuerySinceCheckpoint.inc(recordsAppended);
        }
        catch (InterruptedException | ExecutionException e) {
            // Null-safe: an InterruptedException normally has no cause, and dereferencing it
            // would replace the real failure with a NullPointerException.
            if (BigQueryBufferedWriter.isCausedByAnyOf(e, Exceptions.OffsetAlreadyExists.class)) {
                this.logger.info("Ignoring OffsetAlreadyExists error in subtask {} as this can be due to faulty retries", this.subtaskId);
                return;
            }
            this.logAndThrowFatalException(e);
        }
    }

    /**
     * Produces the committable for the data appended since the last snapshot.
     *
     * @return an empty collection when nothing was appended (offset is zero, or stream name and
     *     offset both equal their snapshotted values); otherwise a single committable carrying the
     *     subtask id, the stream name and {@code streamOffset - 1}, i.e. the offset of the last
     *     row appended
     */
    public Collection<BigQueryCommittable> prepareCommit() throws IOException, InterruptedException {
        this.logger.info("Preparing commit in subtask {}", this.subtaskId);
        boolean unchangedSinceState = this.streamNameInState.equals(this.streamName) && this.streamOffset == this.streamOffsetInState;
        if (this.streamOffset == 0L || unchangedSinceState) {
            this.logger.info("No new data appended in subtask {}. Nothing to commit.", this.subtaskId);
            return Collections.emptyList();
        }
        return Collections.singletonList(new BigQueryCommittable(this.subtaskId, this.streamName, this.streamOffset - 1L));
    }

    /**
     * Captures the writer's position and counters, and records them as the new "in state" view.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return a single-element list with the state to persist
     */
    public List<BigQueryWriterState> snapshotState(long checkpointId) {
        this.logger.info("Snapshotting state in subtask {} for checkpoint {}", this.subtaskId, checkpointId);
        this.isFirstWriteAfterCheckpoint = true;
        this.streamNameInState = this.streamName;
        this.streamOffsetInState = this.streamOffset;
        return Collections.singletonList(new BigQueryWriterState(this.streamName, this.streamOffset, this.totalRecordsSeen, this.totalRecordsWritten, this.totalRecordsCommitted, checkpointId));
    }

    /**
     * Finalizes the current stream if it has moved past the snapshotted position, then delegates
     * to {@link BaseWriter#close()}.
     */
    @Override
    public void close() {
        boolean changedSinceState = !this.streamNameInState.equals(this.streamName) || this.streamOffsetInState != this.streamOffset;
        if (changedSinceState) {
            this.finalizeStream();
        }
        super.close();
    }

    /**
     * Performs the first append on a stream restored from state, waiting for its response before
     * returning. If the stream writer cannot be created, or the append fails with a cause of
     * exactly {@code OffsetAlreadyExists}, {@code OffsetOutOfRange}, {@code
     * StreamFinalizedException} or {@code StreamNotFound}, the restored stream is discarded and
     * the same rows are re-sent (which creates a fresh stream). Any other failure is fatal.
     *
     * @param protoRows serialized rows to append
     * @param rowCount number of rows in {@code protoRows}
     */
    private void performFirstAppendOnRestoredStream(ProtoRows protoRows, long rowCount) {
        try {
            this.createStreamWriter(false);
        }
        catch (BigQueryConnectorException e) {
            this.discardStreamAndResendAppendRequest(e, protoRows);
            return;
        }
        ApiFuture<AppendRowsResponse> future = this.streamWriter.append(protoRows, this.streamOffset);
        try {
            AppendRowsResponse response = future.get();
            // The response is already available; queue it as a completed future so that it goes
            // through the same validation path as every other append.
            this.postAppendOps(ApiFutures.immediateFuture(response), rowCount);
        }
        catch (InterruptedException | ExecutionException e) {
            boolean streamUnusable = BigQueryBufferedWriter.isCausedByAnyOf(e, Exceptions.OffsetAlreadyExists.class, Exceptions.OffsetOutOfRange.class, Exceptions.StreamFinalizedException.class, Exceptions.StreamNotFound.class);
            if (streamUnusable) {
                this.discardStreamAndResendAppendRequest(e, protoRows);
                return;
            }
            this.logAndThrowFatalException(e);
        }
    }

    /**
     * Tells whether the cause of {@code e} is an instance of exactly one of the given classes
     * (subclasses do not match).
     *
     * @param e exception whose cause is inspected
     * @param causeTypes candidate cause classes
     * @return {@code false} when {@code e} has no cause or the cause's class is not listed
     */
    private static boolean isCausedByAnyOf(Exception e, Class<?>... causeTypes) {
        Throwable cause = e.getCause();
        if (cause == null) {
            return false;
        }
        Class<?> causeClass = cause.getClass();
        for (Class<?> candidate : causeTypes) {
            if (causeClass == candidate) {
                return true;
            }
        }
        return false;
    }

    /** Discards the current stream and sends {@code protoRows} again via {@link #sendAppendRequest}. */
    private void discardStreamAndResendAppendRequest(Exception e, ProtoRows protoRows) {
        this.discardStream(e);
        this.sendAppendRequest(protoRows);
    }

    /**
     * Logs why the stream is unusable, finalizes it (best effort) and resets the live position so
     * that the next append creates a new stream.
     */
    private void discardStream(Exception e) {
        this.logger.info(String.format("Writer %d cannot use stream %s. Discarding this stream.", this.subtaskId, this.streamName), e);
        this.finalizeStream();
        this.streamName = "";
        this.streamOffset = 0L;
    }

    /** Finalizes the current write stream; failures are deliberately only logged at debug level. */
    private void finalizeStream() {
        this.logger.debug("Finalizing write stream {} in subtask {}", this.streamName, this.subtaskId);
        try {
            this.writeClient.finalizeWriteStream(this.streamName);
        }
        catch (Exception innerException) {
            this.logger.debug(String.format("Failed while finalizing write stream %s in subtask %d", this.streamName, this.subtaskId), innerException);
        }
    }

    /**
     * Queues the append for later validation (remembering the offset it was sent at), then
     * advances the stream offset by the rows of the request just sent and resets the row counter.
     */
    private void postAppendOps(ApiFuture<AppendRowsResponse> future, long rowCount) {
        this.appendResponseFuturesQueue.add(new BaseWriter.AppendInfo(future, this.streamOffset, rowCount));
        this.streamOffset += this.appendRequestRowCount;
        this.appendRequestRowCount = 0L;
    }

    /**
     * Registers this writer's metrics and seeds the cumulative counters with the restored totals.
     */
    private void initializeExactlyOnceMetrics(Sink.InitContext context) {
        SinkWriterMetricGroup sinkWriterMetricGroup = context.metricGroup();
        this.initializeMetrics(sinkWriterMetricGroup);
        this.numberOfRecordsBufferedByBigQuerySinceCheckpoint = sinkWriterMetricGroup.counter("numberOfRecordsBufferedByBigQuerySinceCheckpoint");
        this.numberOfRecordsSeenByWriter.inc(this.totalRecordsSeen);
        this.numberOfRecordsWrittenToBigQuery.inc(this.totalRecordsCommitted);
    }

    /** @return the offset at which the next append request will be written */
    @Internal
    long getStreamOffset() {
        return this.streamOffset;
    }

    /** @return the stream offset recorded by the last snapshot (or restored from state) */
    @Internal
    long getStreamOffsetInState() {
        return this.streamOffsetInState;
    }

    /** @return the stream name recorded by the last snapshot (or restored from state) */
    @Internal
    String getStreamNameInState() {
        return this.streamNameInState;
    }
}

