/*
 * Decompiled with CFR 0.152.
 */
package com.eventstore.dbclient;

import com.eventstore.dbclient.NotLeaderException;
import com.eventstore.dbclient.OptionsWithBackPressure;
import com.eventstore.dbclient.ResolvedEvent;
import com.eventstore.dbclient.StreamConsumer;
import com.eventstore.dbclient.StreamNotFoundException;
import com.eventstore.dbclient.WorkItemArgs;
import com.eventstore.dbclient.proto.shared.Shared;
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ReadResponseObserver
implements ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp> {
    private static final Logger logger = LoggerFactory.getLogger(ReadResponseObserver.class);
    private final OptionsWithBackPressure<?> options;
    private final AtomicInteger requested = new AtomicInteger(0);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final StreamConsumer consumer;
    private ClientCallStreamObserver<StreamsOuterClass.ReadReq> requestStream;
    private int outstandingRequests;
    private WorkItemArgs args;

    public ReadResponseObserver(OptionsWithBackPressure<?> options, StreamConsumer consumer) {
        this.options = options;
        this.consumer = consumer;
    }

    public Subscription getSubscription() {
        return new InternalSubscription(this);
    }

    public void onConnected(WorkItemArgs args) {
        this.args = args;
    }

    void cancel(String reason, Throwable cause) {
        if (!this.completed.compareAndSet(false, true)) {
            return;
        }
        if (this.requestStream == null) {
            return;
        }
        this.requestStream.cancel(reason, cause);
        if (cause instanceof StreamNotFoundException) {
            this.consumer.onStreamNotFound(((StreamNotFoundException)cause).getStreamName());
        }
    }

    void manageFlowControl() {
        int requestedCount = this.requested.getAndSet(0);
        int bufferRequestSize = this.options.computeRequestThreshold();
        this.outstandingRequests = Math.max(this.outstandingRequests, 0);
        int received = this.options.getBatchSize() - this.outstandingRequests;
        if (requestedCount > 0) {
            int needed = Math.min(requestedCount, received);
            this.requestStream.request(needed);
            this.outstandingRequests += needed;
            return;
        }
        if (received < bufferRequestSize) {
            return;
        }
        this.requestStream.request(received);
        this.outstandingRequests += received;
    }

    public void beforeStart(ClientCallStreamObserver<StreamsOuterClass.ReadReq> requestStream) {
        this.requestStream = requestStream;
        if (this.completed.get()) {
            this.requestStream.cancel("the streaming operation was cancelled manually", null);
            return;
        }
        this.requestStream.disableAutoRequestWithInitial(this.options.getBatchSize());
        this.outstandingRequests = this.options.getBatchSize();
        this.consumer.onSubscribe(this.getSubscription());
    }

    public void onNext(StreamsOuterClass.ReadResp value) {
        if (this.completed.get()) {
            return;
        }
        --this.outstandingRequests;
        if (value.hasStreamNotFound()) {
            String streamName = value.getStreamNotFound().getStreamIdentifier().getStreamName().toString(Charset.defaultCharset());
            this.cancel(String.format("stream '%s' is not found", streamName), new StreamNotFoundException(streamName));
            return;
        }
        if (value.hasEvent()) {
            this.consumer.onEvent(ResolvedEvent.fromWire(value.getEvent()));
        } else if (value.hasConfirmation()) {
            this.consumer.onSubscriptionConfirmation(value.getConfirmation().getSubscriptionId());
        } else if (value.hasCheckpoint()) {
            StreamsOuterClass.ReadResp.Checkpoint checkpoint = value.getCheckpoint();
            this.consumer.onCheckpoint(checkpoint.getCommitPosition(), checkpoint.getPreparePosition());
        } else if (value.hasFirstStreamPosition()) {
            this.consumer.onFirstStreamPosition(value.getFirstStreamPosition());
        } else if (value.hasLastStreamPosition()) {
            this.consumer.onLastStreamPosition(value.getLastStreamPosition());
        } else if (value.hasLastAllStreamPosition()) {
            Shared.AllStreamPosition position = value.getLastAllStreamPosition();
            this.consumer.onLastAllStreamPosition(position.getCommitPosition(), position.getPreparePosition());
        } else if (value.hasCaughtUp()) {
            this.consumer.onCaughtUp();
        } else if (value.hasFellBehind()) {
            this.consumer.onFellBehind();
        } else {
            logger.warn("received unknown message variant");
        }
        this.manageFlowControl();
    }

    public void onError(Throwable t) {
        if (!this.completed.compareAndSet(false, true)) {
            return;
        }
        if (t instanceof StatusRuntimeException) {
            StatusRuntimeException e = (StatusRuntimeException)t;
            if (e.getStatus().getCode() == Status.Code.CANCELLED) {
                return;
            }
            Metadata trailers = e.getTrailers();
            if (trailers != null) {
                String leaderHost = (String)trailers.get(Metadata.Key.of((String)"leader-endpoint-host", (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER));
                String leaderPort = (String)trailers.get(Metadata.Key.of((String)"leader-endpoint-port", (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER));
                if (leaderHost != null && leaderPort != null) {
                    int port = Integer.parseInt(leaderPort);
                    this.args.reportNewLeader(leaderHost, port);
                    t = new NotLeaderException(leaderHost, port);
                }
            }
        }
        this.consumer.onCancelled(t);
    }

    public void onCompleted() {
        if (!this.completed.compareAndSet(false, true)) {
            return;
        }
        this.consumer.onComplete();
    }

    static class InternalSubscription
    implements Subscription {
        private final ReadResponseObserver observer;

        InternalSubscription(ReadResponseObserver observer) {
            this.observer = observer;
        }

        public void request(long n) {
            if (n <= 0L) {
                throw new IllegalArgumentException("non-positive subscription request");
            }
            this.observer.requested.set((int)n);
        }

        public void cancel() {
            this.observer.cancel("subscription was manually cancelled", null);
        }
    }
}

