/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1;

import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.ApiFuture;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.NanoClock;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.SettableApiFuture;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.batching.FlowController;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.spark.bigquery.repackaged.com.google.auto.value.AutoValue;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.AutoValue_ConnectionWorker_Load;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.AutoValue_ConnectionWorker_TableSchemaAndTimestamp;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.RequestProfiler;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.RowError;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.StreamConnection;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.TelemetryMetrics;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import com.google.cloud.spark.bigquery.repackaged.com.google.protobuf.Int64Value;
import com.google.cloud.spark.bigquery.repackaged.io.grpc.Status;
import com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException;
import com.google.cloud.spark.bigquery.repackaged.io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.lang.constant.Constable;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

class ConnectionWorker
implements AutoCloseable {
    // Logger; note that it is registered under StreamWriter's class name, not ConnectionWorker's.
    private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
    // Upper bound (milliseconds) that maybeWaitForInflightQuota() waits before giving up.
    private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000L;
    // Longest time the oldest inflight request may go without a callback (see throwIfWaitCallbackTooLong()).
    static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(5L);
    // Lock protecting every field annotated with @GuardedBy("lock"); created in the constructor.
    private Lock lock;
    // Signalled when a request is added to waitingRequestQueue; the append loop awaits it.
    private Condition hasMessageInWaitingQueue;
    // Awaited by maybeWaitForInflightQuota() until inflight counters drop below their limits.
    private Condition inflightReduced;
    // Defaults to 5 minutes when the constructor receives null.
    private final Duration maxRetryDuration;
    // Single-thread pool; shut down in close().
    private ExecutorService threadPool = Executors.newFixedThreadPool(1);
    // Current stream name; appendLoop() overwrites it when a request targets a different stream.
    private String streamName;
    // Stays null when the constructor receives a null or empty location.
    private String location = null;
    // Schema of the most recently sent request; lazily set and updated by appendLoop().
    private ProtoSchema writerSchema;
    private final long maxInflightRequests;
    private final long maxInflightBytes;
    private final FlowController.LimitExceededBehavior limitExceededBehavior;
    private final String traceId;
    private String compressorName = null;
    // Number of requests currently counted against maxInflightRequests.
    @GuardedBy(value="lock")
    private long inflightRequests = 0L;
    // Sum of message sizes currently counted against maxInflightBytes.
    @GuardedBy(value="lock")
    private long inflightBytes = 0L;
    // Feeds calculateSleepTimeMilli() in resetConnection(). (Identifier spelling is kept as-is.)
    @GuardedBy(value="lock")
    private long conectionRetryCountWithoutCallback = 0L;
    @GuardedBy(value="lock")
    private long connectionRetryStartTime = 0L;
    // Set to true by appendLoop() right before resetConnection(); polled by waitForDoneCallback().
    @GuardedBy(value="lock")
    private boolean streamConnectionIsConnected = false;
    // Set to true by cleanupInflightRequests().
    @GuardedBy(value="lock")
    private boolean inflightCleanuped = false;
    // Set to true by close(); appendInternal() rejects new requests once it is set.
    @GuardedBy(value="lock")
    private boolean userClosed = false;
    // Non-null once the connection is considered unrecoverable (see isConnectionInUnrecoverableState()).
    @GuardedBy(value="lock")
    private Throwable connectionFinalStatus = null;
    // Requests accepted by appendInternal() that the append loop has not picked up yet.
    @GuardedBy(value="lock")
    private final Deque<AppendRequestAndResponse> waitingRequestQueue;
    // Requests the append loop has picked up for sending.
    @GuardedBy(value="lock")
    private final Deque<AppendRequestAndResponse> inflightRequestQueue;
    // Stream names recorded by appendLoop() on the first request after a (re)connect or destination/schema switch.
    private final Set<String> destinationSet = ConcurrentHashMap.newKeySet();
    @GuardedBy(value="lock")
    private TableSchemaAndTimestamp updatedSchema;
    private BigQueryWriteClient client;
    // Recreated by resetConnection(); null until the first connection is made.
    private StreamConnection streamConnection;
    // Thread running appendLoop(); started in the constructor and joined in close().
    private Thread appendThread;
    // Whole seconds spent in the last successful maybeWaitForInflightQuota() call.
    private final AtomicLong inflightWaitSec = new AtomicLong(0L);
    private final String writerId = UUID.randomUUID().toString();
    // Test hooks: when the exception is non-null, appendLoop() sleeps for the given time and then throws it.
    private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null;
    private long testOnlyAppendLoopSleepTime = 0L;
    // Reset to 0 by appendLoop() whenever the stream needs (re)connecting.
    @GuardedBy(value="lock")
    private int responsesToIgnore = 0;
    private final RetrySettings retrySettings;
    private final RequestProfiler.RequestProfilerHook requestProfilerHook;
    private final TelemetryMetrics telemetryMetrics;
    private final Boolean isMultiplexing;
    // Matches the leading "projects/<id>/" segment of a stream name.
    private static String projectMatching = "projects/[^/]+/";
    private static Pattern streamPatternProject = Pattern.compile(projectMatching);
    // Matches a full default-stream name; the "streams/" segment is optional.
    static final Pattern DEFAULT_STREAM_PATTERN = Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$");
    // Group 1 captures the table path "projects/<p>/datasets/<d>/tables/<t>".
    private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/";
    private static Pattern streamPatternTable = Pattern.compile(tableMatching);

    /**
     * Tells whether the given stream name is a default-stream name.
     *
     * @param streamName stream name to test
     * @return {@code true} when the whole name matches {@code DEFAULT_STREAM_PATTERN}
     */
    public static Boolean isDefaultStreamName(String streamName) {
        return DEFAULT_STREAM_PATTERN.matcher(streamName).matches();
    }

    /**
     * Returns the largest request size, in bytes, that appendInternal() accepts.
     *
     * @return the fixed limit of 20,000,000 bytes
     */
    public static long getApiMaxRequestBytes() {
        final long maxRequestBytes = 20_000_000L;
        return maxRequestBytes;
    }

    /**
     * Extracts the first {@code projects/<id>/} segment (trailing slash included) from a stream name.
     *
     * @param streamName stream name to search
     * @return the matched project segment
     * @throws IllegalStateException when the name contains no such segment
     */
    static String extractProjectName(String streamName) {
        Matcher projectMatcher = streamPatternProject.matcher(streamName);
        if (!projectMatcher.find()) {
            throw new IllegalStateException(String.format("The passed in stream name does not match standard format %s", streamName));
        }
        return projectMatcher.group();
    }

    /**
     * Builds the routing value {@code projects/<id>/locations/<location>} for a stream.
     *
     * @param streamName stream name whose project segment is reused
     * @param location location appended after {@code locations/}
     * @return the concatenated routing value
     * @throws IllegalStateException when the stream name has no project segment
     */
    static String getRoutingHeader(String streamName, String location) {
        return ConnectionWorker.extractProjectName(streamName) + "locations/" + location;
    }

    /**
     * Derives the table path from the current stream name.
     *
     * @return the {@code projects/.../datasets/.../tables/...} prefix, or an empty string when
     *     the stream name does not contain one
     */
    private String getTableName() {
        Matcher tableMatcher = streamPatternTable.matcher(this.streamName);
        if (tableMatcher.find()) {
            return tableMatcher.group(1);
        }
        return "";
    }

    /**
     * Reads the connected flag under the lock.
     *
     * @return {@code true} when the stream connection is currently marked as connected
     */
    public boolean hasActiveConnection() {
        this.lock.lock();
        try {
            return this.streamConnectionIsConnected;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Reads the size of the inflight request queue under the lock.
     *
     * @return number of requests currently in the inflight queue
     */
    public int getInflightRequestQueueLength() {
        this.lock.lock();
        try {
            return this.inflightRequestQueue.size();
        }
        finally {
            this.lock.unlock();
        }
    }

    /** Exposes the attributes held by the telemetry helper; intended for tests. */
    @VisibleForTesting
    Attributes getTelemetryAttributes() {
        Attributes telemetryAttributes = this.telemetryMetrics.getTelemetryAttributes();
        return telemetryAttributes;
    }

    /**
     * Creates the worker, builds its own BigQueryWriteClient and starts the append thread.
     *
     * @param streamName initial stream name this worker writes to
     * @param location write location; a null or empty value leaves the location unset
     * @param writerSchema must be non-null (it is only validated here, not stored)
     * @param maxInflightRequests limit checked against the inflight request counter
     * @param maxInflightBytes limit checked against the inflight byte counter
     * @param maxRetryDuration falls back to 5 minutes when null
     * @param limitExceededBehavior controls whether appendInternal() throws when a limit is reached
     * @param traceId trace id attached to the first request after a connect/switch; may be null
     * @param compressorName passed to each new StreamConnection; may be null
     * @param clientSettings settings used to create the write client
     * @param retrySettings retry settings passed to each request wrapper; may be null
     * @param enableRequestProfiler passed to the request profiler hook
     * @param enableOpenTelemetry passed to the telemetry helper
     * @param isMultiplexing when false, appendLoop() clears the write stream on follow-up requests
     * @throws IOException declared for client creation
     * @throws StatusRuntimeException with INVALID_ARGUMENT when {@code writerSchema} is null
     */
    public ConnectionWorker(String streamName, String location, ProtoSchema writerSchema, long maxInflightRequests, long maxInflightBytes, Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, String traceId, @Nullable String compressorName, BigQueryWriteSettings clientSettings, RetrySettings retrySettings, boolean enableRequestProfiler, boolean enableOpenTelemetry, boolean isMultiplexing) throws IOException {
        this.lock = new ReentrantLock();
        this.hasMessageInWaitingQueue = this.lock.newCondition();
        this.inflightReduced = this.lock.newCondition();
        this.streamName = streamName;
        if (location != null && !location.isEmpty()) {
            this.location = location;
        }
        // The local "duration" is a decompiler artifact and is never read.
        Duration duration = this.maxRetryDuration = maxRetryDuration != null ? maxRetryDuration : Duration.ofMinutes(5L);
        if (writerSchema == null) {
            throw new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("Writer schema must be provided when building this writer."));
        }
        this.maxInflightRequests = maxInflightRequests;
        this.maxInflightBytes = maxInflightBytes;
        this.limitExceededBehavior = limitExceededBehavior;
        this.traceId = traceId;
        this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
        this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
        this.compressorName = compressorName;
        this.retrySettings = retrySettings;
        this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler);
        this.telemetryMetrics = new TelemetryMetrics(this, enableOpenTelemetry, this.getTableName(), this.writerId, traceId);
        this.isMultiplexing = isMultiplexing;
        // Copy the caller's headers and add a routing header keyed by stream or by location.
        HashMap<String, String> newHeaders = new HashMap<String, String>();
        newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
        if (this.location == null) {
            newHeaders.put("x-goog-request-params", "write_stream=" + this.streamName);
        } else {
            newHeaders.put("x-goog-request-params", "write_location=" + ConnectionWorker.getRoutingHeader(this.streamName, this.location));
        }
        // NOTE(review): stubSettings (with the routing header) is built but never used; the client
        // below is created from the original clientSettings. Confirm whether this is intentional.
        BigQueryWriteSettings stubSettings = ((BigQueryWriteSettings.Builder)clientSettings.toBuilder().setHeaderProvider(FixedHeaderProvider.create(newHeaders))).build();
        this.client = BigQueryWriteClient.create(clientSettings);
        this.appendThread = new Thread(new Runnable(){

            @Override
            public void run() {
                ConnectionWorker.this.appendLoop();
            }
        });
        // If the append loop dies with an exception: record it as the final status, move every
        // waiting request to the inflight queue, then run the non-blocking cleanup.
        this.appendThread.setUncaughtExceptionHandler((t2, e) -> {
            log.warning("Exception thrown from append loop, thus stream writer is shutdown due to exception: " + e.toString());
            this.lock.lock();
            try {
                this.connectionFinalStatus = e;
                while (!this.waitingRequestQueue.isEmpty()) {
                    AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
                    this.inflightRequestQueue.addLast(requestWrapper);
                }
            }
            finally {
                this.lock.unlock();
            }
            this.cleanupConnectionAndRequests(true);
        });
        this.appendThread.start();
    }

    /**
     * Replaces the stream connection with a fresh one.
     *
     * <p>An existing connection is closed first, followed by a sleep whose length comes from
     * calculateSleepTimeMilli() applied to the current retry counter. The new connection forwards
     * responses to requestCallback() and its final status to doneCallback().
     */
    private void resetConnection() {
        log.info("Start connecting stream: " + this.streamName + " id: " + this.writerId);
        this.telemetryMetrics.recordConnectionStart();
        StreamConnection previousConnection = this.streamConnection;
        if (previousConnection != null) {
            previousConnection.close();
            long backoffMillis = ConnectionWorker.calculateSleepTimeMilli(this.conectionRetryCountWithoutCallback);
            Uninterruptibles.sleepUninterruptibly(backoffMillis, TimeUnit.MILLISECONDS);
        }
        StreamConnection.RequestCallback onResponse = new StreamConnection.RequestCallback(){

            @Override
            public void run(AppendRowsResponse response) {
                ConnectionWorker.this.requestCallback(response);
            }
        };
        StreamConnection.DoneCallback onDone = new StreamConnection.DoneCallback(){

            @Override
            public void run(Throwable finalStatus) {
                ConnectionWorker.this.doneCallback(finalStatus);
            }
        };
        this.streamConnection = new StreamConnection(this.client, onResponse, onDone, this.compressorName);
        log.info("Finish connecting stream: " + this.streamName + " id: " + this.writerId);
    }

    /**
     * Decides whether a request is still inside its retry back-off window.
     *
     * @param requestWrapper request whose send deadline is inspected
     * @return {@code true} when retry settings exist and the current time is before the request's
     *     {@code blockMessageSendDeadline}
     */
    @GuardedBy(value="lock")
    private boolean shouldWaitForBackoff(AppendRequestAndResponse requestWrapper) {
        boolean stillBackingOff = this.retrySettings != null && Instant.now().isBefore(requestWrapper.blockMessageSendDeadline);
        if (!stillBackingOff) {
            return false;
        }
        log.fine(String.format("Waiting for wait queue to unblock at %s for retry # %s", requestWrapper.blockMessageSendDeadline, requestWrapper.retryCount));
        return true;
    }

    /**
     * Blocks until the request's retry back-off window has elapsed.
     *
     * <p>The wait uses a throw-away condition with a 100 ms timeout, so the lock is released while
     * waiting and shouldWaitForBackoff() is re-evaluated on every wake-up. The whole wait is
     * recorded as a RETRY_BACKOFF profiler operation.
     *
     * @param requestWrapper request that may still be backing off
     * @throws IllegalStateException when the waiting thread is interrupted
     */
    private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper) {
        this.lock.lock();
        try {
            // Start profiling inside the outer try so the lock is released even if the hook throws.
            this.requestProfilerHook.startOperation(RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
            try {
                Condition condition = this.lock.newCondition();
                while (this.shouldWaitForBackoff(requestWrapper)) {
                    condition.await(100L, TimeUnit.MILLISECONDS);
                }
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            finally {
                this.requestProfilerHook.endOperation(RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /** Queues the request at the head of the waiting queue. */
    @GuardedBy(value="lock")
    private void addMessageToFrontOfWaitingQueue(AppendRequestAndResponse requestWrapper) {
        final boolean atFront = true;
        this.addMessageToWaitingQueue(requestWrapper, atFront);
    }

    /** Queues the request at the tail of the waiting queue. */
    @GuardedBy(value="lock")
    private void addMessageToBackOfWaitingQueue(AppendRequestAndResponse requestWrapper) {
        final boolean atFront = false;
        this.addMessageToWaitingQueue(requestWrapper, atFront);
    }

    /**
     * Accounts for a request and places it in the waiting queue.
     *
     * <p>Increments the inflight request and byte counters, signals the append loop, starts the
     * WAIT_QUEUE profiler operation and finally enqueues the request.
     *
     * @param requestWrapper request to enqueue
     * @param addToFront {@code true} to insert at the head, {@code false} to append at the tail
     */
    @GuardedBy(value="lock")
    private void addMessageToWaitingQueue(AppendRequestAndResponse requestWrapper, boolean addToFront) {
        this.inflightRequests += 1L;
        this.inflightBytes += requestWrapper.messageSize;
        this.hasMessageInWaitingQueue.signal();
        this.requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
        if (!addToFront) {
            this.waitingRequestQueue.addLast(requestWrapper);
            return;
        }
        this.waitingRequestQueue.addFirst(requestWrapper);
    }

    /**
     * Builds an AppendRowsRequest from the writer's settings and hands it to appendInternal().
     *
     * @param streamWriter writer on whose behalf the rows are appended; must not be null
     * @param rows rows to append
     * @param offset offset to set on the request; negative values leave the offset unset
     * @param requestUniqueId id used for request profiling
     * @return future that completes with the append response
     * @throws NullPointerException when {@code streamWriter} is null
     * @throws StatusRuntimeException with INVALID_ARGUMENT when the writer's location (if this
     *     worker has a location) or stream name (if it has none) does not match this worker
     */
    ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset, String requestUniqueId) {
        // Validate before the first dereference; previously this check ran after streamWriter
        // had already been used, so it could never be the one to fire.
        Preconditions.checkNotNull(streamWriter);
        if (this.location != null && !this.location.equals(streamWriter.getLocation())) {
            throw new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("StreamWriter with location " + streamWriter.getLocation() + " is scheduled to use a connection with location " + this.location));
        }
        if (this.location == null && !streamWriter.getStreamName().equals(this.streamName)) {
            throw new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("StreamWriter with stream name " + streamWriter.getStreamName() + " is scheduled to use a connection with stream name " + this.streamName));
        }
        AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
        requestBuilder.setProtoRows(AppendRowsRequest.ProtoData.newBuilder().setWriterSchema(streamWriter.getProtoSchema()).setRows(rows).build());
        if (offset >= 0L) {
            requestBuilder.setOffset(Int64Value.of(offset));
        }
        requestBuilder.setWriteStream(streamWriter.getStreamName());
        requestBuilder.putAllMissingValueInterpretations(streamWriter.getMissingValueInterpretationMap());
        // Only set the default interpretation when the writer specifies one.
        if (streamWriter.getDefaultValueInterpretation() != AppendRowsRequest.MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED) {
            requestBuilder.setDefaultMissingValueInterpretation(streamWriter.getDefaultValueInterpretation());
        }
        return this.appendInternal(streamWriter, requestBuilder.build(), requestUniqueId);
    }

    /**
     * Reads the user-closed flag under the lock.
     *
     * @return {@code true} once close() has marked this worker as closed
     */
    Boolean isUserClosed() {
        this.lock.lock();
        try {
            return this.userClosed;
        }
        finally {
            this.lock.unlock();
        }
    }

    /** Returns the location this worker was created with, or {@code null} when none was set. */
    String getWriteLocation() {
        String writeLocation = this.location;
        return writeLocation;
    }

    /**
     * Validates a request, enqueues it for the append loop and waits for inflight quota.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * <p>Requests that are too large, arrive after close(), or arrive after the connection reached
     * a final status are not thrown back; their future is completed exceptionally and returned.
     *
     * @param streamWriter writer the request belongs to
     * @param message request to send
     * @param requestUniqueId id used for request profiling
     * @return the request's result future
     * @throws Exceptions.InflightRequestsLimitExceededException when the behavior is ThrowException
     *     and the request limit would be reached
     * @throws Exceptions.InflightBytesLimitExceededException when the behavior is ThrowException
     *     and the byte limit would be reached
     * @throws StatusRuntimeException when waiting for inflight quota is interrupted or times out
     */
    private ApiFuture<AppendRowsResponse> appendInternal(StreamWriter streamWriter, AppendRowsRequest message, String requestUniqueId) {
        AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter, this.retrySettings, requestUniqueId);
        // Size check happens before taking the lock.
        if (requestWrapper.messageSize > ConnectionWorker.getApiMaxRequestBytes()) {
            requestWrapper.appendResult.setException(new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("MessageSize is too large. Max allow: " + ConnectionWorker.getApiMaxRequestBytes() + " Actual: " + requestWrapper.messageSize)));
            return requestWrapper.appendResult;
        }
        this.lock.lock();
        try {
            if (this.userClosed) {
                requestWrapper.appendResult.setException(new Exceptions.StreamWriterClosedException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Connection is already closed during append"), this.streamName, this.writerId));
                SettableApiFuture<AppendRowsResponse> settableApiFuture = requestWrapper.appendResult;
                return settableApiFuture;
            }
            // In ThrowException mode, reject immediately instead of blocking for quota.
            if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
                if (this.inflightRequests + 1L >= this.maxInflightRequests) {
                    throw new Exceptions.InflightRequestsLimitExceededException(this.writerId, this.maxInflightRequests);
                }
                if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) {
                    throw new Exceptions.InflightBytesLimitExceededException(this.writerId, this.maxInflightBytes);
                }
            }
            if (this.connectionFinalStatus != null) {
                // Add a retry hint to the message when the final status mentions UnavailableException.
                String connectionFinalStatusString = this.connectionFinalStatus.toString().contains("com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.UnavailableException") ? this.connectionFinalStatus.toString() + ". This is a most likely a transient condition and may be corrected by retrying with a backoff." : this.connectionFinalStatus.toString();
                requestWrapper.appendResult.setException(new Exceptions.StreamWriterClosedException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Connection is closed due to " + connectionFinalStatusString), this.streamName, this.writerId));
                SettableApiFuture<AppendRowsResponse> settableApiFuture = requestWrapper.appendResult;
                return settableApiFuture;
            }
            // Account for the request, enqueue it and wake the append loop.
            this.requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
            ++this.inflightRequests;
            this.inflightBytes += requestWrapper.messageSize;
            this.waitingRequestQueue.addLast(requestWrapper);
            this.hasMessageInWaitingQueue.signal();
            this.requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
            try {
                this.maybeWaitForInflightQuota();
            }
            catch (StatusRuntimeException ex) {
                // Undo the accounting done above before propagating.
                // NOTE(review): pollLast() removes whatever is at the tail of the waiting queue,
                // which is presumably this request — confirm it cannot have been moved or
                // followed by another request while the lock was released during the wait.
                --this.inflightRequests;
                this.waitingRequestQueue.pollLast();
                this.inflightBytes -= requestWrapper.messageSize;
                throw ex;
            }
            this.requestProfilerHook.endOperation(RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
            SettableApiFuture<AppendRowsResponse> settableApiFuture = requestWrapper.appendResult;
            return settableApiFuture;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Blocks while either inflight limit is reached, re-checking at least every 100 ms.
     *
     * <p>On a normal exit the elapsed wait, in whole seconds, is stored in {@code inflightWaitSec}.
     *
     * @throws StatusRuntimeException with CANCELLED when the thread is interrupted (the interrupt
     *     status is restored before throwing) or when the total wait exceeds
     *     {@code INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI}
     */
    @GuardedBy(value="lock")
    private void maybeWaitForInflightQuota() {
        long startTimeMillis = System.currentTimeMillis();
        while (this.inflightRequests >= this.maxInflightRequests || this.inflightBytes >= this.maxInflightBytes) {
            try {
                this.inflightReduced.await(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                log.warning("Interrupted while waiting for inflight quota. Stream: " + this.streamName + " Error: " + e.toString());
                // Keep the interruption visible to the caller; await() cleared the flag.
                Thread.currentThread().interrupt();
                throw new StatusRuntimeException(Status.fromCode(Status.Code.CANCELLED).withCause(e).withDescription("Interrupted while waiting for quota."));
            }
            long currentWaitMillis = System.currentTimeMillis() - startTimeMillis;
            if (currentWaitMillis <= INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI) {
                continue;
            }
            throw new StatusRuntimeException(Status.fromCode(Status.Code.CANCELLED).withDescription(String.format("Interrupted while waiting for quota due to long waiting time %sms", currentWaitMillis)));
        }
        this.inflightWaitSec.set((System.currentTimeMillis() - startTimeMillis) / 1000L);
    }

    /**
     * Computes an exponential back-off delay: 50 ms doubled per retry, capped at 60 seconds.
     *
     * @param retryCount number of retries so far
     * @return delay in milliseconds, truncated to a whole number
     */
    @VisibleForTesting
    static long calculateSleepTimeMilli(long retryCount) {
        double uncappedMillis = Math.pow(2.0, retryCount) * 50.0;
        double cappedMillis = Math.min(uncappedMillis, 60000.0);
        return (long)cappedMillis;
    }

    /**
     * Test hook: sets how long (milliseconds) the append loop sleeps before throwing the
     * test-only exception.
     */
    @VisibleForTesting
    void setTestOnlyAppendLoopSleepTime(long testOnlyAppendLoopSleepTime) {
        final long sleepMillis = testOnlyAppendLoopSleepTime;
        this.testOnlyAppendLoopSleepTime = sleepMillis;
    }

    /** Test hook: sets the exception the append loop throws right before it would connect. */
    @VisibleForTesting
    void setTestOnlyRunTimeExceptionInAppendLoop(RuntimeException testOnlyRunTimeExceptionInAppendLoop) {
        final RuntimeException injected = testOnlyRunTimeExceptionInAppendLoop;
        this.testOnlyRunTimeExceptionInAppendLoop = injected;
    }

    /** Returns the number of seconds recorded by the last completed inflight-quota wait. */
    public long getInflightWaitSeconds() {
        return this.inflightWaitSec.get();
    }

    /** Returns the random UUID string identifying this worker. */
    public String getWriterId() {
        String id = this.writerId;
        return id;
    }

    /**
     * Reports, under the lock, whether a final connection status has been recorded.
     *
     * @return {@code true} when {@code connectionFinalStatus} is non-null
     */
    boolean isConnectionInUnrecoverableState() {
        this.lock.lock();
        try {
            return this.connectionFinalStatus != null;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Marks the worker as closed by the user and shuts everything down.
     *
     * <p>Steps, in order: set {@code userClosed}; join the append thread; close the client and
     * wait up to 150 seconds for it; shut down the callback thread pool and wait up to 3 minutes.
     * If the calling thread is interrupted during any wait, its interrupt status is restored
     * before this method returns or throws.
     *
     * @throws IllegalStateException when waiting for the thread pool is interrupted; the
     *     InterruptedException is attached as the cause
     */
    @Override
    public void close() {
        log.info("User closing stream: " + this.streamName);
        this.lock.lock();
        try {
            this.userClosed = true;
        }
        finally {
            this.lock.unlock();
        }
        // Remember interruptions and re-assert them only at the end, so the remaining shutdown
        // steps still get to block as before.
        boolean interrupted = false;
        try {
            log.info("Waiting for append thread to finish. Stream: " + this.streamName + " id: " + this.writerId);
            try {
                this.appendThread.join();
            }
            catch (InterruptedException e) {
                interrupted = true;
                log.warning("Append handler join is interrupted. Stream: " + this.streamName + " id: " + this.writerId + " Error: " + e.toString());
            }
            this.client.close();
            try {
                this.client.awaitTermination(150L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ignored) {
                interrupted = true;
                log.warning("Client await termination timeout in writer id " + this.writerId);
            }
            try {
                log.fine("Begin shutting down user callback thread pool for stream " + this.streamName + " id: " + this.writerId);
                this.threadPool.shutdown();
                this.threadPool.awaitTermination(3L, TimeUnit.MINUTES);
            }
            catch (InterruptedException e) {
                interrupted = true;
                log.warning("Close on thread pool for " + this.streamName + " id: " + this.writerId + " is interrupted with exception: " + e.toString());
                // Preserve the original interruption as the cause.
                throw new IllegalStateException("Thread pool shutdown is interrupted for stream " + this.streamName, e);
            }
            log.info("User close finishes for stream " + this.streamName);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Body of the append thread: moves requests from the waiting queue to the stream connection
     * until waitingQueueDrained() reports true, then runs the blocking cleanup.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * <p>Each iteration (1) waits up to 100 ms for new work, (2) fails if the oldest inflight
     * request has waited too long for a callback, (3) re-queues all inflight requests when the
     * stream needs (re)connecting, (4) drains the waiting queue into the inflight queue and a
     * local queue under the lock, and (5) sends the local queue outside the lock.
     */
    private void appendLoop() {
        LinkedList<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
        boolean streamNeedsConnecting = false;
        boolean firstRequestForTableOrSchemaSwitch = true;
        while (!this.waitingQueueDrained()) {
            this.lock.lock();
            try {
                AppendRequestAndResponse requestWrapper;
                Instant sendInstant;
                this.hasMessageInWaitingQueue.await(100L, TimeUnit.MILLISECONDS);
                // Only the head of the inflight queue is checked for a callback timeout.
                if (this.inflightRequestQueue.size() > 0 && (sendInstant = this.inflightRequestQueue.getFirst().requestSendTimeStamp) != null) {
                    this.throwIfWaitCallbackTooLong(sendInstant);
                }
                // "bl" is a decompiler artifact and is never read.
                boolean bl = streamNeedsConnecting = !this.streamConnectionIsConnected && this.connectionFinalStatus == null;
                if (streamNeedsConnecting) {
                    // Move inflight requests back to the front of the waiting queue; polling from
                    // the tail and inserting at the head keeps their original order.
                    while (!this.inflightRequestQueue.isEmpty()) {
                        requestWrapper = this.inflightRequestQueue.pollLast();
                        this.requestProfilerHook.endOperation(RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
                        requestWrapper.requestSendTimeStamp = null;
                        this.requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
                        this.waitingRequestQueue.addFirst(requestWrapper);
                    }
                    this.responsesToIgnore = 0;
                }
                while (!this.waitingRequestQueue.isEmpty()) {
                    requestWrapper = this.waitingRequestQueue.pollFirst();
                    this.requestProfilerHook.endOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
                    this.waitForBackoffIfNecessary(requestWrapper);
                    this.inflightRequestQueue.add(requestWrapper);
                    localQueue.addLast(requestWrapper);
                }
            }
            catch (InterruptedException e) {
                // Interruption is only logged; the loop carries on with whatever is in localQueue.
                log.warning("Interrupted while waiting for message. Stream: " + this.streamName + " id: " + this.writerId + " Error: " + e.toString());
            }
            finally {
                this.lock.unlock();
            }
            if (localQueue.isEmpty()) continue;
            if (streamNeedsConnecting) {
                // The connected flag is set before the connection is actually (re)created.
                this.lock.lock();
                try {
                    this.streamConnectionIsConnected = true;
                }
                finally {
                    this.lock.unlock();
                }
                // Test hook: simulate a failure of the append loop at connect time.
                if (this.testOnlyRunTimeExceptionInAppendLoop != null) {
                    Uninterruptibles.sleepUninterruptibly(this.testOnlyAppendLoopSleepTime, TimeUnit.MILLISECONDS);
                    throw this.testOnlyRunTimeExceptionInAppendLoop;
                }
                this.resetConnection();
                firstRequestForTableOrSchemaSwitch = true;
            }
            while (!localQueue.isEmpty()) {
                ((AppendRequestAndResponse)localQueue.peekFirst()).setRequestSendQueueTime();
                AppendRequestAndResponse wrapper = (AppendRequestAndResponse)localQueue.pollFirst();
                AppendRowsRequest originalRequest = wrapper.message;
                String requestUniqueId = wrapper.requestUniqueId;
                AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
                if (this.writerSchema == null) {
                    this.writerSchema = originalRequest.getProtoRows().getWriterSchema();
                }
                // A different stream name or writer schema switches the destination and makes
                // this request the "first" one again.
                if (!originalRequest.getWriteStream().isEmpty() && !this.streamName.isEmpty() && !originalRequest.getWriteStream().equals(this.streamName) || originalRequest.getProtoRows().hasWriterSchema() && !originalRequest.getProtoRows().getWriterSchema().equals(this.writerSchema)) {
                    this.streamName = originalRequest.getWriteStream();
                    this.telemetryMetrics.refreshOpenTelemetryTableNameAttributes(this.getTableName());
                    this.writerSchema = originalRequest.getProtoRows().getWriterSchema();
                    firstRequestForTableOrSchemaSwitch = true;
                }
                // First request keeps stream name and schema (plus trace id when set); follow-up
                // requests drop the schema, and also the stream name unless multiplexing.
                if (firstRequestForTableOrSchemaSwitch) {
                    this.destinationSet.add(this.streamName);
                    if (this.traceId != null) {
                        originalRequestBuilder.setTraceId(this.traceId);
                    }
                } else if (!this.isMultiplexing.booleanValue()) {
                    originalRequestBuilder.clearWriteStream();
                }
                if (!firstRequestForTableOrSchemaSwitch) {
                    originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
                }
                firstRequestForTableOrSchemaSwitch = false;
                this.requestProfilerHook.startOperation(RequestProfiler.OperationName.RESPONSE_LATENCY, requestUniqueId);
                this.streamConnection.send(originalRequestBuilder.build());
            }
        }
        this.cleanupConnectionAndRequests(false);
    }

    /**
     * Closes the stream connection (if any) and fails every remaining inflight request.
     *
     * @param avoidBlocking when {@code false}, waits up to 3 minutes for the done callback after
     *     closing the connection; when {@code true}, skips that wait
     */
    private void cleanupConnectionAndRequests(boolean avoidBlocking) {
        String finalExceptionText = String.valueOf(this.connectionFinalStatus);
        log.info("Cleanup starts. Stream: " + this.streamName + " id: " + this.writerId + " userClose: " + this.userClosed + " final exception: " + finalExceptionText);
        StreamConnection connection = this.streamConnection;
        if (connection != null) {
            connection.close();
            boolean mayBlock = !avoidBlocking;
            if (mayBlock) {
                this.waitForDoneCallback(3L, TimeUnit.MINUTES);
            }
        }
        log.info("Stream connection is fully closed. Cleaning up inflight requests. Stream: " + this.streamName + " id: " + this.writerId);
        this.cleanupInflightRequests();
        log.info("Append thread is done. Stream: " + this.streamName + " id: " + this.writerId);
    }

    /**
     * Fails when more than {@code MAXIMUM_REQUEST_CALLBACK_WAIT_TIME} has elapsed since the given
     * instant.
     *
     * @param timeToCheck instant to measure from
     * @throws Exceptions.MaximumRequestCallbackWaitTimeExceededException when the elapsed time is
     *     strictly greater than the maximum
     */
    private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
        Duration elapsedSinceSend = Duration.between(timeToCheck, Instant.now());
        boolean withinLimit = elapsedSinceSend.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) <= 0;
        if (withinLimit) {
            return;
        }
        throw new Exceptions.MaximumRequestCallbackWaitTimeExceededException(elapsedSinceSend, this.writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME);
    }

    /**
     * Tells the append loop whether it may stop.
     *
     * @return {@code true} when the waiting queue is empty and either the user closed the worker
     *     or a final connection status has been recorded
     */
    private boolean waitingQueueDrained() {
        this.lock.lock();
        try {
            boolean shuttingDown = this.userClosed || this.connectionFinalStatus != null;
            if (!shuttingDown) {
                return false;
            }
            return this.waitingRequestQueue.isEmpty();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Polls every 100 ms until the connection is no longer marked as connected or the timeout
     * elapses.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * <p>On timeout a warning is logged and, if no final status exists yet, a CANCELLED
     * StatusRuntimeException is recorded as the final status.
     *
     * @param duration maximum time to wait
     * @param timeUnit unit of {@code duration}
     */
    private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
        log.fine("Waiting for done callback from stream connection. Stream: " + this.streamName + " id: " + this.writerId);
        final long deadlineNanos = System.nanoTime() + timeUnit.toNanos(duration);
        while (System.nanoTime() <= deadlineNanos) {
            boolean stillConnected;
            this.lock.lock();
            try {
                stillConnected = this.streamConnectionIsConnected;
            }
            finally {
                this.lock.unlock();
            }
            if (!stillConnected) {
                return;
            }
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        }
        this.lock.lock();
        try {
            log.warning("Donecallback is not triggered within timeout frame for writer " + this.writerId);
            if (this.connectionFinalStatus != null) {
                return;
            }
            this.connectionFinalStatus = new StatusRuntimeException(Status.fromCode(Status.Code.CANCELLED).withDescription("Timeout waiting for DoneCallback."));
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Drains the inflight request queue under {@code lock}, marks the inflight cleanup as done,
     * and then fails every drained request outside the lock.
     *
     * <p>The first drained request receives the connection's final status (or a
     * FAILED_PRECONDITION {@code StreamWriterClosedException} when none is recorded). Every later
     * request receives a fresh ABORTED {@code StreamWriterClosedException}.
     */
    private void cleanupInflightRequests() {
        Throwable finalStatus = new Exceptions.StreamWriterClosedException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Connection is already closed, cleanup inflight request"), this.streamName, this.writerId);
        LinkedList<AppendRequestAndResponse> drained = new LinkedList<AppendRequestAndResponse>();
        this.lock.lock();
        try {
            if (this.connectionFinalStatus != null) {
                finalStatus = this.connectionFinalStatus;
            }
            while (!this.inflightRequestQueue.isEmpty()) {
                drained.add(this.pollFirstInflightRequestQueue());
            }
            this.inflightCleanuped = true;
        } finally {
            this.lock.unlock();
        }
        log.fine("Cleaning " + drained.size() + " inflight requests with error: " + finalStatus + " for Stream " + this.streamName + " id: " + this.writerId);

        boolean head = true;
        for (AppendRequestAndResponse pending : drained) {
            Throwable failure;
            if (head) {
                // Only the oldest request carries the actual terminal status.
                failure = finalStatus;
                head = false;
            } else {
                failure = new Exceptions.StreamWriterClosedException(Status.fromCode(Status.Code.ABORTED).withDescription("Connection is aborted due to an unrecoverable failure of another request sharing the connection. Please retry this request."), this.streamName, this.writerId);
            }
            pending.appendResult.setException(failure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Decides whether a failed request should be retried and, if so, puts it back at the front of
     * the waiting queue.
     *
     * <p>No retry happens when retry settings are absent, when the maximum attempt count is 0,
     * when the error is neither a retriable connection error nor RESOURCE_EXHAUSTED, or when the
     * request has already reached the maximum attempt count.
     *
     * <p>For a default stream, or a request without an offset, only this request is re-queued.
     * Otherwise every request still in flight is moved back to the waiting queue as well (newest
     * first, so the original order is restored) and {@code responsesToIgnore} is incremented once
     * per moved request.
     *
     * @param errorCode status code carried by the failed response
     * @param requestWrapper the request the failed response belongs to
     * @return {@code true} if the request was re-queued for retry, {@code false} otherwise
     */
    private Boolean retryOnRetryableError(Status.Code errorCode, AppendRequestAndResponse requestWrapper) {
        if (this.retrySettings == null) {
            return false;
        }
        if (this.retrySettings.getMaxAttempts() == 0) {
            return false;
        }
        if (!this.isConnectionErrorRetriable(errorCode) && errorCode != Status.Code.RESOURCE_EXHAUSTED) {
            return false;
        }
        if (requestWrapper.retryCount >= this.retrySettings.getMaxAttempts()) {
            log.info(String.format("Max retry count reached for message in stream %s at offset %d.  Retry count: %d", this.streamName, requestWrapper.message.getOffset().getValue(), requestWrapper.retryCount));
            return false;
        }
        this.lock.lock();
        try {
            requestWrapper.retryCount = requestWrapper.retryCount + 1;
            if (this.retrySettings != null && this.useBackoffForError(errorCode, this.streamName)) {
                // Advance the backoff schedule and hold this message back until the delay passes.
                requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings == null ? requestWrapper.retryAlgorithm.createFirstAttempt() : requestWrapper.attemptSettings);
                requestWrapper.blockMessageSendDeadline = Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis());
                log.info("Messages blocked for retry for " + Duration.between(Instant.now(), requestWrapper.blockMessageSendDeadline) + " until " + requestWrapper.blockMessageSendDeadline);
            }
            // -1 marks "no offset on this request".
            long offset = requestWrapper.message.hasOffset() ? requestWrapper.message.getOffset().getValue() : -1L;
            if (ConnectionWorker.isDefaultStreamName(this.streamName).booleanValue() || offset == -1L) {
                log.info(String.format("Retrying default stream message in stream %s for in-stream error: %s, retry count: %s", this.streamName, errorCode, requestWrapper.retryCount));
                this.addMessageToFrontOfWaitingQueue(requestWrapper);
            } else {
                log.info(String.format("Retrying exclusive message in stream %s at offset %d for in-stream error: %s, retry count: %s", this.streamName, requestWrapper.message.getOffset().getValue(), errorCode, requestWrapper.retryCount));
                // Move everything still in flight back behind the failed request; the responses
                // for those requests are counted so the callback can discard them.
                while (!this.inflightRequestQueue.isEmpty()) {
                    AppendRequestAndResponse element = this.pollLastInflightRequestQueue();
                    this.addMessageToFrontOfWaitingQueue(element);
                    ++this.responsesToIgnore;
                }
                this.addMessageToFrontOfWaitingQueue(requestWrapper);
            }
            return true;
        } finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles one {@link AppendRowsResponse} received for this connection.
     *
     * <p>Under {@code lock} it discards responses that are flagged to be ignored, stores an
     * updated schema if the response carries one, resets the connection retry bookkeeping, and
     * takes the first request off the inflight queue. Outside the lock it records response
     * telemetry, attempts a retry when the response carries an error, and otherwise submits a task
     * to {@code threadPool} that completes the request's {@code appendResult}.
     *
     * @param response the response to process
     */
    private void requestCallback(AppendRowsResponse response) {
        AppendRequestAndResponse requestWrapper;
        block16: {
            if (response.hasUpdatedSchema()) {
                // Log the response without the updated schema.
                AppendRowsResponse responseWithUpdatedSchemaRemoved = response.toBuilder().clearUpdatedSchema().build();
                log.fine(String.format("Got response with schema updated (omitting updated schema in response here): %s writer id %s", responseWithUpdatedSchemaRemoved.toString(), this.writerId));
            }
            this.lock.lock();
            try {
                // responsesToIgnore is raised by retryOnRetryableError for every inflight request
                // it moves back to the waiting queue; consume one count per response and stop.
                if (this.responsesToIgnore > 0) {
                    if (response.hasError()) {
                        log.fine(String.format("Ignoring response in stream %s at offset %s.", this.streamName, response));
                    } else {
                        log.warning(String.format("Unexpected successful response in stream %s at offset %s.  Due to a previous retryable error being inflight, this message is being ignored.", this.streamName, response.getAppendResult().getOffset()));
                    }
                    --this.responsesToIgnore;
                    return;
                }
                if (response.hasUpdatedSchema()) {
                    this.updatedSchema = TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema());
                }
                // A response arrived, so reset the connection retry bookkeeping.
                if (this.conectionRetryCountWithoutCallback != 0L) {
                    this.conectionRetryCountWithoutCallback = 0L;
                }
                if (this.connectionRetryStartTime != 0L) {
                    this.connectionRetryStartTime = 0L;
                }
                if (!this.inflightRequestQueue.isEmpty()) {
                    // Read the send timestamp before polling: polling clears it on the wrapper.
                    Instant sendInstant = this.inflightRequestQueue.getFirst().requestSendTimeStamp;
                    if (sendInstant != null) {
                        Duration durationLatency = Duration.between(sendInstant, Instant.now());
                        this.telemetryMetrics.recordNetworkLatency(durationLatency);
                    }
                    requestWrapper = this.pollFirstInflightRequestQueue();
                    this.requestProfilerHook.endOperation(RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
                    break block16;
                }
                // Empty inflight queue: silently return if cleanupInflightRequests already ran.
                if (this.inflightCleanuped) {
                    return;
                }
                // Otherwise this is unexpected; record a final status for the connection.
                log.log(Level.WARNING, "Unexpected: request callback called on an empty inflight queue.");
                this.connectionFinalStatus = new StatusRuntimeException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Request callback called on an empty inflight queue."));
                return;
            }
            finally {
                this.lock.unlock();
            }
        }
        // From here on the lock is no longer held.
        // NOTE(review): the error code is used as an index into Status.Code.values(); this assumes
        // the code is always within that array's bounds - confirm.
        this.telemetryMetrics.recordResponse(requestWrapper.messageSize, requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(), Status.Code.values()[response.hasError() ? response.getError().getCode() : Status.Code.OK.ordinal()].toString(), requestWrapper.retryCount > 0);
        if (response.hasError() && this.retryOnRetryableError(Status.Code.values()[response.getError().getCode()], requestWrapper).booleanValue()) {
            log.info("Attempting to retry on error: " + response.getError().toString());
            return;
        }
        // Complete the caller's future via the thread pool.
        this.threadPool.submit(() -> {
            try {
                if (response.hasError()) {
                    Exceptions.StorageException storageException = Exceptions.toStorageException(response.getError(), null);
                    log.fine(String.format("Got error message: %s", response.toString()));
                    if (storageException != null) {
                        requestWrapper.appendResult.setException(storageException);
                    } else if (response.getRowErrorsCount() > 0) {
                        // Collect per-row errors, keyed by row index, into a serialization error.
                        HashMap<Integer, String> rowIndexToErrorMessage = new HashMap<Integer, String>();
                        for (int i = 0; i < response.getRowErrorsCount(); ++i) {
                            RowError rowError = response.getRowErrors(i);
                            rowIndexToErrorMessage.put(Math.toIntExact(rowError.getIndex()), rowError.getMessage());
                        }
                        Exceptions.AppendSerializationError exception = new Exceptions.AppendSerializationError(response.getError().getCode(), response.getError().getMessage(), this.streamName, rowIndexToErrorMessage);
                        requestWrapper.appendResult.setException(exception);
                    } else {
                        // Fall back to a plain StatusRuntimeException built from the error.
                        StatusRuntimeException exception = new StatusRuntimeException(Status.fromCodeValue(response.getError().getCode()).withDescription(response.getError().getMessage()));
                        requestWrapper.appendResult.setException(exception);
                    }
                } else {
                    requestWrapper.appendResult.set(response);
                }
            }
            finally {
                // Always close the TOTAL_LATENCY operation, whether the request succeeded or failed.
                this.requestProfilerHook.endOperation(RequestProfiler.OperationName.TOTAL_LATENCY, requestWrapper.requestUniqueId);
            }
        });
    }

    /**
     * Tells whether a status code counts as a retriable connection error.
     *
     * @param statusCode the status code to classify
     * @return {@code true} for ABORTED, UNAVAILABLE, CANCELLED, INTERNAL and DEADLINE_EXCEEDED;
     *     {@code false} for anything else
     */
    private boolean isConnectionErrorRetriable(Status.Code statusCode) {
        if (statusCode == Status.Code.ABORTED) {
            return true;
        }
        if (statusCode == Status.Code.UNAVAILABLE) {
            return true;
        }
        if (statusCode == Status.Code.CANCELLED) {
            return true;
        }
        if (statusCode == Status.Code.INTERNAL) {
            return true;
        }
        return statusCode == Status.Code.DEADLINE_EXCEEDED;
    }

    /**
     * Tells whether a retry for the given error should be delayed with backoff.
     *
     * @param statusCode status code of the failed request
     * @param streamName name of the stream the request was sent on
     * @return {@code true} for RESOURCE_EXHAUSTED on any stream, and for INTERNAL on a default
     *     stream; {@code false} otherwise
     */
    private boolean useBackoffForError(Status.Code statusCode, String streamName) {
        boolean onDefaultStream = ConnectionWorker.isDefaultStreamName(streamName).booleanValue();
        if (statusCode == Status.Code.RESOURCE_EXHAUSTED) {
            return true;
        }
        return onDefaultStream && statusCode == Status.Code.INTERNAL;
    }

    /**
     * Handles the end of the stream connection.
     *
     * <p>Under {@code lock} it marks the connection as disconnected and records the end of the
     * connection in telemetry. If no final status is set yet, it either leaves the connection to
     * be re-established (retriable error, not closed by the user, and the retry window is
     * unlimited, i.e. 0 ms, or not yet exhausted) or records the final status, preferring the
     * converted storage exception when one is available.
     *
     * @param finalStatus the throwable the connection finished with
     */
    private void doneCallback(Throwable finalStatus) {
        log.info("Received done callback. Stream: " + this.streamName + " worker id: " + this.writerId + " Final status: " + finalStatus.toString());
        this.lock.lock();
        try {
            this.streamConnectionIsConnected = false;
            Status.Code finalCode = Status.fromThrowable(finalStatus).getCode();
            this.telemetryMetrics.recordConnectionEnd(finalCode.toString());
            if (this.connectionFinalStatus != null) {
                // A final status was recorded earlier; keep it.
                return;
            }
            if (this.connectionRetryStartTime == 0L) {
                this.connectionRetryStartTime = System.currentTimeMillis();
            }

            boolean willReconnect = false;
            if (this.isConnectionErrorRetriable(finalCode) && !this.userClosed) {
                long maxRetryMillis = this.maxRetryDuration.toMillis();
                willReconnect = maxRetryMillis == 0L || System.currentTimeMillis() - this.connectionRetryStartTime <= maxRetryMillis;
            }

            if (willReconnect) {
                ++this.conectionRetryCountWithoutCallback;
                this.telemetryMetrics.recordConnectionStartWithRetry();
                log.info("Connection is going to be reestablished with the next request. Retriable error " + finalStatus.toString() + " received, retry count " + this.conectionRetryCountWithoutCallback + ", millis left to retry " + (this.maxRetryDuration.toMillis() - (System.currentTimeMillis() - this.connectionRetryStartTime)) + ", for stream " + this.streamName + " id:" + this.writerId);
                return;
            }

            Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
            if (storageException != null) {
                this.connectionFinalStatus = storageException;
            } else {
                this.connectionFinalStatus = finalStatus;
            }
            log.info("Connection finished with error " + finalStatus.toString() + " for stream " + this.streamName + " with write id: " + this.writerId);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes one request from the inflight queue and updates the inflight accounting: clears the
     * request's send timestamp, decrements the request and byte counters, and signals
     * {@code inflightReduced}.
     *
     * @param pollLast {@code true} to take the request from the tail of the queue, {@code false}
     *     to take it from the head
     * @return the removed request
     */
    @GuardedBy(value="lock")
    private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) {
        AppendRequestAndResponse removed;
        if (pollLast) {
            removed = this.inflightRequestQueue.pollLast();
        } else {
            removed = this.inflightRequestQueue.pollFirst();
        }
        removed.requestSendTimeStamp = null;
        this.inflightRequests -= 1;
        this.inflightBytes -= removed.messageSize;
        this.inflightReduced.signal();
        return removed;
    }

    /**
     * Removes the request at the tail of the inflight queue.
     *
     * @return the removed request
     */
    @GuardedBy(value="lock")
    private AppendRequestAndResponse pollLastInflightRequestQueue() {
        final boolean fromTail = true;
        return this.pollInflightRequestQueue(fromTail);
    }

    /**
     * Removes the request at the head of the inflight queue.
     *
     * @return the removed request
     */
    @GuardedBy(value="lock")
    private AppendRequestAndResponse pollFirstInflightRequestQueue() {
        final boolean fromTail = false;
        return this.pollInflightRequestQueue(fromTail);
    }

    /**
     * Returns the most recently stored updated schema together with its timestamp.
     *
     * <p>NOTE(review): {@code requestCallback} assigns {@code updatedSchema} while holding
     * {@code lock}, whereas this getter synchronizes on the instance monitor - confirm that the
     * field's visibility is guaranteed some other way.
     *
     * @return the current value of the {@code updatedSchema} field
     */
    synchronized TableSchemaAndTimestamp getUpdatedSchema() {
        TableSchemaAndTimestamp latest = this.updatedSchema;
        return latest;
    }

    /**
     * Builds a {@link Load} snapshot from the current inflight byte and request counters, the
     * size of the destination set, and the configured inflight limits.
     *
     * @return a new {@link Load} describing this worker
     */
    public Load getLoad() {
        long bytesInFlight = this.inflightBytes;
        long requestsInFlight = this.inflightRequests;
        long destinations = this.destinationSet.size();
        return Load.create(bytesInFlight, requestsInFlight, destinations, this.maxInflightBytes, this.maxInflightRequests);
    }

    /**
     * Test hook that overrides {@code INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI}.
     *
     * @param waitTime the new value (presumably milliseconds, going by the constant's name)
     */
    @VisibleForTesting
    static void setMaxInflightQueueWaitTime(long waitTime) {
        final long overrideValue = waitTime;
        INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = overrideValue;
    }

    /**
     * Test hook that overrides {@code MAXIMUM_REQUEST_CALLBACK_WAIT_TIME}.
     *
     * @param waitTime the new maximum wait time
     */
    @VisibleForTesting
    static void setMaxInflightRequestWaitTime(Duration waitTime) {
        final Duration overrideValue = waitTime;
        MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = overrideValue;
    }

    /**
     * Pairs one append request with the future that will receive its result, plus the
     * bookkeeping used for retries and latency measurement.
     */
    static final class AppendRequestAndResponse {
        final SettableApiFuture<AppendRowsResponse> appendResult = SettableApiFuture.create();
        final AppendRowsRequest message;
        final long messageSize;
        Instant blockMessageSendDeadline;
        Integer retryCount;
        String requestUniqueId;
        ExponentialRetryAlgorithm retryAlgorithm;
        final StreamWriter streamWriter;
        TimedAttemptSettings attemptSettings;
        Instant requestSendTimeStamp;

        /**
         * Creates a wrapper with a retry count of 0, no attempt settings, and a send deadline of
         * "now".
         *
         * @param message the request to send
         * @param streamWriter the stream writer stored alongside the request
         * @param retrySettings retry configuration; when {@code null} no retry algorithm is
         *     created
         * @param requestUniqueId identifier stored with the request
         */
        AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter, RetrySettings retrySettings, String requestUniqueId) {
            this.message = message;
            // The size is taken from the serialized proto rows of the request.
            this.messageSize = message.getProtoRows().getSerializedSize();
            this.streamWriter = streamWriter;
            this.requestUniqueId = requestUniqueId;

            this.blockMessageSendDeadline = Instant.now();
            this.retryCount = 0;
            this.attemptSettings = null;
            if (retrySettings == null) {
                this.retryAlgorithm = null;
            } else {
                this.retryAlgorithm = new ExponentialRetryAlgorithm(retrySettings, NanoClock.getDefaultClock());
            }
        }

        /** Stamps the request with the current instant as its send time. */
        void setRequestSendQueueTime() {
            this.requestSendTimeStamp = Instant.now();
        }
    }

    /**
     * Value type that pairs an updated table schema with the timestamp stored alongside it.
     */
    @AutoValue
    static abstract class TableSchemaAndTimestamp {
        /**
         * Creates a new value.
         *
         * @param updateTimeStamp timestamp to store with the schema
         * @param updatedSchema the updated table schema
         * @return the generated AutoValue implementation holding both values
         */
        static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedSchema) {
            TableSchemaAndTimestamp snapshot = new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema);
            return snapshot;
        }

        TableSchemaAndTimestamp() {
        }

        // The order of the abstract properties is left as it was: AutoValue derives the generated
        // constructor's parameter order from it.

        /** Returns the timestamp stored with the schema. */
        abstract long updateTimeStamp();

        /** Returns the updated table schema. */
        abstract TableSchema updatedSchema();
    }

    /**
     * Snapshot of a connection worker's load: inflight bytes and requests, number of
     * destinations, and the configured inflight limits.
     */
    @AutoValue
    public static abstract class Load {
        // Fractions of the configured maximums above which a worker counts as overwhelmed.
        private static double overwhelmedInflightCount = 0.2;
        private static double overwhelmedInflightBytes = 0.2;

        /**
         * Orders loads by inflight bytes in 1024-byte buckets, then by inflight request count in
         * buckets of 100, then by destination count.
         *
         * <p>The first lambda is explicitly typed: as the receiver of a chained call it gets no
         * target type, so an implicit parameter would be inferred as {@code Object}. The keys are
         * compared as {@code long} so large values are not truncated by a narrowing cast.
         */
        public static final Comparator<Load> LOAD_COMPARATOR = Comparator.comparingLong((Load key) -> key.inFlightRequestsBytes() / 1024L).thenComparingLong(key -> key.inFlightRequestsCount() / 100L).thenComparingLong(Load::destinationCount);

        /**
         * Same ordering as {@link #LOAD_COMPARATOR} but without bucketing: exact inflight bytes,
         * then exact inflight request count, then destination count.
         */
        public static final Comparator<Load> TEST_LOAD_COMPARATOR = Comparator.comparingLong((Load key) -> key.inFlightRequestsBytes()).thenComparingLong(key -> key.inFlightRequestsCount()).thenComparingLong(Load::destinationCount);

        // The order of the abstract properties is left as it was: AutoValue derives the generated
        // constructor's parameter order from it.

        /** Returns the number of bytes currently in flight. */
        abstract long inFlightRequestsBytes();

        /** Returns the number of requests currently in flight. */
        abstract long inFlightRequestsCount();

        /** Returns the number of destinations. */
        abstract long destinationCount();

        /** Returns the configured maximum of inflight bytes. */
        abstract long maxInflightBytes();

        /** Returns the configured maximum of inflight requests. */
        abstract long maxInflightCount();

        /**
         * Creates a load snapshot from the given values.
         *
         * @return the generated AutoValue implementation holding the five values
         */
        static Load create(long inFlightRequestsBytes, long inFlightRequestsCount, long destinationCount, long maxInflightBytes, long maxInflightCount) {
            return new AutoValue_ConnectionWorker_Load(inFlightRequestsBytes, inFlightRequestsCount, destinationCount, maxInflightBytes, maxInflightCount);
        }

        /**
         * Tells whether this load exceeds either threshold.
         *
         * @return {@code true} when the inflight request count or the inflight bytes are strictly
         *     greater than the respective threshold fraction of the configured maximum
         */
        boolean isOverwhelmed() {
            return (double)this.inFlightRequestsCount() > overwhelmedInflightCount * (double)this.maxInflightCount() || (double)this.inFlightRequestsBytes() > overwhelmedInflightBytes * (double)this.maxInflightBytes();
        }

        /**
         * Test hook that replaces the bytes threshold fraction used by {@link #isOverwhelmed()}.
         *
         * @param newThreshold the new fraction of the maximum inflight bytes
         */
        @VisibleForTesting
        public static void setOverwhelmedBytesThreshold(double newThreshold) {
            overwhelmedInflightBytes = newThreshold;
        }

        /**
         * Test hook that replaces the count threshold fraction used by {@link #isOverwhelmed()}.
         *
         * @param newThreshold the new fraction of the maximum inflight request count
         */
        @VisibleForTesting
        public static void setOverwhelmedCountsThreshold(double newThreshold) {
            overwhelmedInflightCount = newThreshold;
        }
    }
}

