/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.NatsSystemClock;
import io.nats.client.PullRequestOptions;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.impl.MessageManager;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsJetStream;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.NatsSubscription;
import io.nats.client.support.NatsJetStreamConstants;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;

/**
 * A {@link NatsSubscription} bound to a JetStream stream/consumer pair whose
 * incoming messages are first passed through a {@link MessageManager}.
 *
 * <p>This class reports {@code isPullMode() == false} and every pull-style
 * operation ({@code pull*}, {@code fetch}, {@code iterate}, {@code reader})
 * throws {@link IllegalStateException} with
 * {@link #SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL}. Subclasses are presumably
 * expected to override those for pull consumers (not visible in this file).
 */
public class NatsJetStreamSubscription extends NatsSubscription
        implements JetStreamSubscription, NatsJetStreamConstants {

    /** Message used by every pull-style operation that this subscription type rejects. */
    public static final String SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL = "Subscription type does not support pull.";

    // NOTE(review): the two constants below are not referenced in this class; they are
    // presumably millisecond values consumed by pull-capable subclasses - confirm there.
    public static final long EXPIRE_ADJUSTMENT = 10L;
    public static final long MIN_EXPIRE_MILLIS = 20L;

    /** Nanoseconds in one millisecond, used to convert millisecond timeouts. */
    private static final long TIMEOUT_NANOS_PER_MILLI = 1000000L;

    /** Largest millisecond value that can be converted to nanoseconds without overflowing a long. */
    private static final long TIMEOUT_MAX_CONVERTIBLE_MILLIS = Long.MAX_VALUE / TIMEOUT_NANOS_PER_MILLI;

    protected final NatsJetStream js;
    protected String stream;
    protected String consumerName;
    protected MessageManager manager;

    /**
     * Creates the subscription, stores the JetStream context, and starts the manager.
     *
     * <p>Note that {@code manager.startup(this)} is invoked from within the constructor,
     * so the manager observes this instance before any subclass constructor body has run.
     *
     * @param sid        passed through to {@link NatsSubscription}
     * @param subject    passed through to {@link NatsSubscription}
     * @param queueName  passed through to {@link NatsSubscription}
     * @param connection passed through to {@link NatsSubscription}
     * @param dispatcher passed through to {@link NatsSubscription}
     * @param js         JetStream context used by {@link #getConsumerInfo()}
     * @param stream     name returned by {@link #getStreamName()}
     * @param consumer   name returned by {@link #getConsumerName()}
     * @param manager    manager every received message is handed to; must not be null
     */
    NatsJetStreamSubscription(String sid, String subject, String queueName, NatsConnection connection, NatsDispatcher dispatcher, NatsJetStream js, String stream, String consumer, MessageManager manager) {
        super(sid, subject, queueName, connection, dispatcher);
        this.js = js;
        this.stream = stream;
        this.consumerName = consumer;
        this.manager = manager;
        manager.startup(this);
    }

    /** Replaces the consumer name reported by {@link #getConsumerName()}. */
    void setConsumerName(String consumerName) {
        this.consumerName = consumerName;
    }

    @Override
    public String getConsumerName() {
        return consumerName;
    }

    @Override
    public String getStreamName() {
        return stream;
    }

    /** @return always {@code false} for this subscription type */
    boolean isPullMode() {
        return false;
    }

    /** @return the manager supplied at construction */
    MessageManager getManager() {
        return manager;
    }

    /** Shuts the manager down, then delegates to the superclass invalidation. */
    @Override
    void invalidate() {
        manager.shutdown();
        super.invalidate();
    }

    /**
     * Returns the next user message, interpreting the timeout as follows:
     * {@code null} uses the no-wait path, a zero or negative duration uses the
     * wait-forever path, and a positive duration bounds the wait.
     *
     * <p>Durations too large to express as a {@code long} of nanoseconds are
     * saturated instead of failing with {@link ArithmeticException}.
     *
     * @param timeout the maximum wait, or {@code null} for no wait
     * @return the next message, or {@code null} if none was obtained
     * @throws InterruptedException     propagated from the underlying receive
     * @throws JetStreamStatusException if the manager classifies a received message as a status error
     */
    @Override
    public Message nextMessage(Duration timeout) throws InterruptedException, IllegalStateException {
        if (timeout == null) {
            return _nextUnmanagedNoWait(null);
        }
        long timeoutNanos = saturatedNanos(timeout);
        if (timeoutNanos <= 0L) {
            return _nextUnmanagedWaitForever(null);
        }
        return _nextUnmanaged(timeoutNanos, null);
    }

    /**
     * Returns the next user message; a zero or negative timeout uses the
     * wait-forever path, a positive one bounds the wait.
     *
     * <p>The millisecond value is converted to nanoseconds with saturation at
     * {@link Long#MAX_VALUE}; a plain multiplication would wrap to a negative
     * number for very large timeouts and make the call return immediately.
     *
     * @param timeoutMillis the maximum wait in milliseconds
     * @return the next message, or {@code null} if none was obtained in time
     * @throws InterruptedException     propagated from the underlying receive
     * @throws JetStreamStatusException if the manager classifies a received message as a status error
     */
    @Override
    public Message nextMessage(long timeoutMillis) throws InterruptedException, IllegalStateException {
        if (timeoutMillis <= 0L) {
            return _nextUnmanagedWaitForever(null);
        }
        long timeoutNanos = timeoutMillis > TIMEOUT_MAX_CONVERTIBLE_MILLIS
                ? Long.MAX_VALUE
                : timeoutMillis * TIMEOUT_NANOS_PER_MILLI;
        return _nextUnmanaged(timeoutNanos, null);
    }

    /**
     * Loops on {@code nextMessageInternal(Duration.ZERO)} until the manager
     * classifies a message as a user message or as an applicable status error.
     * Every other outcome (including a {@code null} message) is ignored and the
     * loop continues.
     *
     * @param expectedPullSubject when non-null, status errors are only raised for
     *                            messages whose subject equals this value
     * @return the first message the manager classifies as {@code MESSAGE}
     * @throws InterruptedException     propagated from the underlying receive
     * @throws JetStreamStatusException for an applicable status error
     */
    protected Message _nextUnmanagedWaitForever(String expectedPullSubject) throws InterruptedException {
        while (true) {
            NatsMessage msg = nextMessageInternal(Duration.ZERO);
            if (msg == null) {
                continue;
            }
            switch (manager.manage(msg)) {
                case MESSAGE:
                    return msg;
                case STATUS_ERROR:
                    if (appliesToExpectedPull(expectedPullSubject, msg)) {
                        throw new JetStreamStatusException(msg.getStatus(), this);
                    }
                    break;
                default:
                    // Anything else is not for the caller; keep looping.
                    break;
            }
        }
    }

    /**
     * Repeatedly calls {@code nextMessageInternal(null)} and stops as soon as it
     * returns {@code null}, a user message arrives, or an applicable terminal
     * status or status error is seen.
     *
     * @param expectedPullSubject when non-null, terminal statuses and status errors
     *                            only apply to messages whose subject equals this value
     * @return the first user message, or {@code null} if none is available or an
     *         applicable terminal status was received
     * @throws InterruptedException     propagated from the underlying receive
     * @throws JetStreamStatusException for an applicable status error
     */
    protected Message _nextUnmanagedNoWait(String expectedPullSubject) throws InterruptedException {
        NatsMessage msg = nextMessageInternal(null);
        while (msg != null) {
            switch (manager.manage(msg)) {
                case MESSAGE:
                    return msg;
                case STATUS_TERMINUS:
                    if (appliesToExpectedPull(expectedPullSubject, msg)) {
                        return null;
                    }
                    break;
                case STATUS_ERROR:
                    if (appliesToExpectedPull(expectedPullSubject, msg)) {
                        throw new JetStreamStatusException(msg.getStatus(), this);
                    }
                    break;
                default:
                    break;
            }
            msg = nextMessageInternal(null);
        }
        return null;
    }

    /**
     * Waits up to {@code timeoutNanos} for a user message. After each message
     * that is not returned to the caller, the remaining time is recomputed from
     * the start of the call, so the total wait stays bounded by the timeout.
     *
     * @param timeoutNanos        total time budget in nanoseconds; non-positive returns {@code null} at once
     * @param expectedPullSubject when non-null, terminal statuses and status errors
     *                            only apply to messages whose subject equals this value
     * @return the first user message, or {@code null} on timeout or applicable terminal status
     * @throws InterruptedException     propagated from the underlying receive
     * @throws JetStreamStatusException for an applicable status error
     */
    protected Message _nextUnmanaged(long timeoutNanos, String expectedPullSubject) throws InterruptedException {
        long timeLeftNanos = timeoutNanos;
        long start = NatsSystemClock.nanoTime();
        while (timeLeftNanos > 0L) {
            NatsMessage msg = nextMessageInternal(Duration.ofNanos(timeLeftNanos));
            if (msg == null) {
                return null;
            }
            switch (manager.manage(msg)) {
                case MESSAGE:
                    return msg;
                case STATUS_TERMINUS:
                    if (appliesToExpectedPull(expectedPullSubject, msg)) {
                        return null;
                    }
                    break;
                case STATUS_ERROR:
                    if (appliesToExpectedPull(expectedPullSubject, msg)) {
                        throw new JetStreamStatusException(msg.getStatus(), this);
                    }
                    break;
                default:
                    break;
            }
            timeLeftNanos = timeoutNanos - (NatsSystemClock.nanoTime() - start);
        }
        return null;
    }

    /**
     * Decides whether a status message is relevant to the caller: it is when no
     * pull subject is expected, or when the message subject equals the expected one.
     */
    private static boolean appliesToExpectedPull(String expectedPullSubject, NatsMessage msg) {
        return expectedPullSubject == null || expectedPullSubject.equals(msg.getSubject());
    }

    /**
     * Converts a duration to nanoseconds, clamping to {@link Long#MAX_VALUE} or
     * {@link Long#MIN_VALUE} when the exact value does not fit in a {@code long}.
     */
    private static long saturatedNanos(Duration duration) {
        try {
            return duration.toNanos();
        }
        catch (ArithmeticException overflow) {
            return duration.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public void pull(int batchSize) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public void pull(PullRequestOptions pullRequestOptions) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public void pullNoWait(int batchSize) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public void pullNoWait(int batchSize, Duration expiresIn) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public void pullNoWait(int batchSize, long expiresInMillis) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public void pullExpiresIn(int batchSize, Duration expiresIn) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public void pullExpiresIn(int batchSize, long expiresInMillis) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public List<Message> fetch(int batchSize, long maxWaitMillis) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public List<Message> fetch(int batchSize, Duration maxWait) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public Iterator<Message> iterate(int batchSize, Duration maxWait) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public Iterator<Message> iterate(int batchSize, long maxWaitMillis) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /** Not supported by this subscription type; always throws {@link IllegalStateException}. */
    @Override
    public JetStreamReader reader(int batchSize, int repullAt) {
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
    }

    /**
     * Looks up the consumer info for this subscription's stream and consumer
     * name through the JetStream context.
     *
     * @throws IOException           propagated from the lookup
     * @throws JetStreamApiException propagated from the lookup
     */
    @Override
    public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
        return js.lookupConsumerInfo(stream, consumerName);
    }

    /** Describes the subscription by consumer, stream, delivery subject and pull mode. */
    @Override
    public String toString() {
        return "NatsJetStreamSubscription{consumer='" + consumerName + '\''
                + ", stream='" + stream + '\''
                + ", deliver='" + getSubject() + '\''
                + ", isPullMode=" + isPullMode() + '}';
    }
}

