/*
 * Decompiled with CFR 0.152.
 */
package com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.scanner;

import com.google.bigtable.repackaged.com.google.api.client.util.BackOff;
import com.google.bigtable.repackaged.com.google.api.client.util.Clock;
import com.google.bigtable.repackaged.com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.repackaged.com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.config.RetryOptions;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.async.AbstractRetryingOperation;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.async.BigtableAsyncRpc;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.scanner.FlatRow;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.scanner.ReadRowsRequestManager;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.scanner.RowMerger;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.scanner.ScanHandler;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.scanner.ScanTimeoutException;
import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.bigtable.repackaged.com.google.protobuf.ByteString;
import com.google.bigtable.repackaged.io.grpc.CallOptions;
import com.google.bigtable.repackaged.io.grpc.ClientCall;
import com.google.bigtable.repackaged.io.grpc.Metadata;
import com.google.bigtable.repackaged.io.grpc.Status;
import com.google.bigtable.repackaged.io.grpc.stub.ClientCallStreamObserver;
import com.google.bigtable.repackaged.io.grpc.stub.ClientResponseObserver;
import com.google.bigtable.repackaged.io.grpc.stub.StreamObserver;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
public class RetryingReadRowsOperation
extends AbstractRetryingOperation<ReadRowsRequest, ReadRowsResponse, String>
implements ScanHandler {
    private static final String TIMEOUT_CANCEL_MSG = "Client side timeout induced cancellation";
    @VisibleForTesting
    Clock clock = Clock.SYSTEM;
    private final ReadRowsRequestManager requestManager;
    private final StreamObserver<FlatRow> rowObserver;
    private RowMerger rowMerger;
    private long lastResponseMs;
    private int timeoutRetryCount = 0;
    private CallToStreamObserverAdapter<ReadRowsRequest> adapter;
    private StreamObserver<ReadRowsResponse> resultObserver;

    public RetryingReadRowsOperation(StreamObserver<FlatRow> observer, RetryOptions retryOptions, ReadRowsRequest request, BigtableAsyncRpc<ReadRowsRequest, ReadRowsResponse> retryableRpc, CallOptions callOptions, ScheduledExecutorService retryExecutorService, Metadata originalMetadata) {
        super(retryOptions, request, retryableRpc, callOptions, retryExecutorService, originalMetadata);
        this.rowObserver = observer;
        this.requestManager = new ReadRowsRequestManager(request);
    }

    public void setResultObserver(StreamObserver<ReadRowsResponse> resultObserver) {
        this.resultObserver = resultObserver;
    }

    @Override
    protected ReadRowsRequest getRetryRequest() {
        return this.requestManager.buildUpdatedRequest();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.lastResponseMs = this.clock.currentTimeMillis();
        this.rowMerger = new RowMerger(this.rowObserver);
        Object object = this.callLock;
        synchronized (object) {
            super.run();
            this.adapter = new CallToStreamObserverAdapter(this.call);
            this.adapter.request(1);
            if (this.rowObserver instanceof ClientResponseObserver) {
                ((ClientResponseObserver)this.rowObserver).beforeStart(this.adapter);
            }
        }
    }

    @Override
    public void onMessage(ReadRowsResponse message) {
        this.resetStatusBasedBackoff();
        this.lastResponseMs = this.clock.currentTimeMillis();
        this.timeoutRetryCount = 0;
        ByteString previouslyProcessedKey = this.rowMerger.getLastCompletedRowKey();
        this.rowMerger.onNext(message);
        ByteString lastProcessedKey = this.rowMerger.getLastCompletedRowKey();
        if (previouslyProcessedKey != lastProcessedKey) {
            this.updateLastFoundKey(lastProcessedKey);
        } else {
            this.updateLastFoundKey(message.getLastScannedRowKey());
        }
        if (((CallToStreamObserverAdapter)this.adapter).autoFlowControlEnabled) {
            this.adapter.request(1);
        }
        if (this.resultObserver != null) {
            this.resultObserver.onNext(message);
        }
    }

    private void updateLastFoundKey(ByteString lastProcessedKey) {
        if (lastProcessedKey != null && !lastProcessedKey.isEmpty()) {
            this.requestManager.updateLastFoundKey(lastProcessedKey);
        }
    }

    @Override
    public void onClose(Status status, Metadata trailers) {
        if (status.getCode() == Status.Code.CANCELLED && status.getDescription().contains(TIMEOUT_CANCEL_MSG)) {
            return;
        }
        super.onClose(status, trailers);
    }

    @Override
    public void setException(Exception exception) {
        this.rowObserver.onError(exception);
        this.rowMerger = new RowMerger(this.rowObserver);
    }

    @Override
    protected boolean isRequestRetryable() {
        return true;
    }

    @Override
    protected boolean onOK(Metadata trailers) {
        this.rowMerger.onCompleted();
        this.completionFuture.set("");
        return true;
    }

    void resetStatusBasedBackoff() {
        this.currentBackoff = null;
        this.failedCount = 0;
        this.lastResponseMs = this.clock.currentTimeMillis();
    }

    @Override
    public void handleTimeout(ScanTimeoutException rte) throws BigtableRetriesExhaustedException {
        if (this.clock.currentTimeMillis() - this.lastResponseMs >= (long)this.retryOptions.getReadPartialRowTimeoutMillis()) {
            this.retryOnTimeout(rte);
        }
    }

    private void retryOnTimeout(ScanTimeoutException rte) throws BigtableRetriesExhaustedException {
        LOG.info("The client could not get a response in %d ms. Retrying the scan.", this.retryOptions.getReadPartialRowTimeoutMillis());
        this.cancel(TIMEOUT_CANCEL_MSG);
        this.rpcTimerContext.close();
        ++this.failedCount;
        int maxRetries = this.retryOptions.getMaxScanTimeoutRetries();
        if (!this.retryOptions.enableRetries() || ++this.timeoutRetryCount > maxRetries) {
            throw this.getExhaustedRetriesException(Status.ABORTED);
        }
        this.rpc.getRpcMetrics().markRetry();
        this.resetStatusBasedBackoff();
        this.run();
    }

    @VisibleForTesting
    int getTimeoutRetryCount() {
        return this.timeoutRetryCount;
    }

    @VisibleForTesting
    BackOff getCurrentBackoff() {
        return this.currentBackoff;
    }

    @VisibleForTesting
    RowMerger getRowMerger() {
        return this.rowMerger;
    }

    private static class CallToStreamObserverAdapter<T>
    extends ClientCallStreamObserver<T> {
        private final ClientCall<T, ?> call;
        private boolean autoFlowControlEnabled = true;

        public CallToStreamObserverAdapter(ClientCall<T, ?> call) {
            this.call = call;
        }

        @Override
        public void onNext(T value) {
            this.call.sendMessage(value);
        }

        @Override
        public void onError(Throwable t) {
            this.call.cancel("Cancelled by client with StreamObserver.onError()", t);
        }

        @Override
        public void onCompleted() {
            this.call.halfClose();
        }

        @Override
        public boolean isReady() {
            return this.call.isReady();
        }

        @Override
        public void setOnReadyHandler(Runnable onReadyHandler) {
        }

        @Override
        public void disableAutoInboundFlowControl() {
            this.autoFlowControlEnabled = false;
        }

        @Override
        public void request(int count) {
            this.call.request(count);
        }

        @Override
        public void setMessageCompression(boolean enable) {
            this.call.setMessageCompression(enable);
        }
    }
}

