/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.MessageHandler;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.Watcher;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsFeatureBase;
import io.nats.client.impl.NatsJetStream;
import java.io.IOException;
import java.util.List;

/**
 * Base class for watch-style subscriptions that deliver stream messages to a
 * {@link Watcher}.
 *
 * <p>An instance owns one dispatcher and one JetStream subscription, both of
 * which are created by {@link #finishInit} and released by
 * {@link #unsubscribe()} / {@link #close()}.
 *
 * @param <T> the element type handled by the {@link Watcher}
 */
public class NatsWatchSubscription<T>
implements AutoCloseable {
    private final JetStream js;
    private NatsDispatcher dispatcher;
    private JetStreamSubscription sub;

    /**
     * Creates a subscription holder. Nothing is subscribed until
     * {@link #finishInit} is called.
     *
     * @param js the JetStream context used to subscribe; {@link #finishInit}
     *           casts it to {@link NatsJetStream} to reach the connection
     */
    public NatsWatchSubscription(JetStream js) {
        this.js = js;
    }

    /**
     * Creates the dispatcher and the ordered push subscription, and signals
     * end-of-data to the handler when there is nothing to replay.
     *
     * <p>End-of-data is sent up front when no start revision is given and the
     * deliver policy is {@link DeliverPolicy#New}; otherwise it is sent after
     * subscribing if it has not been sent yet and the consumer reports a
     * calculated pending count of zero.
     *
     * <p>If subscribing or the pending check fails, the dispatcher (and the
     * subscription, if it was created) is released before the exception is
     * rethrown, so a failed initialization does not leave them behind.
     *
     * @param fb                 supplies the stream name to subscribe to
     * @param subscribeSubjects  filter subjects for the consumer
     * @param deliverPolicy      requested deliver policy; replaced by
     *                           {@link DeliverPolicy#ByStartSequence} when
     *                           {@code fromRevision} is positive
     * @param headersOnly        whether the consumer is configured headers-only
     * @param fromRevision       start sequence; values {@code <= 0} are
     *                           normalized to {@code 0}
     * @param handler            receives the messages and the end-of-data signal
     * @param consumerNamePrefix passed to the consumer configuration as its name
     * @throws IOException           propagated from subscribing or from reading
     *                               the consumer info
     * @throws JetStreamApiException propagated from subscribing or from reading
     *                               the consumer info
     */
    protected void finishInit(NatsFeatureBase fb, List<String> subscribeSubjects, DeliverPolicy deliverPolicy, boolean headersOnly, long fromRevision, WatchMessageHandler<T> handler, String consumerNamePrefix) throws IOException, JetStreamApiException {
        if (fromRevision > 0L) {
            // An explicit revision always wins over the requested policy.
            deliverPolicy = DeliverPolicy.ByStartSequence;
        } else {
            fromRevision = 0L;
            if (deliverPolicy == DeliverPolicy.New) {
                // Nothing existing will be replayed, so the initial data set is already complete.
                handler.sendEndOfData();
            }
        }

        ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder()
            .name(consumerNamePrefix)
            .ackPolicy(AckPolicy.None)
            .deliverPolicy(deliverPolicy)
            .startSequence(fromRevision)
            .headersOnly(headersOnly)
            .filterSubjects(subscribeSubjects)
            .build();
        PushSubscribeOptions pso = PushSubscribeOptions.builder()
            .stream(fb.getStreamName())
            .ordered(true)
            .configuration(consumerConfig)
            .build();

        this.dispatcher = (NatsDispatcher)((NatsJetStream)this.js).conn.createDispatcher();
        try {
            this.sub = this.js.subscribe(null, this.dispatcher, handler, false, pso);
            if (!handler.endOfDataSent && this.sub.getConsumerInfo().getCalculatedPending() == 0L) {
                handler.sendEndOfData();
            }
        } catch (IOException | JetStreamApiException | RuntimeException e) {
            // Initialization failed: release what was created so far. A cleanup
            // failure must not mask the original cause.
            try {
                this.releaseResources();
            } catch (RuntimeException cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /**
     * Unsubscribes the subscription from the dispatcher and closes the
     * dispatcher once it has no subscription handlers left. Does nothing when
     * no dispatcher is held.
     */
    public void unsubscribe() {
        this.releaseResources();
    }

    /**
     * Releases the subscription and dispatcher; equivalent to
     * {@link #unsubscribe()}.
     */
    @Override
    public void close() throws Exception {
        this.unsubscribe();
    }

    /**
     * Shared teardown used by {@link #unsubscribe()} and by the failure path
     * of {@link #finishInit}. Tolerates a dispatcher without a subscription,
     * which is the state left behind when subscribing itself failed.
     */
    private void releaseResources() {
        NatsDispatcher current = this.dispatcher;
        if (current == null) {
            return;
        }
        if (this.sub != null) {
            current.unsubscribe(this.sub);
            // Forget the subscription so a repeated call does not unsubscribe it again.
            this.sub = null;
        }
        if (current.getSubscriptionHandlers().isEmpty()) {
            current.connection.closeDispatcher(current);
            this.dispatcher = null;
        }
    }

    /**
     * Message handler base that forwards the end-of-data signal to a
     * {@link Watcher} and remembers that it has been sent.
     *
     * @param <T> the element type handled by the {@link Watcher}
     */
    protected static abstract class WatchMessageHandler<T>
    implements MessageHandler {
        private final Watcher<T> watcher;
        // NOTE(review): read by finishInit right after subscribing; if subclasses
        // also set this from the dispatcher's delivery thread, confirm whether it
        // needs to be volatile.
        boolean endOfDataSent;

        /**
         * @param watcher the watcher to notify when end-of-data is reached
         */
        protected WatchMessageHandler(Watcher<T> watcher) {
            this.watcher = watcher;
        }

        /**
         * Marks end-of-data as sent and notifies the watcher. The flag is set
         * before the watcher is called.
         */
        public void sendEndOfData() {
            this.endOfDataSent = true;
            this.watcher.endOfData();
        }
    }
}

