/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.ConsumeOptions;
import io.nats.client.ConsumerContext;
import io.nats.client.Dispatcher;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.IterableConsumer;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamStatusCheckedException;
import io.nats.client.Message;
import io.nats.client.MessageConsumer;
import io.nats.client.MessageHandler;
import io.nats.client.PullRequestOptions;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.OrderedConsumerConfiguration;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsFetchConsumer;
import io.nats.client.impl.NatsIterableConsumer;
import io.nats.client.impl.NatsJetStreamPullSubscription;
import io.nats.client.impl.NatsMessageConsumer;
import io.nats.client.impl.NatsMessageConsumerBase;
import io.nats.client.impl.NatsStreamContext;
import io.nats.client.impl.PullMessageManager;
import io.nats.client.impl.SimplifiedSubscriptionMaker;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

public class NatsConsumerContext
implements ConsumerContext,
SimplifiedSubscriptionMaker {
    private final ReentrantLock stateLock = new ReentrantLock();
    private final NatsStreamContext streamCtx;
    private final boolean ordered;
    private final ConsumerConfiguration initialOrderedConsumerConfig;
    private final PullSubscribeOptions unorderedBindPso;
    private final AtomicReference<ConsumerInfo> cachedConsumerInfo;
    private final AtomicReference<String> consumerName;
    private final AtomicLong highestSeq;
    private final AtomicReference<Dispatcher> defaultDispatcher;
    private final AtomicReference<NatsMessageConsumerBase> lastConsumer;

    NatsConsumerContext(NatsStreamContext sc, ConsumerInfo unorderedConsumerInfo, OrderedConsumerConfiguration occ) {
        this.streamCtx = sc;
        this.cachedConsumerInfo = new AtomicReference();
        this.consumerName = new AtomicReference();
        this.highestSeq = new AtomicLong();
        this.defaultDispatcher = new AtomicReference();
        this.lastConsumer = new AtomicReference();
        if (unorderedConsumerInfo != null) {
            this.ordered = false;
            this.initialOrderedConsumerConfig = null;
            this.cachedConsumerInfo.set(unorderedConsumerInfo);
            this.consumerName.set(unorderedConsumerInfo.getName());
            this.unorderedBindPso = PullSubscribeOptions.fastBind(sc.streamName, unorderedConsumerInfo.getName());
        } else {
            this.ordered = true;
            this.initialOrderedConsumerConfig = ConsumerConfiguration.builder().name(occ.getConsumerNamePrefix()).filterSubjects(occ.getFilterSubjects()).deliverPolicy(occ.getDeliverPolicy()).startSequence(occ.getStartSequence()).startTime(occ.getStartTime()).replayPolicy(occ.getReplayPolicy()).headersOnly(occ.getHeadersOnly()).build();
            this.unorderedBindPso = null;
        }
    }

    @Override
    public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Dispatcher userDispatcher, PullMessageManager optionalPmm, Long optionalInactiveThreshold) throws IOException, JetStreamApiException {
        NatsJetStreamPullSubscription sub;
        PullSubscribeOptions pso;
        if (this.ordered) {
            NatsMessageConsumerBase lastCon = this.lastConsumer.get();
            if (lastCon != null) {
                this.highestSeq.set(Math.max(this.highestSeq.get(), lastCon.pmm.lastStreamSeq));
            }
            ConsumerConfiguration cc = this.streamCtx.js.consumerConfigurationForOrdered(this.initialOrderedConsumerConfig, this.highestSeq.get(), null, optionalInactiveThreshold).build();
            pso = new OrderedPullSubscribeOptionsBuilder(this.streamCtx.streamName, cc).build();
        } else {
            pso = this.unorderedBindPso;
        }
        if (messageHandler == null) {
            sub = (NatsJetStreamPullSubscription)this.streamCtx.js.createSubscription(null, null, pso, null, null, null, false, optionalPmm);
        } else {
            Dispatcher d = userDispatcher;
            if (d == null && (d = this.defaultDispatcher.get()) == null) {
                d = this.streamCtx.js.conn.createDispatcher();
                this.defaultDispatcher.set(d);
            }
            sub = (NatsJetStreamPullSubscription)this.streamCtx.js.createSubscription(null, null, pso, null, (NatsDispatcher)d, messageHandler, false, optionalPmm);
        }
        this.consumerName.set(sub.getConsumerName());
        return sub;
    }

    private void checkState() throws IOException {
        NatsMessageConsumerBase lastCon = this.lastConsumer.get();
        if (lastCon != null) {
            if (this.ordered && !lastCon.finished.get()) {
                throw new IOException("The ordered consumer is already receiving messages. Ordered Consumer does not allow multiple instances at time.");
            }
            if (lastCon.finished.get() && !lastCon.stopped.get()) {
                lastCon.lenientClose();
            }
        }
    }

    private NatsMessageConsumerBase trackConsume(NatsMessageConsumerBase con) {
        this.lastConsumer.set(con);
        return con;
    }

    @Override
    public String getConsumerName() {
        return this.consumerName.get();
    }

    @Override
    public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
        ConsumerInfo ci = this.streamCtx.jsm.getConsumerInfo(this.streamCtx.streamName, this.consumerName.get());
        this.cachedConsumerInfo.set(ci);
        this.consumerName.set(ci.getName());
        return ci;
    }

    @Override
    public ConsumerInfo getCachedConsumerInfo() {
        return this.cachedConsumerInfo.get();
    }

    @Override
    public Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
        return this.next(30000L);
    }

    @Override
    public Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
        return maxWait == null ? this.next(30000L) : this.next(maxWait.toMillis());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
        if (maxWaitMillis < 1000L) {
            throw new IllegalArgumentException("Max wait must be at least 1000 milliseconds.");
        }
        NatsMessageConsumerBase nmcb = null;
        try {
            this.stateLock.lock();
            this.checkState();
            try {
                long inactiveThreshold = maxWaitMillis * 110L / 100L;
                nmcb = new NatsMessageConsumerBase(this.cachedConsumerInfo.get());
                nmcb.initSub(this.subscribe(null, null, null, inactiveThreshold));
                nmcb.setConsumerName(this.consumerName.get());
                this.trackConsume(nmcb);
                nmcb.sub._pull(PullRequestOptions.builder(1).expiresIn(maxWaitMillis - 10L).build(), false, null);
            }
            catch (Exception e) {
                if (nmcb != null) {
                    try {
                        nmcb.close();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
                Message message = null;
                this.stateLock.unlock();
                return message;
            }
        }
        finally {
            this.stateLock.unlock();
        }
        try {
            Message message = nmcb.sub.nextMessage(maxWaitMillis);
            return message;
        }
        finally {
            try {
                nmcb.finished.set(true);
                nmcb.close();
            }
            catch (Exception exception) {}
        }
    }

    @Override
    public FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException {
        return this.fetch(FetchConsumeOptions.builder().maxMessages(maxMessages).build());
    }

    @Override
    public FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException {
        return this.fetch(FetchConsumeOptions.builder().maxBytes(maxBytes).build());
    }

    @Override
    public FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException {
        try {
            this.stateLock.lock();
            this.checkState();
            Validator.required(fetchConsumeOptions, "Fetch Consume Options");
            FetchConsumer fetchConsumer = (FetchConsumer)((Object)this.trackConsume(new NatsFetchConsumer(this, this.cachedConsumerInfo.get(), fetchConsumeOptions)));
            return fetchConsumer;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    @Override
    public IterableConsumer iterate() throws IOException, JetStreamApiException {
        return this.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS);
    }

    @Override
    public IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException {
        try {
            this.stateLock.lock();
            this.checkState();
            Validator.required(consumeOptions, "Consume Options");
            IterableConsumer iterableConsumer = (IterableConsumer)((Object)this.trackConsume(new NatsIterableConsumer(this, this.cachedConsumerInfo.get(), consumeOptions)));
            return iterableConsumer;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    @Override
    public MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException {
        return this.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, null, handler);
    }

    @Override
    public MessageConsumer consume(Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException {
        return this.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, dispatcher, handler);
    }

    @Override
    public MessageConsumer consume(ConsumeOptions consumeOptions, MessageHandler handler) throws IOException, JetStreamApiException {
        return this.consume(consumeOptions, null, handler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher userDispatcher, MessageHandler handler) throws IOException, JetStreamApiException {
        try {
            this.stateLock.lock();
            this.checkState();
            Validator.required(handler, "Message Handler");
            Validator.required(consumeOptions, "Consume Options");
            NatsMessageConsumerBase natsMessageConsumerBase = this.trackConsume(new NatsMessageConsumer(this, this.cachedConsumerInfo.get(), consumeOptions, userDispatcher, handler));
            return natsMessageConsumerBase;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    static class OrderedPullSubscribeOptionsBuilder
    extends PullSubscribeOptions.Builder {
        OrderedPullSubscribeOptionsBuilder(String streamName, ConsumerConfiguration cc) {
            this.stream(streamName);
            this.configuration(cc);
            this.ordered = true;
        }
    }
}

