/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.internal.wire.StreamIdleTimer;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Base class for one streaming connection. It owns the outgoing request stream, receives the
 * raw stream responses as a {@link ResponseObserver}, and forwards results, errors and completion
 * to a client-supplied observer.
 *
 * <p>Requests passed to {@link #sendToStream} before the first stream response has arrived are
 * buffered and flushed in arrival order once that first response is received. The connection can
 * be completed exactly once: whichever of {@link #close()}, {@link #setError}, {@link #onError}
 * or {@link #onComplete()} wins performs the teardown, and the others become no-ops.
 *
 * <p>Mutable state is guarded by this object's monitor; callbacks into the client observer and
 * into {@link #handleStreamResponse} are made without holding it.
 *
 * @param <StreamRequestT> type of the requests written to the stream
 * @param <StreamResponseT> type of the responses read from the stream
 * @param <ClientResponseT> type of the values delivered to the client observer
 */
public abstract class SingleConnection<StreamRequestT, StreamResponseT, ClientResponseT>
        implements ResponseObserver<StreamResponseT>, AutoCloseable {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

    /** Idle timeout used by the two-argument constructor. */
    protected static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = Duration.ofMinutes(2);

    /** Outgoing half of the stream. */
    private final ClientStream<StreamRequestT> requestStream;

    /** Downstream observer that receives client-level responses, errors and completion. */
    private final ResponseObserver<ClientResponseT> clientStream;

    /** Restarted on every stream response; constructed with {@link #onStreamIdle} as callback. */
    private final StreamIdleTimer streamIdleTimer;

    /** Whether at least one stream response has been observed. */
    @GuardedBy("this")
    private boolean receivedInitial = false;

    /** Requests held back until the first stream response is observed. */
    @GuardedBy("this")
    private final Queue<StreamRequestT> bufferedBeforeInitial = new ArrayDeque<>();

    /** Whether the connection has been closed, aborted, failed or finished. */
    @GuardedBy("this")
    private boolean completed = false;

    /**
     * Creates the connection, starts the idle timer and opens the stream.
     *
     * @param streamFactory factory used to open the stream with this object as its observer
     * @param clientStream observer that receives client-level responses, errors and completion
     * @param streamIdleTimeout timeout handed to the idle timer
     */
    protected SingleConnection(StreamFactory<StreamRequestT, StreamResponseT> streamFactory, ResponseObserver<ClientResponseT> clientStream, Duration streamIdleTimeout) {
        this.clientStream = clientStream;
        // The timer must exist before the stream is opened, since opening hands out `this`.
        this.streamIdleTimer = new StreamIdleTimer(streamIdleTimeout, this::onStreamIdle);
        this.requestStream = streamFactory.New(this);
    }

    /**
     * Creates the connection using {@link #DEFAULT_STREAM_IDLE_TIMEOUT}.
     *
     * @param streamFactory factory used to open the stream with this object as its observer
     * @param clientStream observer that receives client-level responses, errors and completion
     */
    protected SingleConnection(StreamFactory<StreamRequestT, StreamResponseT> streamFactory, ResponseObserver<ClientResponseT> clientStream) {
        this(streamFactory, clientStream, DEFAULT_STREAM_IDLE_TIMEOUT);
    }

    /**
     * Processes one response read from the stream. Invoked without this object's monitor held.
     *
     * @param var1 the stream response to process
     * @throws CheckedApiException to abort the connection with the given error
     */
    protected abstract void handleStreamResponse(StreamResponseT var1) throws CheckedApiException;

    /**
     * Writes the initial request straight to the stream, bypassing both the pre-initial buffer
     * and the completion check.
     *
     * @param initialRequest the first request to put on the stream
     */
    protected void initialize(StreamRequestT initialRequest) {
        requestStream.send(initialRequest);
    }

    /**
     * Sends a request on the stream, or buffers it if no stream response has been seen yet.
     * Requests arriving after completion are logged at fine level and dropped.
     *
     * @param request the request to send
     */
    protected synchronized void sendToStream(StreamRequestT request) {
        if (completed) {
            log.atFine().log("Sent request after stream completion: %s", request);
            return;
        }
        if (receivedInitial) {
            requestStream.send(request);
        } else {
            bufferedBeforeInitial.add(request);
        }
    }

    /**
     * Delivers a response to the client observer. Responses arriving after completion are logged
     * at fine level and dropped.
     *
     * @param response the value to hand to the client observer
     * @throws IllegalStateException if no stream response has been received yet
     */
    protected void sendToClient(ClientResponseT response) {
        synchronized (this) {
            if (completed) {
                log.atFine().log("Sent response after stream completion: %s", response);
                return;
            }
            Preconditions.checkState(receivedInitial);
        }
        // The client callback runs outside the monitor.
        clientStream.onResponse(response);
    }

    /**
     * Fails the connection with the given error, unless it has already completed.
     *
     * @param error the error to propagate to both the stream and the client observer
     */
    protected void setError(CheckedApiException error) {
        abort(error);
    }

    /**
     * Marks the connection completed and closes the idle timer the first time it is called.
     * A failure while closing the timer is logged and otherwise ignored.
     *
     * @return {@code true} if the connection had already been completed before this call
     */
    private synchronized boolean completeStream() {
        if (completed) {
            return true;
        }
        completed = true;
        try {
            streamIdleTimer.close();
        } catch (Exception e) {
            log.atSevere().withCause(e).log("Error occurred while shutting down connection.");
        }
        return false;
    }

    /**
     * Completes the connection normally: half-closes the request stream, then signals completion
     * to the client observer. Does nothing if the connection has already completed.
     */
    @Override
    public void close() {
        if (!completeStream()) {
            requestStream.closeSend();
            clientStream.onComplete();
        }
    }

    /**
     * Completes the connection with an error: closes the request stream with the underlying
     * exception, then reports the error to the client observer. Does nothing if the connection
     * has already completed.
     */
    private void abort(CheckedApiException error) {
        if (!completeStream()) {
            requestStream.closeSendWithError(error.underlying);
            clientStream.onError(error);
        }
    }

    /** {@link ResponseObserver} callback for stream start; intentionally does nothing. */
    public void onStart(StreamController streamController) {
    }

    /**
     * {@link ResponseObserver} callback for a response read from the stream. Restarts the idle
     * timer, flushes any buffered requests on the first response, and then hands the response to
     * {@link #handleStreamResponse}; an exception from that handler aborts the connection.
     * Responses arriving after completion are logged at fine level and dropped.
     *
     * @param response the response read from the stream
     */
    public void onResponse(StreamResponseT response) {
        synchronized (this) {
            streamIdleTimer.restart();
            if (completed) {
                log.atFine().log("Received response on stream after completion: %s", response);
                return;
            }
            if (!receivedInitial) {
                releaseBufferedRequests();
            }
        }
        // The subclass handler runs outside the monitor.
        try {
            handleStreamResponse(response);
        } catch (CheckedApiException failure) {
            abort(failure);
        }
    }

    /**
     * Sends every buffered request on the stream in arrival order, then records that the first
     * stream response has been seen so later requests are sent directly.
     */
    @GuardedBy("this")
    private void releaseBufferedRequests() {
        StreamRequestT pending;
        while ((pending = bufferedBeforeInitial.poll()) != null) {
            requestStream.send(pending);
        }
        receivedInitial = true;
    }

    /**
     * {@link ResponseObserver} callback for a stream failure: reports the error to the client
     * observer, then closes the request stream with the same error. Does nothing if the
     * connection has already completed.
     *
     * @param t the failure reported by the stream
     */
    public void onError(Throwable t) {
        if (!completeStream()) {
            clientStream.onError(t);
            requestStream.closeSendWithError(t);
        }
    }

    /**
     * {@link ResponseObserver} callback for normal stream termination: signals completion to the
     * client observer, then half-closes the request stream. Does nothing if the connection has
     * already completed.
     */
    public void onComplete() {
        if (!completeStream()) {
            clientStream.onComplete();
            requestStream.closeSend();
        }
    }

    /** Idle-timer callback: fails the connection with an {@code ABORTED} error. */
    private void onStreamIdle() {
        CheckedApiException idleFailure = new CheckedApiException("Detected idle stream.", StatusCode.Code.ABORTED);
        onError(idleFailure);
    }
}

