/*
 * Decompiled with CFR 0.152.
 */
package io.kurrent.dbclient;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import io.kurrent.dbclient.Checkpointer;
import io.kurrent.dbclient.ClientTelemetry;
import io.kurrent.dbclient.GrpcClient;
import io.kurrent.dbclient.GrpcUtils;
import io.kurrent.dbclient.OptionsWithBackPressure;
import io.kurrent.dbclient.ReadResponseObserver;
import io.kurrent.dbclient.Subscription;
import io.kurrent.dbclient.SubscriptionListener;
import io.kurrent.dbclient.SubscriptionStreamConsumer;
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.proto.streams.StreamsGrpc;
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
import java.util.concurrent.CompletableFuture;

abstract class AbstractRegularSubscription {
    protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions = StreamsOuterClass.ReadReq.Options.newBuilder().setUuidOption(StreamsOuterClass.ReadReq.Options.UUIDOption.newBuilder().setStructured(Shared.Empty.getDefaultInstance()));
    protected static final StreamsOuterClass.ReadReq.Options.Builder defaultSubscribeOptions = ((StreamsOuterClass.ReadReq.Options.Builder)defaultReadOptions.clone()).setReadDirection(StreamsOuterClass.ReadReq.Options.ReadDirection.Forwards).setSubscription(StreamsOuterClass.ReadReq.Options.SubscriptionOptions.getDefaultInstance());
    protected SubscriptionListener listener;
    protected Checkpointer checkpointer = null;
    private final GrpcClient client;
    private final OptionsWithBackPressure<?> options;

    protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure<?> options) {
        this.client = client;
        this.options = options;
    }

    protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();

    public CompletableFuture<Subscription> execute() {
        return this.client.run(channel -> {
            CompletableFuture<Subscription> future = new CompletableFuture<Subscription>();
            StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder().setOptions(this.createOptions()).build();
            StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(StreamsGrpc.newStub((Channel)channel), this.client.getSettings(), this.options);
            ReadResponseObserver observer = this.createObserver((ManagedChannel)channel, future);
            streamsClient.read(readReq, (StreamObserver<StreamsOuterClass.ReadResp>)observer);
            return future;
        });
    }

    private ReadResponseObserver createObserver(ManagedChannel channel, CompletableFuture<Subscription> future) {
        SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer(this.listener, this.checkpointer, future, (subscriptionId, event, action) -> ClientTelemetry.traceSubscribe(action, subscriptionId, channel, this.client.getSettings(), this.options.getCredentials(), event));
        return new ReadResponseObserver(this.options, consumer);
    }
}

