/*
 * Decompiled with CFR 0.152.
 */
package io.kurrent.dbclient;

import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import io.kurrent.dbclient.GrpcClient;
import io.kurrent.dbclient.GrpcUtils;
import io.kurrent.dbclient.OptionsWithBackPressure;
import io.kurrent.dbclient.ReadMessage;
import io.kurrent.dbclient.ReadResponseObserver;
import io.kurrent.dbclient.ReadStreamConsumer;
import io.kurrent.dbclient.WorkItemArgs;
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.proto.streams.StreamsGrpc;
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

abstract class AbstractRead
implements Publisher<ReadMessage> {
    protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions = StreamsOuterClass.ReadReq.Options.newBuilder().setUuidOption(StreamsOuterClass.ReadReq.Options.UUIDOption.newBuilder().setStructured(Shared.Empty.getDefaultInstance()));
    private final GrpcClient client;
    private final OptionsWithBackPressure<?> options;

    protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {
        this.client = client;
        this.options = options;
    }

    public abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();

    public void subscribe(Subscriber<? super ReadMessage> subscriber) {
        ReadResponseObserver observer = new ReadResponseObserver(this.options, new ReadStreamConsumer(subscriber));
        this.client.getWorkItemArgs().whenComplete((args, error) -> {
            if (error != null) {
                observer.onError((Throwable)error);
                return;
            }
            StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder().setOptions(this.createOptions()).build();
            StreamsGrpc.StreamsStub client = GrpcUtils.configureStub(StreamsGrpc.newStub((Channel)args.getChannel()), this.client.getSettings(), this.options);
            observer.onConnected((WorkItemArgs)args);
            subscriber.onSubscribe(observer.getSubscription());
            client.read(request, (StreamObserver<StreamsOuterClass.ReadResp>)observer);
        });
    }
}

