/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.sink.writer;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import com.google.cloud.flink.bigquery.sink.writer.BaseWriter;
import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

public class BigQueryDefaultWriter<IN>
extends BaseWriter<IN> {
    Counter numberOfRecordsWrittenToBigQuerySinceCheckpoint;

    public BigQueryDefaultWriter(String tablePath, BigQueryConnectOptions connectOptions, BigQuerySchemaProvider schemaProvider, BigQueryProtoSerializer serializer, Sink.InitContext context) {
        super(context.getSubtaskId(), tablePath, connectOptions, schemaProvider, serializer);
        this.streamName = String.format("%s/streams/_default", tablePath);
        this.totalRecordsSeen = 0L;
        this.totalRecordsWritten = 0L;
        this.initializeAtleastOnceFlinkMetrics(context);
    }

    public void write(IN element, SinkWriter.Context context) {
        ++this.totalRecordsSeen;
        this.numberOfRecordsSeenByWriter.inc();
        this.numberOfRecordsSeenByWriterSinceCheckpoint.inc();
        try {
            ByteString protoRow = this.getProtoRow(element);
            if (!this.fitsInAppendRequest(protoRow)) {
                this.validateAppendResponses(false);
                this.append();
            }
            this.addToAppendRequest(protoRow);
        }
        catch (BigQuerySerializationException e) {
            this.logger.error(String.format("Unable to serialize record %s. Dropping it!", element), (Throwable)e);
        }
    }

    @Override
    public void flush(boolean endOfInput) {
        super.flush(endOfInput);
        this.numberOfRecordsSeenByWriterSinceCheckpoint.dec(this.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
        this.numberOfRecordsWrittenToBigQuerySinceCheckpoint.dec(this.numberOfRecordsWrittenToBigQuerySinceCheckpoint.getCount());
    }

    @Override
    void sendAppendRequest(ProtoRows protoRows) {
        if (this.streamWriter == null) {
            this.createStreamWriter(true);
        }
        ApiFuture response = this.streamWriter.append(protoRows);
        this.appendResponseFuturesQueue.add(new BaseWriter.AppendInfo((ApiFuture<AppendRowsResponse>)response, -1L, protoRows.getSerializedRowsCount()));
    }

    @Override
    void validateAppendResponse(BaseWriter.AppendInfo appendInfo) {
        ApiFuture<AppendRowsResponse> appendResponseFuture = appendInfo.getFuture();
        long recordsAppended = appendInfo.getRecordsAppended();
        try {
            AppendRowsResponse response = (AppendRowsResponse)appendResponseFuture.get();
            if (response.hasError()) {
                this.logAndThrowFatalException(response.getError().getMessage());
            }
            this.totalRecordsWritten += recordsAppended;
            this.numberOfRecordsWrittenToBigQuery.inc(recordsAppended);
            this.numberOfRecordsWrittenToBigQuerySinceCheckpoint.inc(recordsAppended);
        }
        catch (InterruptedException | ExecutionException e) {
            this.logAndThrowFatalException(e);
        }
    }

    private void initializeAtleastOnceFlinkMetrics(Sink.InitContext context) {
        SinkWriterMetricGroup sinkWriterMetricGroup = context.metricGroup();
        this.initializeMetrics(sinkWriterMetricGroup);
        this.numberOfRecordsWrittenToBigQuerySinceCheckpoint = sinkWriterMetricGroup.counter("numberOfRecordsWrittenToBigQuerySinceCheckpoint");
    }
}

