/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberImpl;
import com.google.cloud.pubsublite.internal.wire.FlowControlBatcher;
import com.google.cloud.pubsublite.internal.wire.NextOffsetTracker;
import com.google.cloud.pubsublite.internal.wire.Predicates;
import com.google.cloud.pubsublite.internal.wire.ResetSignal;
import com.google.cloud.pubsublite.internal.wire.RetryingConnection;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;

/**
 * {@link Subscriber} implementation layered on a {@link RetryingConnection}.
 *
 * <p>The class tracks three pieces of state under {@code monitor}: the next offset to resume from
 * ({@link NextOffsetTracker}), the outstanding flow control allowance ({@link
 * FlowControlBatcher}) and at most one in-flight seek. When the underlying stream is
 * reinitialized, that state is used to build the new initial request and to replay either the
 * pending seek or the outstanding flow control tokens.
 *
 * <p>Batched flow control requests are flushed by a periodic alarm scheduled in {@link #start()},
 * or immediately from {@link #allowFlow} when the batcher asks for the request to be expedited.
 */
public class SubscriberImpl
        extends ProxyService
        implements Subscriber, RetryingConnectionObserver<ConnectedSubscriber.Response> {
    /** Initial delay and period, in milliseconds, of the periodic flow control flush alarm. */
    @VisibleForTesting
    static final long FLOW_REQUESTS_FLUSH_INTERVAL_MS = 100L;

    private final Consumer<ImmutableList<SequencedMessage>> messageConsumer;
    private final SubscriberResetHandler resetHandler;
    private final InitialSubscribeRequest baseInitialRequest;
    private final CloseableMonitor monitor = new CloseableMonitor();

    // Assigned in start(); remains null if start() never ran.
    private Future<?> alarmFuture;

    @GuardedBy("monitor.monitor")
    private final RetryingConnection<SubscribeRequest, ConnectedSubscriber> connection;

    @GuardedBy("monitor.monitor")
    private final NextOffsetTracker nextOffsetTracker = new NextOffsetTracker();

    @GuardedBy("monitor.monitor")
    private final FlowControlBatcher flowControlBatcher = new FlowControlBatcher();

    @GuardedBy("monitor.monitor")
    private Optional<InFlightSeek> inFlightSeek = Optional.empty();

    // Location used for the initial request until the offset tracker can supply a restart location.
    @GuardedBy("monitor.monitor")
    private SeekRequest initialLocation;

    @GuardedBy("monitor.monitor")
    private boolean shutdown = false;

    /**
     * Creates a subscriber from explicit stream and connection factories.
     *
     * @param streamFactory factory for the underlying subscribe stream
     * @param factory factory for {@link ConnectedSubscriber} instances
     * @param baseInitialRequest initial request template; its initial location is overwritten
     * @param initialLocation location to start from when no restart offset is tracked yet
     * @param messageConsumer receiver of every delivered message batch
     * @param resetHandler consulted when the stream fails with a reset signal
     */
    @VisibleForTesting
    SubscriberImpl(StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory, ConnectedSubscriberFactory factory, InitialSubscribeRequest baseInitialRequest, SeekRequest initialLocation, Consumer<ImmutableList<SequencedMessage>> messageConsumer, SubscriberResetHandler resetHandler) throws ApiException {
        this.messageConsumer = messageConsumer;
        this.resetHandler = resetHandler;
        this.baseInitialRequest = baseInitialRequest;
        this.initialLocation = initialLocation;
        this.connection = new RetryingConnectionImpl<SubscribeRequest, SubscribeResponse, ConnectedSubscriber.Response, ConnectedSubscriber>(streamFactory, factory, this, this.getInitialRequest());
        this.addServices(this.connection);
    }

    /**
     * Creates a subscriber that opens its streams through {@code client}. The client is also
     * registered as a dependent service via {@code addServices}.
     *
     * @param client service client used to open subscribe streams
     * @param baseInitialRequest initial request template; its initial location is overwritten
     * @param initialLocation location to start from when no restart offset is tracked yet
     * @param messageConsumer receiver of every delivered message batch
     * @param resetHandler consulted when the stream fails with a reset signal
     */
    public SubscriberImpl(SubscriberServiceClient client, InitialSubscribeRequest baseInitialRequest, SeekRequest initialLocation, Consumer<ImmutableList<SequencedMessage>> messageConsumer, SubscriberResetHandler resetHandler) throws ApiException {
        this(stream -> client.subscribeCallable().splitCall(stream), new ConnectedSubscriberImpl.Factory(), baseInitialRequest, initialLocation, messageConsumer, resetHandler);
        this.addServices(ApiServiceUtils.backgroundResourceAsApiService(client));
    }

    /**
     * Marks this subscriber as shut down, fails any in-flight seek with {@code error} and then
     * forwards the error to {@code onPermanentError}.
     */
    @Override
    protected void handlePermanentError(CheckedApiException error) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.shutdown = true;
            this.inFlightSeek.ifPresent(inFlight -> inFlight.seekFuture.setException(error));
            this.inFlightSeek = Optional.empty();
            this.onPermanentError(error);
        }
    }

    /** Schedules the periodic flush of batched flow control requests. */
    @Override
    protected void start() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.alarmFuture = SystemExecutors.getAlarmExecutor().scheduleWithFixedDelay(this::processBatchFlowRequest, FLOW_REQUESTS_FLUSH_INTERVAL_MS, FLOW_REQUESTS_FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Cancels the flush alarm, marks this subscriber as shut down and fails any in-flight seek
     * with an {@code ABORTED} error.
     */
    @Override
    protected void stop() {
        // Guard against stop() being reached without start() having scheduled the alarm.
        Future<?> alarm = this.alarmFuture;
        if (alarm != null) {
            alarm.cancel(false);
        }
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.shutdown = true;
            this.inFlightSeek.ifPresent(inFlight -> inFlight.seekFuture.setException(new CheckedApiException("Client stopped while seek in flight.", StatusCode.Code.ABORTED)));
            // The seek future is now completed; drop it so seekInFlight() stops reporting it,
            // matching what handlePermanentError() and reset() do.
            this.inFlightSeek = Optional.empty();
        }
    }

    /**
     * Starts a seek and forwards it to the current connection, if one exists.
     *
     * <p>If the request is invalid, the subscriber is shut down, or another seek is already in
     * flight, the failure is reported through {@code onPermanentError} and an already-failed
     * future is returned.
     *
     * @param request the seek to perform; must satisfy {@link Predicates#isValidSeekRequest}
     * @return a future completed with the offset from the seek response, or failed if the seek is
     *     rejected or aborted
     */
    @Override
    public ApiFuture<Offset> seek(SeekRequest request) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            CheckedApiPreconditions.checkArgument(Predicates.isValidSeekRequest(request), "Sent SeekRequest with no location set.");
            CheckedApiPreconditions.checkState(!this.shutdown, "Seeked after the stream shut down.");
            CheckedApiPreconditions.checkState(!this.inFlightSeek.isPresent(), "Seeked while seek is already in flight.");
            SettableApiFuture<Offset> future = SettableApiFuture.create();
            this.inFlightSeek = Optional.of(new InFlightSeek(request, future));
            this.connection.modifyConnection(connectedSubscriber -> connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request)));
            return future;
        }
        catch (CheckedApiException e) {
            // Runs after the monitor hold has been released by try-with-resources.
            this.onPermanentError(e);
            return ApiFutures.immediateFailedFuture(e);
        }
    }

    /** Returns whether a seek has been issued and not yet completed, failed or aborted. */
    @Override
    public boolean seekInFlight() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            return this.inFlightSeek.isPresent();
        }
    }

    /**
     * Records a client flow control request and flushes the batch immediately when the batcher
     * asks for it to be expedited. Does nothing once shut down.
     *
     * @param clientRequest additional allowance granted by the client
     * @throws CheckedApiException if recording the request or modifying the connection fails
     */
    @Override
    public void allowFlow(FlowControlRequest clientRequest) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            this.flowControlBatcher.onClientFlowRequest(clientRequest);
            if (this.flowControlBatcher.shouldExpediteBatchRequest()) {
                this.connection.modifyConnection(connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
            }
        }
    }

    /**
     * Builds the initial request for a (re)connect: the base request with its initial location
     * set to the tracker's restart location, falling back to {@code initialLocation}.
     */
    private SubscribeRequest getInitialRequest() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            SeekRequest location = this.nextOffsetTracker.requestForRestart().orElse(this.initialLocation);
            return SubscribeRequest.newBuilder()
                    .setInitial(this.baseInitialRequest.toBuilder().setInitialLocation(location))
                    .build();
        }
    }

    /**
     * Discards the tracked position: aborts any in-flight seek, resets the offset tracker and
     * makes the next initial request start from the committed cursor. Does nothing once shut down.
     */
    public void reset() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            this.inFlightSeek.ifPresent(inFlight -> inFlight.seekFuture.setException(new CheckedApiException("Aborted due to out of band seek.", StatusCode.Code.ABORTED)));
            this.inFlightSeek = Optional.empty();
            this.nextOffsetTracker.reset();
            this.initialLocation = SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.COMMITTED_CURSOR).build();
        }
    }

    /**
     * Reinitializes the connection after a stream error.
     *
     * <p>If the error is a reset signal, the reset handler is consulted first and, when it returns
     * true, local state is reset. The connection is then reinitialized with a fresh initial
     * request, after which either the in-flight seek or the outstanding flow control allowance is
     * replayed on the new connection. Any failure is reported through {@code onPermanentError}.
     *
     * @param streamError the error that terminated the previous stream
     */
    @Override
    public void triggerReinitialize(CheckedApiException streamError) {
        if (ResetSignal.isResetSignal(streamError)) {
            try {
                if (this.resetHandler.handleReset()) {
                    this.reset();
                }
            }
            catch (CheckedApiException e) {
                this.onPermanentError(e);
                return;
            }
        }
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            this.connection.reinitialize(this.getInitialRequest());
            this.connection.modifyConnection(connectedSubscriber -> {
                // The modifier touches guarded state, so it must run while this thread holds the monitor.
                CheckedApiPreconditions.checkArgument(this.monitor.monitor.isOccupiedByCurrentThread());
                CheckedApiPreconditions.checkArgument(connectedSubscriber.isPresent());
                ConnectedSubscriber subscriber = connectedSubscriber.get();
                if (this.inFlightSeek.isPresent()) {
                    subscriber.seek(this.inFlightSeek.get().seekRequest);
                } else {
                    this.flowControlBatcher.requestForRestart().ifPresent(subscriber::allowFlow);
                }
            });
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
        }
    }

    /**
     * Dispatches a response from the connected subscriber by kind.
     *
     * @throws CheckedApiException with {@code FAILED_PRECONDITION} for an unhandled kind, or if
     *     handling the response fails
     */
    @Override
    public void onClientResponse(ConnectedSubscriber.Response value) throws CheckedApiException {
        switch (value.getKind()) {
            case MESSAGES:
                this.onMessageResponse(value.messages());
                return;
            case SEEK_OFFSET:
                this.onSeekResponse(value.seekOffset());
                return;
            default:
                throw new CheckedApiException("Invalid switch case: " + value.getKind(), StatusCode.Code.FAILED_PRECONDITION);
        }
    }

    /**
     * Updates offset and flow control tracking for the batch, then hands it to the message
     * consumer. Batches received after shutdown are dropped.
     */
    private void onMessageResponse(ImmutableList<SequencedMessage> messages) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            this.nextOffsetTracker.onMessages(messages);
            this.flowControlBatcher.onMessages(messages);
        }
        // Deliver after the monitor hold is released so the consumer does not run under it.
        this.messageConsumer.accept(messages);
    }

    /**
     * Completes the in-flight seek with {@code seekOffset} and updates the trackers. Responses
     * received after shutdown are ignored.
     *
     * @throws CheckedApiException if no seek is in flight
     */
    private void onSeekResponse(Offset seekOffset) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            CheckedApiPreconditions.checkState(this.inFlightSeek.isPresent(), "No in flight seek, but received a seek response.");
            this.nextOffsetTracker.onClientSeek(seekOffset);
            this.flowControlBatcher.onClientSeek();
            this.inFlightSeek.get().seekFuture.set(seekOffset);
            this.inFlightSeek = Optional.empty();
        }
    }

    /**
     * Alarm callback: flushes the batched flow control request to the current connection, if one
     * exists. Failures are reported through {@code onPermanentError}.
     */
    @VisibleForTesting
    void processBatchFlowRequest() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            this.connection.modifyConnection(connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
        }
    }

    /** Sends the batcher's pending flow control request, if any, to {@code subscriber}. */
    private void flushBatchFlowRequest(ConnectedSubscriber subscriber) {
        // Callers in this class already hold the monitor; entering again here presumably relies
        // on the monitor being reentrant (TODO confirm against CloseableMonitor).
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.flowControlBatcher.releasePendingRequest().ifPresent(subscriber::allowFlow);
        }
    }

    /** A seek that has been requested but not yet answered, paired with the caller's future. */
    private static class InFlightSeek {
        final SeekRequest seekRequest;
        final SettableApiFuture<Offset> seekFuture;

        InFlightSeek(SeekRequest request, SettableApiFuture<Offset> future) {
            this.seekRequest = request;
            this.seekFuture = future;
        }
    }
}

