/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.AbstractApiService;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsublite.ErrorCodes;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.RetryingConnection;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver;
import com.google.cloud.pubsublite.internal.wire.SingleConnectionFactory;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;

class RetryingConnectionImpl<StreamRequestT, StreamResponseT, ClientResponseT, ConnectionT extends AutoCloseable>
extends AbstractApiService
implements RetryingConnection<StreamRequestT, ConnectionT>,
ResponseObserver<ClientResponseT> {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private static final Duration INITIAL_RECONNECT_BACKOFF_TIME = Duration.ofMillis(10L);
    private static final Duration MAX_RECONNECT_BACKOFF_TIME = Duration.ofSeconds(10L);
    private final StreamFactory<StreamRequestT, StreamResponseT> streamFactory;
    private final SingleConnectionFactory<StreamRequestT, StreamResponseT, ClientResponseT, ConnectionT> connectionFactory;
    private final RetryingConnectionObserver<ClientResponseT> observer;
    private final CloseableMonitor connectionMonitor = new CloseableMonitor();
    @GuardedBy(value="connectionMonitor.monitor")
    private long nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis();
    @GuardedBy(value="connectionMonitor.monitor")
    private StreamRequestT lastInitialRequest;
    @GuardedBy(value="connectionMonitor.monitor")
    private ConnectionT currentConnection;
    @GuardedBy(value="connectionMonitor.monitor")
    private boolean completed = false;

    RetryingConnectionImpl(StreamFactory<StreamRequestT, StreamResponseT> streamFactory, SingleConnectionFactory<StreamRequestT, StreamResponseT, ClientResponseT, ConnectionT> connectionFactory, RetryingConnectionObserver<ClientResponseT> observer, StreamRequestT initialRequest) {
        this.streamFactory = streamFactory;
        this.connectionFactory = connectionFactory;
        this.observer = observer;
        this.lastInitialRequest = initialRequest;
    }

    protected void doStart() {
        StreamRequestT initialInitialRequest;
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            initialInitialRequest = this.lastInitialRequest;
        }
        SystemExecutors.getFuturesExecutor().execute(() -> {
            this.reinitialize(initialInitialRequest);
            this.notifyStarted();
        });
    }

    @Override
    public void reinitialize(StreamRequestT initialRequest) {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                return;
            }
            this.lastInitialRequest = initialRequest;
            ((GoogleLogger.Api)logger.atFiner()).log("Start initializing connection for %s", (Object)this.streamDescription());
            this.currentConnection = this.connectionFactory.New(this.streamFactory, this, this.lastInitialRequest);
            ((GoogleLogger.Api)logger.atFiner()).log("Initialized connection for %s", (Object)this.streamDescription());
        }
    }

    protected void doStop() {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                return;
            }
            this.completed = true;
            ((GoogleLogger.Api)logger.atFine()).log("Terminating connection for %s", (Object)this.streamDescription());
            if (this.currentConnection != null) {
                this.currentConnection.close();
            }
        }
        catch (Throwable t) {
            ((GoogleLogger.Api)((GoogleLogger.Api)logger.atInfo()).withCause(t)).log("Failed while terminating connection for %s", (Object)this.streamDescription());
            this.notifyFailed(t);
            return;
        }
        ((GoogleLogger.Api)logger.atFine()).log("Terminated connection for %s", (Object)this.streamDescription());
        this.notifyStopped();
    }

    @Override
    public void modifyConnection(RetryingConnection.Modifier<ConnectionT> modifier) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                modifier.modify(Optional.empty());
            } else {
                modifier.modify(Optional.of(this.currentConnection));
            }
        }
    }

    void setPermanentError(Throwable error) {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                return;
            }
            this.completed = true;
        }
        this.notifyFailed(error);
    }

    public void onStart(StreamController controller) {
        controller.disableAutoInboundFlowControl();
        controller.request(Integer.MAX_VALUE);
    }

    public final void onResponse(ClientResponseT value) {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                return;
            }
            this.nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis();
        }
        try {
            this.observer.onClientResponse(value);
        }
        catch (Throwable t) {
            this.setPermanentError(t);
        }
    }

    public final void onError(Throwable t) {
        Optional<CheckedApiException> statusOr = ExtractStatus.extract(t);
        if (!statusOr.isPresent()) {
            this.setPermanentError(t);
            return;
        }
        if (!ErrorCodes.IsRetryableForStreams(statusOr.get().code())) {
            this.setPermanentError(statusOr.get());
            return;
        }
        Optional<Object> throwable = Optional.empty();
        long backoffTime = 0L;
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.currentConnection != null) {
                this.currentConnection.close();
            }
            backoffTime = this.nextRetryBackoffDuration;
            this.nextRetryBackoffDuration = Math.min(backoffTime * 2L, MAX_RECONNECT_BACKOFF_TIME.toMillis());
        }
        catch (Throwable t2) {
            throwable = Optional.of(t2);
        }
        if (throwable.isPresent()) {
            this.setPermanentError(new CheckedApiException("Failed to close preexisting stream after error.", (Throwable)throwable.get(), StatusCode.Code.FAILED_PRECONDITION));
            return;
        }
        ((GoogleLogger.Api)((GoogleLogger.Api)logger.atFine()).withCause(t)).log("Stream disconnected attempting retry, after %s milliseconds for %s", backoffTime, (Object)this.streamDescription());
        ScheduledFuture<?> unusedFuture = SystemExecutors.getAlarmExecutor().schedule(() -> this.triggerReinitialize((CheckedApiException)statusOr.get()), backoffTime, TimeUnit.MILLISECONDS);
    }

    private void triggerReinitialize(CheckedApiException streamError) {
        SystemExecutors.getFuturesExecutor().execute(() -> {
            try {
                this.observer.triggerReinitialize(streamError);
            }
            catch (Throwable t) {
                ((GoogleLogger.Api)((GoogleLogger.Api)logger.atInfo()).withCause(t)).log("Error occurred in triggerReinitialize.");
                this.onError(t);
            }
        });
    }

    public final void onComplete() {
        boolean expectedCompletion;
        ((GoogleLogger.Api)logger.atFine()).log("Stream completed for %s", (Object)this.streamDescription());
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            expectedCompletion = this.completed;
        }
        if (!expectedCompletion) {
            this.setPermanentError(new CheckedApiException("Server unexpectedly closed stream.", StatusCode.Code.FAILED_PRECONDITION));
        }
    }

    private String streamDescription() {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            String string = this.lastInitialRequest.getClass().getSimpleName() + ": " + this.lastInitialRequest.toString();
            return string;
        }
    }
}

