/*
 * Decompiled with CFR 0.152.
 */
package com.eventstore.dbclient;

import com.eventstore.dbclient.GrpcClient;
import com.eventstore.dbclient.GrpcUtils;
import com.eventstore.dbclient.NotLeaderException;
import com.eventstore.dbclient.OptionsBase;
import com.eventstore.dbclient.ReadMessage;
import com.eventstore.dbclient.ReadSubscription;
import com.eventstore.dbclient.StreamNotFoundException;
import com.eventstore.dbclient.proto.shared.Shared;
import com.eventstore.dbclient.proto.streams.StreamsGrpc;
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

abstract class AbstractRead
implements Publisher<ReadMessage> {
    protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions = StreamsOuterClass.ReadReq.Options.newBuilder().setUuidOption(StreamsOuterClass.ReadReq.Options.UUIDOption.newBuilder().setStructured(Shared.Empty.getDefaultInstance()));
    private final GrpcClient client;
    private final OptionsBase<?> options;

    protected AbstractRead(GrpcClient client, OptionsBase<?> options) {
        this.client = client;
        this.options = options;
    }

    public abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();

    public void subscribe(Subscriber<? super ReadMessage> subscriber) {
        final ReadSubscription readSubscription = new ReadSubscription(subscriber);
        subscriber.onSubscribe((Subscription)readSubscription);
        final CompletableFuture result = new CompletableFuture();
        this.client.run(channel -> {
            StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder().setOptions(this.createOptions()).build();
            StreamsGrpc.StreamsStub client = GrpcUtils.configureStub(StreamsGrpc.newStub((Channel)channel), this.client.getSettings(), this.options);
            client.read(request, (StreamObserver<StreamsOuterClass.ReadResp>)new ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp>(){
                private boolean completed = false;

                public void beforeStart(ClientCallStreamObserver<StreamsOuterClass.ReadReq> requestStream) {
                    readSubscription.setStreamObserver(requestStream);
                }

                public void onNext(StreamsOuterClass.ReadResp value) {
                    if (this.completed) {
                        return;
                    }
                    if (value.hasStreamNotFound()) {
                        StreamNotFoundException streamNotFoundException = new StreamNotFoundException();
                        this.handleError(streamNotFoundException);
                        return;
                    }
                    try {
                        readSubscription.onNext(new ReadMessage(value));
                    }
                    catch (Throwable t) {
                        this.handleError(t);
                    }
                }

                public void onCompleted() {
                    if (this.completed) {
                        return;
                    }
                    this.completed = true;
                    result.complete(readSubscription);
                    readSubscription.onCompleted();
                }

                public void onError(Throwable t) {
                    if (this.completed) {
                        return;
                    }
                    if (t instanceof StatusRuntimeException) {
                        StatusRuntimeException e = (StatusRuntimeException)t;
                        String leaderHost = (String)e.getTrailers().get(Metadata.Key.of((String)"leader-endpoint-host", (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER));
                        String leaderPort = (String)e.getTrailers().get(Metadata.Key.of((String)"leader-endpoint-port", (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER));
                        if (leaderHost != null && leaderPort != null) {
                            NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
                            this.handleError(reason);
                            return;
                        }
                    }
                    this.handleError(t);
                }

                private void handleError(Throwable t) {
                    this.completed = true;
                    result.completeExceptionally(t);
                    readSubscription.onError(t);
                }
            });
            return result;
        }).exceptionally(t -> {
            readSubscription.onError((Throwable)t);
            return readSubscription;
        });
    }
}

