/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueOperation;
import io.nats.client.api.KeyValueWatchOption;
import io.nats.client.api.KeyValueWatcher;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsJetStream;
import io.nats.client.impl.NatsKeyValue;
import java.io.IOException;

public class NatsKeyValueWatchSubscription
implements AutoCloseable {
    private static final Object dispatcherLock = new Object();
    private static NatsDispatcher dispatcher;
    private final JetStreamSubscription sub;

    public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException {
        long pending;
        String keySubject = kv.rawKeySubject(keyPattern);
        boolean headersOnly = false;
        boolean ignoreDeletes = false;
        DeliverPolicy deliverPolicy = DeliverPolicy.LastPerSubject;
        block6: for (KeyValueWatchOption wo : watchOptions) {
            if (wo == null) continue;
            switch (wo) {
                case META_ONLY: {
                    headersOnly = true;
                    continue block6;
                }
                case IGNORE_DELETE: {
                    ignoreDeletes = true;
                    continue block6;
                }
                case UPDATES_ONLY: {
                    deliverPolicy = DeliverPolicy.New;
                    continue block6;
                }
                case INCLUDE_HISTORY: {
                    deliverPolicy = DeliverPolicy.All;
                }
            }
        }
        WatchMessageHandler handler = new WatchMessageHandler(watcher, !ignoreDeletes);
        if (deliverPolicy == DeliverPolicy.New) {
            handler.sendEndOfData();
        } else {
            KeyValueEntry kveCheckPending = kv._kvGetLastMessage(keyPattern);
            if (kveCheckPending == null) {
                handler.sendEndOfData();
            }
        }
        PushSubscribeOptions pso = ((PushSubscribeOptions.Builder)((PushSubscribeOptions.Builder)PushSubscribeOptions.builder().stream(kv.getStreamName())).ordered(true).configuration(ConsumerConfiguration.builder().ackPolicy(AckPolicy.None).deliverPolicy(deliverPolicy).headersOnly(headersOnly).filterSubject(keySubject).build())).build();
        this.sub = kv.js.subscribe(keySubject, NatsKeyValueWatchSubscription.getDispatcher(kv.js), handler, false, pso);
        if (!handler.endOfDataSent && (pending = this.sub.getConsumerInfo().getCalculatedPending()) == 0L) {
            handler.sendEndOfData();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static Dispatcher getDispatcher(JetStream js) {
        Object object = dispatcherLock;
        synchronized (object) {
            if (dispatcher == null) {
                dispatcher = (NatsDispatcher)((NatsJetStream)js).conn.createDispatcher();
            }
            return dispatcher;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unsubscribe() {
        Object object = dispatcherLock;
        synchronized (object) {
            dispatcher.unsubscribe(this.sub);
            if (dispatcher.getSubscriptionHandlers().size() == 0) {
                NatsKeyValueWatchSubscription.dispatcher.connection.closeDispatcher(dispatcher);
                dispatcher = null;
            }
        }
    }

    @Override
    public void close() throws Exception {
        this.unsubscribe();
    }

    static class WatchMessageHandler
    implements MessageHandler {
        private final KeyValueWatcher watcher;
        private final boolean includeDeletes;
        boolean endOfDataSent;

        public WatchMessageHandler(KeyValueWatcher watcher, boolean includeDeletes) {
            this.watcher = watcher;
            this.includeDeletes = includeDeletes;
        }

        @Override
        public void onMessage(Message m) throws InterruptedException {
            KeyValueEntry kve = new KeyValueEntry(m);
            if (this.includeDeletes || kve.getOperation() == KeyValueOperation.PUT) {
                this.watcher.watch(kve);
            }
            if (!this.endOfDataSent && kve.getDelta() == 0L) {
                this.sendEndOfData();
            }
        }

        private void sendEndOfData() {
            this.endOfDataSent = true;
            this.watcher.endOfData();
        }
    }
}

