/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.sink.writer;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for sink writers that batch serialized proto rows into append requests.
 *
 * <p>This class accumulates rows in a {@link ProtoRows.Builder}, tracks the running size of the
 * pending append request, and keeps a FIFO queue of {@link AppendInfo} entries describing append
 * requests whose responses have not been validated yet. Subclasses decide how an append request is
 * actually sent ({@link #sendAppendRequest(ProtoRows)}) and how a response is checked ({@link
 * #validateAppendResponse(AppendInfo)}).
 *
 * @param <IN> type of the records handed to the writer.
 */
abstract class BaseWriter<IN> implements SinkWriter<IN> {

    /**
     * Upper bound used for a single append request: 95% of the value reported by {@link
     * StreamWriter#getApiMaxRequestBytes()}.
     */
    private static final long MAX_APPEND_REQUEST_BYTES =
            (long) ((double) StreamWriter.getApiMaxRequestBytes() * 0.95);

    /** Per-row overhead, in bytes, added on top of the serialized row size (see getProtoRowBytes). */
    private static final int PROTO_ROW_OVERHEAD_BYTES = 2;

    // Instance-level logger so that log lines carry the concrete subclass name.
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Estimated size, in bytes, of the rows currently buffered in {@link #protoRowsBuilder}. */
    private long appendRequestSizeBytes;

    protected final int subtaskId;
    private final String tablePath;
    private final BigQueryConnectOptions connectOptions;
    private final ProtoSchema protoSchema;
    private final BigQueryProtoSerializer serializer;
    private final ProtoRows.Builder protoRowsBuilder;

    /** Append requests awaiting validation, in the order they were enqueued. */
    final Queue<AppendInfo> appendResponseFuturesQueue;

    // Lazily created by createStreamWriter / createWriteStream.
    BigQueryServices.StorageWriteClient writeClient;
    StreamWriter streamWriter;
    String streamName;

    long totalRecordsSeen;
    long totalRecordsWritten;

    // Populated by initializeMetrics.
    Counter numberOfRecordsWrittenToBigQuery;
    Counter numberOfRecordsSeenByWriter;
    Counter numberOfRecordsSeenByWriterSinceCheckpoint;

    /**
     * Creates the writer state and initializes the serializer with the given schema provider.
     *
     * @param subtaskId identifier of the subtask, used in log messages.
     * @param tablePath table path passed to the write client when creating a write stream.
     * @param connectOptions options used to obtain the storage write client.
     * @param schemaProvider source of the descriptor from which the proto schema is derived.
     * @param serializer serializer used to turn records into proto rows.
     */
    BaseWriter(
            int subtaskId,
            String tablePath,
            BigQueryConnectOptions connectOptions,
            BigQuerySchemaProvider schemaProvider,
            BigQueryProtoSerializer serializer) {
        this.subtaskId = subtaskId;
        this.tablePath = tablePath;
        this.connectOptions = connectOptions;
        this.protoSchema = getProtoSchema(schemaProvider);
        this.serializer = serializer;
        this.serializer.init(schemaProvider);
        this.appendRequestSizeBytes = 0L;
        this.appendResponseFuturesQueue = new LinkedList<>();
        this.protoRowsBuilder = ProtoRows.newBuilder();
    }

    /**
     * Sends any buffered rows as an append request and then validates every pending append
     * response, waiting for responses that are not done yet.
     *
     * @param endOfInput not used by this implementation.
     */
    @Override
    public void flush(boolean endOfInput) {
        if (appendRequestSizeBytes > 0L) {
            append();
        }
        logger.info("Validating all pending append responses in subtask {}", subtaskId);
        validateAppendResponses(true);
    }

    /**
     * Discards buffered rows and pending append bookkeeping, then closes the stream writer and the
     * write client if they were created.
     */
    @Override
    public void close() {
        logger.debug("Closing writer in subtask {}", subtaskId);
        if (protoRowsBuilder != null) {
            protoRowsBuilder.clear();
        }
        if (appendResponseFuturesQueue != null) {
            appendResponseFuturesQueue.clear();
        }
        try {
            if (streamWriter != null) {
                streamWriter.close();
            }
        } finally {
            // Release the client even when closing the stream writer fails.
            if (writeClient != null) {
                writeClient.close();
            }
        }
    }

    /**
     * Sends the given rows as one append request.
     *
     * @param protoRows rows accumulated since the previous append.
     */
    abstract void sendAppendRequest(ProtoRows protoRows);

    /**
     * Validates the outcome of a single append request.
     *
     * @param appendInfo bookkeeping entry removed from the pending queue.
     */
    abstract void validateAppendResponse(AppendInfo appendInfo);

    /**
     * Buffers a serialized row and adds its estimated size to the pending request size.
     *
     * @param protoRow serialized row to buffer.
     */
    void addToAppendRequest(ByteString protoRow) {
        protoRowsBuilder.addSerializedRows(protoRow);
        appendRequestSizeBytes += getProtoRowBytes(protoRow);
    }

    /** Sends the buffered rows via {@link #sendAppendRequest(ProtoRows)} and resets the buffer. */
    void append() {
        sendAppendRequest(protoRowsBuilder.build());
        protoRowsBuilder.clear();
        appendRequestSizeBytes = 0L;
    }

    /**
     * Creates the {@link StreamWriter} for {@link #streamName}, creating the write client first if
     * needed.
     *
     * @param enableConnectionPool forwarded to the write client when creating the stream writer.
     * @throws BigQueryConnectorException if an {@link IOException} occurs.
     */
    void createStreamWriter(boolean enableConnectionPool) {
        try {
            BigQueryServices.StorageWriteClient client = getOrCreateWriteClient();
            logger.info(
                    "Creating BigQuery StreamWriter for write stream {} in subtask {}",
                    streamName,
                    subtaskId);
            streamWriter = client.createStreamWriter(streamName, protoSchema, enableConnectionPool);
        } catch (IOException e) {
            logger.error(
                    "Unable to create StreamWriter for stream {} in subtask {}",
                    streamName,
                    subtaskId,
                    e);
            throw new BigQueryConnectorException("Unable to connect to BigQuery", e);
        }
    }

    /**
     * Creates a write stream of the given type on {@link #tablePath} and stores its name in {@link
     * #streamName}, creating the write client first if needed.
     *
     * @param streamType type of write stream to create.
     * @throws BigQueryConnectorException if an {@link IOException} occurs.
     */
    void createWriteStream(WriteStream.Type streamType) {
        try {
            BigQueryServices.StorageWriteClient client = getOrCreateWriteClient();
            logger.info("Creating BigQuery write stream in subtask {}", subtaskId);
            streamName = client.createWriteStream(tablePath, streamType).getName();
        } catch (IOException e) {
            logger.error("Unable to create write stream in subtask {}", subtaskId, e);
            throw new BigQueryConnectorException("Unable to connect to BigQuery", e);
        }
    }

    /** Returns the write client, creating and caching it on first use. */
    private BigQueryServices.StorageWriteClient getOrCreateWriteClient() throws IOException {
        if (writeClient == null) {
            writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
        }
        return writeClient;
    }

    /**
     * Tells whether the row can be added to the pending append request without exceeding the
     * request size limit.
     *
     * @param protoRow serialized row under consideration.
     * @return {@code true} if the pending size plus this row's estimated size is within the limit.
     */
    boolean fitsInAppendRequest(ByteString protoRow) {
        return appendRequestSizeBytes + getProtoRowBytes(protoRow) <= MAX_APPEND_REQUEST_BYTES;
    }

    /**
     * Serializes a record and rejects it if the single row alone exceeds the append request limit.
     *
     * @param element record to serialize.
     * @return the serialized row.
     * @throws BigQuerySerializationException if serialization fails or the row is too large.
     */
    ByteString getProtoRow(IN element) throws BigQuerySerializationException {
        ByteString protoRow = serializer.serialize(element);
        int protoRowBytes = getProtoRowBytes(protoRow);
        if (protoRowBytes > MAX_APPEND_REQUEST_BYTES) {
            // SLF4J substitutes "{}" placeholders; printf-style "%d" would be logged verbatim.
            logger.error(
                    "A single row of size {} bytes exceeded the allowed maximum of {} bytes for an append request",
                    protoRowBytes,
                    MAX_APPEND_REQUEST_BYTES);
            throw new BigQuerySerializationException(
                    "Record size exceeds BigQuery append request limit");
        }
        return protoRow;
    }

    /** Converts the schema provider's descriptor into a {@link ProtoSchema}. */
    private static ProtoSchema getProtoSchema(BigQuerySchemaProvider schemaProvider) {
        return ProtoSchemaConverter.convert(schemaProvider.getDescriptor());
    }

    /**
     * Estimates the bytes a row contributes to an append request: its serialized size plus a fixed
     * two-byte allowance. NOTE(review): the allowance presumably covers protobuf field framing of
     * the repeated row entry; confirm.
     */
    private int getProtoRowBytes(ByteString protoRow) {
        return protoRow.size() + PROTO_ROW_OVERHEAD_BYTES;
    }

    /**
     * Validates pending append responses in FIFO order.
     *
     * @param waitForResponse when {@code false}, stops at the first entry whose future is not done;
     *     when {@code true}, every queued entry is removed and handed to {@link
     *     #validateAppendResponse(AppendInfo)} regardless of completion state.
     */
    void validateAppendResponses(boolean waitForResponse) {
        while (!appendResponseFuturesQueue.isEmpty()) {
            AppendInfo appendInfo = appendResponseFuturesQueue.peek();
            if (!waitForResponse && !appendInfo.getFuture().isDone()) {
                break;
            }
            appendResponseFuturesQueue.poll();
            validateAppendResponse(appendInfo);
        }
    }

    /**
     * Logs the failure and throws a {@link BigQueryConnectorException} wrapping the cause.
     *
     * @param error cause of the failed append.
     */
    void logAndThrowFatalException(Throwable error) {
        logger.error("AppendRows request failed in subtask {}", subtaskId, error);
        throw new BigQueryConnectorException("Error while writing to BigQuery", error);
    }

    /**
     * Logs the failure and throws a {@link BigQueryConnectorException} carrying the message.
     *
     * @param errorMessage description of the failure.
     */
    void logAndThrowFatalException(String errorMessage) {
        logger.error("AppendRows request failed in subtask {}\n{}", subtaskId, errorMessage);
        throw new BigQueryConnectorException(
                String.format("Error while writing to BigQuery\n%s", errorMessage));
    }

    /**
     * Registers this writer's counters on the given metric group.
     *
     * @param sinkWriterMetricGroup metric group on which the counters are created.
     */
    void initializeMetrics(SinkWriterMetricGroup sinkWriterMetricGroup) {
        numberOfRecordsSeenByWriter = sinkWriterMetricGroup.counter("numberOfRecordsSeenByWriter");
        numberOfRecordsWrittenToBigQuery =
                sinkWriterMetricGroup.counter("numberOfRecordsWrittenToBigQuery");
        numberOfRecordsSeenByWriterSinceCheckpoint =
                sinkWriterMetricGroup.counter("numberOfRecordsSeenByWriterSinceCheckpoint");
    }

    /** Returns the estimated size of the pending append request, in bytes. */
    @Internal
    long getAppendRequestSizeBytes() {
        return appendRequestSizeBytes;
    }

    /**
     * Returns a copy of the pending append queue.
     *
     * <p>The declared element type is kept for compatibility with existing callers, but the copy
     * holds the queue's {@link AppendInfo} entries, hence the raw-type construction.
     * NOTE(review): consider declaring {@code Queue<AppendInfo>} once callers are confirmed.
     */
    @Internal
    @SuppressWarnings({"rawtypes", "unchecked"})
    Queue<ApiFuture> getAppendResponseFuturesQueue() {
        return new LinkedList(appendResponseFuturesQueue);
    }

    /** Returns a snapshot of the rows currently buffered for the next append request. */
    @Internal
    ProtoRows getProtoRows() {
        return protoRowsBuilder.build();
    }

    /** Immutable bookkeeping for one append request whose response is pending validation. */
    static class AppendInfo {
        private final ApiFuture<AppendRowsResponse> future;
        private final long expectedOffset;
        private final long recordsAppended;

        /**
         * @param future future of the append response.
         * @param expectedOffset offset value associated with this append.
         * @param recordsAppended number of records associated with this append.
         */
        AppendInfo(
                ApiFuture<AppendRowsResponse> future, long expectedOffset, long recordsAppended) {
            this.future = future;
            this.expectedOffset = expectedOffset;
            this.recordsAppended = recordsAppended;
        }

        /** Returns the future of the append response. */
        public ApiFuture<AppendRowsResponse> getFuture() {
            return future;
        }

        /** Returns the expected offset recorded for this append. */
        public long getExpectedOffset() {
            return expectedOffset;
        }

        /** Returns the number of records recorded for this append. */
        public long getRecordsAppended() {
            return recordsAppended;
        }
    }
}

