/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.Preconditions;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberImpl;
import com.google.cloud.pubsublite.internal.wire.NextOffsetTracker;
import com.google.cloud.pubsublite.internal.wire.Predicates;
import com.google.cloud.pubsublite.internal.wire.RetryingConnection;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.TokenCounter;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;

public class SubscriberImpl
extends ProxyService
implements Subscriber,
RetryingConnectionObserver<ConnectedSubscriber.Response> {
    private final Consumer<ImmutableList<SequencedMessage>> messageConsumer;
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private final RetryingConnection<ConnectedSubscriber> connection;
    @GuardedBy(value="monitor.monitor")
    private final NextOffsetTracker nextOffsetTracker = new NextOffsetTracker();
    @GuardedBy(value="monitor.monitor")
    private final TokenCounter tokenCounter = new TokenCounter();
    @GuardedBy(value="monitor.monitor")
    private Optional<SettableApiFuture<Offset>> inFlightSeek = Optional.empty();
    @GuardedBy(value="monitor.monitor")
    private boolean shutdown = false;

    @VisibleForTesting
    SubscriberImpl(SubscriberServiceGrpc.SubscriberServiceStub stub, ConnectedSubscriberFactory factory, InitialSubscribeRequest initialRequest, Consumer<ImmutableList<SequencedMessage>> messageConsumer) throws StatusException {
        this.messageConsumer = messageConsumer;
        this.connection = new RetryingConnectionImpl<SubscribeRequest, SubscribeResponse, ConnectedSubscriber.Response, ConnectedSubscriber>(arg_0 -> ((SubscriberServiceGrpc.SubscriberServiceStub)stub).subscribe(arg_0), factory, SubscribeRequest.newBuilder().setInitial(initialRequest).build(), this);
        this.addServices(this.connection);
    }

    public SubscriberImpl(SubscriberServiceGrpc.SubscriberServiceStub stub, InitialSubscribeRequest initialRequest, Consumer<ImmutableList<SequencedMessage>> messageConsumer) throws StatusException {
        this(stub, new ConnectedSubscriberImpl.Factory(), initialRequest, messageConsumer);
    }

    @Override
    protected void handlePermanentError(StatusException error) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.shutdown = true;
            this.inFlightSeek.ifPresent(inFlight -> inFlight.setException((Throwable)error));
            this.inFlightSeek = Optional.empty();
            this.onPermanentError(error);
        }
    }

    @Override
    protected void start() {
    }

    @Override
    protected void stop() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.shutdown = true;
            this.inFlightSeek.ifPresent(inFlight -> inFlight.setException((Throwable)Status.ABORTED.withDescription("Client stopped while seek in flight.").asException()));
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public ApiFuture<Offset> seek(SeekRequest request) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            Preconditions.checkArgument(Predicates.isValidSeekRequest(request), "Sent SeekRequest with no location set.");
            Preconditions.checkState(!this.shutdown, "Seeked after the stream shut down.");
            Preconditions.checkState(!this.inFlightSeek.isPresent(), "Seeked while seek is already in flight.");
            SettableApiFuture future = SettableApiFuture.create();
            this.inFlightSeek = Optional.of(future);
            this.connection.modifyConnection(connectedSubscriber -> connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request)));
            SettableApiFuture settableApiFuture = future;
            return settableApiFuture;
        }
        catch (StatusException e) {
            this.onPermanentError(e);
            return ApiFutures.immediateFailedFuture((Throwable)e);
        }
    }

    @Override
    public boolean seekInFlight() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            boolean bl = this.inFlightSeek.isPresent();
            return bl;
        }
    }

    @Override
    public void allowFlow(FlowControlRequest request) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            this.tokenCounter.onClientFlowRequest(request);
            this.connection.modifyConnection(connectedSubscriber -> connectedSubscriber.ifPresent(subscriber -> subscriber.allowFlow(request)));
        }
        catch (StatusException e) {
            this.onPermanentError(e);
            throw e.getStatus().asRuntimeException();
        }
    }

    @Override
    public void triggerReinitialize() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            this.connection.reinitialize();
            this.connection.modifyConnection(connectedSubscriber -> {
                Preconditions.checkArgument(this.monitor.monitor.isOccupiedByCurrentThread());
                Preconditions.checkArgument(connectedSubscriber.isPresent());
                this.nextOffsetTracker.requestForRestart().ifPresent(request -> {
                    this.inFlightSeek = Optional.of(SettableApiFuture.create());
                    ((ConnectedSubscriber)connectedSubscriber.get()).seek((SeekRequest)request);
                });
                this.tokenCounter.requestForRestart().ifPresent(request -> ((ConnectedSubscriber)connectedSubscriber.get()).allowFlow((FlowControlRequest)request));
            });
        }
        catch (StatusException e) {
            this.onPermanentError(e);
        }
    }

    @Override
    public Status onClientResponse(ConnectedSubscriber.Response value) {
        switch (value.getKind()) {
            case MESSAGES: {
                return this.onMessageResponse(value.messages());
            }
            case SEEK_OFFSET: {
                return this.onSeekResponse(value.seekOffset());
            }
        }
        return Status.FAILED_PRECONDITION.withDescription("Invalid switch case: " + (Object)((Object)value.getKind()));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Status onMessageResponse(ImmutableList<SequencedMessage> messages) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                Status status = Status.OK;
                return status;
            }
            this.nextOffsetTracker.onMessages((Collection<SequencedMessage>)messages);
            this.tokenCounter.onMessages((Collection<SequencedMessage>)messages);
        }
        catch (StatusException e) {
            this.onPermanentError(e);
            return e.getStatus();
        }
        this.messageConsumer.accept(messages);
        return Status.OK;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Status onSeekResponse(Offset seekOffset) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                Status status2 = Status.OK;
                return status2;
            }
            Preconditions.checkState(this.inFlightSeek.isPresent(), "No in flight seek, but received a seek response.");
            this.nextOffsetTracker.onClientSeek(seekOffset);
            this.inFlightSeek.get().set((Object)seekOffset);
            this.inFlightSeek = Optional.empty();
            Status status = Status.OK;
            return status;
        }
        catch (StatusException e) {
            this.onPermanentError(e);
            return e.getStatus();
        }
    }
}

