/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamOptions;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.PublishOptions;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.SubscribeOptions;
import io.nats.client.api.AccountStatistics;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerCreateRequest;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.SuccessApiResponse;
import io.nats.client.impl.ConsumerListReader;
import io.nats.client.impl.ConsumerNamesReader;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsJetStreamSubscription;
import io.nats.client.impl.StreamListReader;
import io.nats.client.impl.StreamNamesReader;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.NatsJetStreamConstants;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NatsJetStream
implements JetStream,
JetStreamManagement,
NatsJetStreamConstants {
    /** Connection over which this instance issues requests and creates subscriptions. */
    private final NatsConnection conn;
    /** String placed in front of JetStream API subjects by {@code prependPrefix}. */
    private final String prefix;
    /** Timeout taken from the JetStream options; used for API requests unless a method supplies its own. */
    private final Duration requestTimeout;

    /**
     * Builds a JetStream context on top of an existing connection and then runs
     * {@code checkEnabled()} to confirm the server answers the account-info request.
     *
     * @param connection the connection used for all requests and subscriptions
     * @param jsOptions  JetStream options; when {@code null} the values of
     *                   {@link JetStreamOptions#DEFAULT_JS_OPTIONS} are used
     * @throws IOException           if the enablement check is interrupted
     * @throws IllegalStateException if the enablement check fails
     */
    NatsJetStream(NatsConnection connection, JetStreamOptions jsOptions) throws IOException {
        JetStreamOptions effective = jsOptions != null ? jsOptions : JetStreamOptions.DEFAULT_JS_OPTIONS;
        conn = connection;
        prefix = effective.getPrefix();
        requestTimeout = effective.getRequestTimeout();
        checkEnabled();
    }

    /**
     * Sends the prefixed {@code INFO} request and fails if JetStream does not answer
     * or answers with error code 503.
     *
     * @throws IOException           if the calling thread is interrupted while waiting;
     *                               the thread's interrupt flag is restored first
     * @throws IllegalStateException if no response arrives or the response carries error code 503
     */
    private void checkEnabled() throws IOException {
        try {
            Message respMessage = makeRequest("INFO", null, requestTimeout);
            if (respMessage == null) {
                throw new IllegalStateException("JetStream is not enabled.");
            }
            AccountStatistics stats = new AccountStatistics(respMessage);
            if (stats.getErrorCode() == 503L) {
                throw new IllegalStateException(stats.getDescription());
            }
        }
        catch (InterruptedException e) {
            // Wrapping alone would hide the interruption from callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Creates a stream by delegating to {@code addOrUpdateStream} with the
     * {@code STREAM.CREATE} subject template.
     *
     * @param config the stream configuration
     * @return the stream information parsed from the response
     */
    @Override
    public StreamInfo addStream(StreamConfiguration config) throws IOException, JetStreamApiException {
        final String createTemplate = "STREAM.CREATE.%s";
        return addOrUpdateStream(config, createTemplate);
    }

    /**
     * Updates a stream by delegating to {@code addOrUpdateStream} with the
     * {@code STREAM.UPDATE} subject template.
     *
     * @param config the stream configuration
     * @return the stream information parsed from the response
     */
    @Override
    public StreamInfo updateStream(StreamConfiguration config) throws IOException, JetStreamApiException {
        final String updateTemplate = "STREAM.UPDATE.%s";
        return addOrUpdateStream(config, updateTemplate);
    }

    /**
     * Sends a stream configuration to the API subject produced by formatting
     * {@code template} with the stream name.
     *
     * @param config   the stream configuration; must be non-null and have a non-empty name
     * @param template a format string with one {@code %s} placeholder for the stream name
     * @return the stream information parsed from the response
     * @throws IllegalArgumentException if {@code config} is null or has no stream name
     * @throws IOException              if no response is received or the request is interrupted
     * @throws JetStreamApiException    propagated from {@code throwOnHasError()} on the parsed response
     */
    private StreamInfo addOrUpdateStream(StreamConfiguration config, String template) throws IOException, JetStreamApiException {
        if (config == null) {
            throw new IllegalArgumentException("Configuration cannot be null.");
        }
        String streamName = config.getName();
        if (Validator.nullOrEmpty(streamName)) {
            throw new IllegalArgumentException("Configuration must have a valid stream name");
        }
        String subj = String.format(template, streamName);
        // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset.
        byte[] payload = config.toJson().getBytes(StandardCharsets.UTF_8);
        Message resp = makeRequestResponseRequired(subj, payload, requestTimeout);
        return (StreamInfo)new StreamInfo(resp).throwOnHasError();
    }

    /**
     * Requests deletion of a stream via the {@code STREAM.DELETE} API subject.
     *
     * @param streamName the name of the stream
     * @return the success flag from the response
     */
    @Override
    public boolean deleteStream(String streamName) throws IOException, JetStreamApiException {
        Message reply = makeRequestResponseRequired(String.format("STREAM.DELETE.%s", streamName), null, requestTimeout);
        SuccessApiResponse outcome = (SuccessApiResponse)new SuccessApiResponse(reply).throwOnHasError();
        return outcome.getSuccess();
    }

    /**
     * Fetches information about a stream via the {@code STREAM.INFO} API subject.
     *
     * @param streamName the name of the stream
     * @return the stream information parsed from the response
     */
    @Override
    public StreamInfo getStreamInfo(String streamName) throws IOException, JetStreamApiException {
        Message reply = makeRequestResponseRequired(String.format("STREAM.INFO.%s", streamName), null, requestTimeout);
        StreamInfo info = new StreamInfo(reply);
        return (StreamInfo)info.throwOnHasError();
    }

    /**
     * Requests a purge of a stream via the {@code STREAM.PURGE} API subject.
     *
     * @param streamName the name of the stream
     * @return the purge response parsed from the reply
     */
    @Override
    public PurgeResponse purgeStream(String streamName) throws IOException, JetStreamApiException {
        Message reply = makeRequestResponseRequired(String.format("STREAM.PURGE.%s", streamName), null, requestTimeout);
        PurgeResponse purge = new PurgeResponse(reply);
        return (PurgeResponse)purge.throwOnHasError();
    }

    /**
     * Validates the arguments (stream name, non-null configuration, non-null durable
     * name) and then creates or updates the consumer.
     *
     * @param streamName the name of the stream
     * @param config     the consumer configuration; its durable name must not be null
     * @return the consumer information parsed from the response
     */
    @Override
    public ConsumerInfo addOrUpdateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
        // Validation order matters: config must be checked before it is dereferenced.
        Validator.validateStreamNameRequired(streamName);
        Validator.validateNotNull(config, "Config");
        Validator.validateNotNull(config.getDurable(), "Durable");

        return addOrUpdateConsumerInternal(streamName, config);
    }

    /**
     * Sends a consumer-create request. The {@code CONSUMER.DURABLE.CREATE} subject is
     * used when the configuration has a durable name, {@code CONSUMER.CREATE} otherwise.
     *
     * @param streamName the name of the stream
     * @param config     the consumer configuration
     * @return the consumer information parsed from the response
     * @throws IOException           if no response is received or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()} on the parsed response
     */
    private ConsumerInfo addOrUpdateConsumerInternal(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
        String durable = config.getDurable();
        String subj = durable == null
            ? String.format("CONSUMER.CREATE.%s", streamName)
            : String.format("CONSUMER.DURABLE.CREATE.%s.%s", streamName, durable);
        // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset.
        byte[] payload = new ConsumerCreateRequest(streamName, config).toJson().getBytes(StandardCharsets.UTF_8);
        // NOTE(review): this is the only API call that uses the connection timeout
        // instead of requestTimeout; kept as-is, confirm whether that is intentional.
        Message resp = makeRequestResponseRequired(subj, payload, conn.getOptions().getConnectionTimeout());
        return (ConsumerInfo)new ConsumerInfo(resp).throwOnHasError();
    }

    /**
     * Requests deletion of a consumer via the {@code CONSUMER.DELETE} API subject.
     *
     * @param streamName the name of the stream
     * @param consumer   the name of the consumer
     * @return the success flag from the response
     */
    @Override
    public boolean deleteConsumer(String streamName, String consumer) throws IOException, JetStreamApiException {
        Message reply = makeRequestResponseRequired(String.format("CONSUMER.DELETE.%s.%s", streamName, consumer), null, requestTimeout);
        SuccessApiResponse outcome = (SuccessApiResponse)new SuccessApiResponse(reply).throwOnHasError();
        return outcome.getSuccess();
    }

    /**
     * Fetches information about a consumer via the {@code CONSUMER.INFO} API subject.
     *
     * @param streamName the name of the stream
     * @param consumer   the name of the consumer
     * @return the consumer information parsed from the response
     */
    @Override
    public ConsumerInfo getConsumerInfo(String streamName, String consumer) throws IOException, JetStreamApiException {
        Message reply = makeRequestResponseRequired(String.format("CONSUMER.INFO.%s.%s", streamName, consumer), null, requestTimeout);
        ConsumerInfo info = new ConsumerInfo(reply);
        return (ConsumerInfo)info.throwOnHasError();
    }

    /**
     * Lists the consumer names of a stream without applying a filter.
     *
     * @param streamName the name of the stream
     * @return the collected consumer names
     */
    @Override
    public List<String> getConsumerNames(String streamName) throws IOException, JetStreamApiException {
        final String noFilter = null;
        return getConsumerNames(streamName, noFilter);
    }

    /**
     * Collects consumer names by repeatedly querying the {@code CONSUMER.NAMES}
     * API subject until the reader reports that no more data is available.
     *
     * @param streamName the name of the stream
     * @param filter     value handed to the reader when it builds each request body; may be null
     * @return the names accumulated by the reader
     */
    private List<String> getConsumerNames(String streamName, String filter) throws IOException, JetStreamApiException {
        ConsumerNamesReader reader = new ConsumerNamesReader();
        String apiSubject = String.format("CONSUMER.NAMES.%s", streamName);
        while (reader.hasMore()) {
            byte[] body = reader.nextJson(filter);
            reader.process(makeRequestResponseRequired(apiSubject, body, requestTimeout));
        }
        return reader.getStrings();
    }

    /**
     * Collects consumer information by repeatedly querying the {@code CONSUMER.LIST}
     * API subject until the reader reports that no more data is available.
     *
     * @param streamName the name of the stream
     * @return the consumers accumulated by the reader
     */
    @Override
    public List<ConsumerInfo> getConsumers(String streamName) throws IOException, JetStreamApiException {
        ConsumerListReader reader = new ConsumerListReader();
        String apiSubject = String.format("CONSUMER.LIST.%s", streamName);
        while (reader.hasMore()) {
            byte[] body = reader.nextJson();
            reader.process(makeRequestResponseRequired(apiSubject, body, requestTimeout));
        }
        return reader.getConsumers();
    }

    /**
     * Collects stream names by repeatedly querying the {@code STREAM.NAMES}
     * API subject until the reader reports that no more data is available.
     *
     * @return the names accumulated by the reader
     */
    @Override
    public List<String> getStreamNames() throws IOException, JetStreamApiException {
        StreamNamesReader reader = new StreamNamesReader();
        while (reader.hasMore()) {
            byte[] body = reader.nextJson();
            reader.process(makeRequestResponseRequired("STREAM.NAMES", body, requestTimeout));
        }
        return reader.getStrings();
    }

    /**
     * Collects stream information by repeatedly querying the {@code STREAM.LIST}
     * API subject until the reader reports that no more data is available.
     *
     * @return the streams accumulated by the reader
     */
    @Override
    public List<StreamInfo> getStreams() throws IOException, JetStreamApiException {
        StreamListReader reader = new StreamListReader();
        while (reader.hasMore()) {
            byte[] body = reader.nextJson();
            reader.process(makeRequestResponseRequired("STREAM.LIST", body, requestTimeout));
        }
        return reader.getStreams();
    }

    /**
     * Fetches a message by sequence number via the {@code STREAM.MSG.GET} API subject.
     *
     * @param streamName the name of the stream
     * @param seq        the sequence number placed in the request body under {@code "seq"}
     * @return the message information parsed from the response
     */
    @Override
    public MessageInfo getMessage(String streamName, long seq) throws IOException, JetStreamApiException {
        String apiSubject = String.format("STREAM.MSG.GET.%s", streamName);
        Message reply = makeRequestResponseRequired(apiSubject, JsonUtils.simpleMessageBody("seq", seq), requestTimeout);
        MessageInfo info = new MessageInfo(reply);
        return (MessageInfo)info.throwOnHasError();
    }

    /**
     * Requests deletion of a message by sequence number via the
     * {@code STREAM.MSG.DELETE} API subject.
     *
     * @param streamName the name of the stream
     * @param seq        the sequence number placed in the request body under {@code "seq"}
     * @return the success flag from the response
     */
    @Override
    public boolean deleteMessage(String streamName, long seq) throws IOException, JetStreamApiException {
        String apiSubject = String.format("STREAM.MSG.DELETE.%s", streamName);
        Message reply = makeRequestResponseRequired(apiSubject, JsonUtils.simpleMessageBody("seq", seq), requestTimeout);
        SuccessApiResponse outcome = (SuccessApiResponse)new SuccessApiResponse(reply).throwOnHasError();
        return outcome.getSuccess();
    }

    /**
     * Publishes a body to a subject with no headers and no publish options, then
     * waits for the acknowledgement.
     *
     * @param subject the subject to publish to
     * @param body    the message payload
     * @return the publish acknowledgement
     */
    @Override
    public PublishAck publish(String subject, byte[] body) throws IOException, JetStreamApiException {
        boolean utf8mode = conn.getOptions().supportUTF8Subjects();
        return publishSync(subject, null, body, utf8mode, null);
    }

    /**
     * Publishes a body to a subject with no headers, applying the given publish
     * options, then waits for the acknowledgement.
     *
     * @param subject the subject to publish to
     * @param body    the message payload
     * @param options publish options; may be null
     * @return the publish acknowledgement
     */
    @Override
    public PublishAck publish(String subject, byte[] body, PublishOptions options) throws IOException, JetStreamApiException {
        boolean utf8mode = conn.getOptions().supportUTF8Subjects();
        return publishSync(subject, null, body, utf8mode, options);
    }

    /**
     * Publishes the subject, headers and data of a message without publish options,
     * then waits for the acknowledgement.
     *
     * @param message the message to publish; must not be null
     * @return the publish acknowledgement
     */
    @Override
    public PublishAck publish(Message message) throws IOException, JetStreamApiException {
        Validator.validateNotNull(message, "Message");
        final PublishOptions noOptions = null;
        return publishSync(message.getSubject(), message.getHeaders(), message.getData(), message.isUtf8mode(), noOptions);
    }

    /**
     * Publishes the subject, headers and data of a message with the given publish
     * options, then waits for the acknowledgement.
     *
     * @param message the message to publish; must not be null
     * @param options publish options; may be null
     * @return the publish acknowledgement
     */
    @Override
    public PublishAck publish(Message message, PublishOptions options) throws IOException, JetStreamApiException {
        Validator.validateNotNull(message, "Message");
        return publishSync(
            message.getSubject(),
            message.getHeaders(),
            message.getData(),
            message.isUtf8mode(),
            options);
    }

    /**
     * Asynchronously publishes a body to a subject with no headers and no publish options.
     *
     * @param subject the subject to publish to
     * @param body    the message payload
     * @return a future that yields the publish acknowledgement
     */
    @Override
    public CompletableFuture<PublishAck> publishAsync(String subject, byte[] body) {
        boolean utf8mode = conn.getOptions().supportUTF8Subjects();
        return publishAsync(subject, null, body, utf8mode, null);
    }

    /**
     * Asynchronously publishes a body to a subject with no headers, applying the
     * given publish options.
     *
     * @param subject the subject to publish to
     * @param body    the message payload
     * @param options publish options; may be null
     * @return a future that yields the publish acknowledgement
     */
    @Override
    public CompletableFuture<PublishAck> publishAsync(String subject, byte[] body, PublishOptions options) {
        boolean utf8mode = conn.getOptions().supportUTF8Subjects();
        return publishAsync(subject, null, body, utf8mode, options);
    }

    /**
     * Asynchronously publishes the subject, headers and data of a message without
     * publish options.
     *
     * @param message the message to publish; must not be null
     * @return a future that yields the publish acknowledgement
     */
    @Override
    public CompletableFuture<PublishAck> publishAsync(Message message) {
        Validator.validateNotNull(message, "Message");
        final PublishOptions noOptions = null;
        return publishAsync(message.getSubject(), message.getHeaders(), message.getData(), message.isUtf8mode(), noOptions);
    }

    /**
     * Asynchronously publishes the subject, headers and data of a message with the
     * given publish options.
     *
     * @param message the message to publish; must not be null
     * @param options publish options; may be null
     * @return a future that yields the publish acknowledgement
     */
    @Override
    public CompletableFuture<PublishAck> publishAsync(Message message, PublishOptions options) {
        Validator.validateNotNull(message, "Message");
        return publishAsync(
            message.getSubject(),
            message.getHeaders(),
            message.getData(),
            message.isUtf8mode(),
            options);
    }

    /**
     * Issues the publish as an asynchronous request and converts the eventual reply
     * into a {@link PublishAck}.
     *
     * <p>A null reply or a status-message reply is turned into an {@link IOException};
     * that exception, like any failure from {@code processAck}, is rethrown from the
     * continuation wrapped in a {@link RuntimeException}.
     *
     * @param subject  the subject to publish to (used as given, without the API prefix)
     * @param headers  caller-supplied headers; may be null
     * @param data     the message payload
     * @param utf8mode passed through to the connection request
     * @param options  publish options; may be null
     * @return a future that yields the publish acknowledgement
     */
    private CompletableFuture<PublishAck> publishAsync(String subject, Headers headers, byte[] data, boolean utf8mode, PublishOptions options) {
        Headers outgoing = mergePublishOptions(headers, options);
        CompletableFuture<Message> pending = conn.request(subject, outgoing, data, utf8mode, false);
        return pending.thenCompose(reply -> {
            try {
                if (reply == null) {
                    throw new IOException("Error Publishing");
                }
                if (reply.isStatusMessage()) {
                    boolean code503 = reply.getStatus().getCode() == 503;
                    throw new IOException(code503
                        ? "Error Publishing: No stream available."
                        : "Error Publishing: " + reply.getStatus().getMessage());
                }
                PublishAck ack = processAck((Message)reply, options);
                return CompletableFuture.completedFuture(ack);
            }
            catch (JetStreamApiException | IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Publishes as a blocking request and converts the reply into a {@link PublishAck}.
     *
     * @param subject  the subject to publish to (used as given, without the API prefix)
     * @param headers  caller-supplied headers; may be null
     * @param data     the message payload
     * @param utf8mode passed through to the connection request
     * @param options  publish options; when null, {@code requestTimeout} is the wait limit,
     *                 otherwise the options' stream timeout is used
     * @return the publish acknowledgement
     */
    private PublishAck publishSync(String subject, Headers headers, byte[] data, boolean utf8mode, PublishOptions options) throws IOException, JetStreamApiException {
        Duration waitLimit;
        if (options == null) {
            waitLimit = requestTimeout;
        } else {
            waitLimit = options.getStreamTimeout();
        }
        Headers outgoing = mergePublishOptions(headers, options);
        Message reply = makeRequestResponseRequired(subject, outgoing, data, utf8mode, waitLimit);
        return processAck(reply, options);
    }

    /**
     * Parses a reply into a {@link PublishAck} and, when the publish options name a
     * stream, checks that the acknowledgement came from that stream.
     *
     * @param resp    the reply to the publish request
     * @param options publish options; may be null
     * @return the parsed acknowledgement
     * @throws IOException if the options name a stream that differs from the acknowledging stream
     */
    private PublishAck processAck(Message resp, PublishOptions options) throws IOException, JetStreamApiException {
        PublishAck ack = new PublishAck(resp);
        String ackStream = ack.getStream();
        String expectedStream = null;
        if (options != null) {
            expectedStream = options.getStream();
        }
        boolean mismatch = isStreamSpecified(expectedStream) && !expectedStream.equals(ackStream);
        if (mismatch) {
            throw new IOException("Expected ack from stream " + expectedStream + ", received from: " + ackStream);
        }
        return ack;
    }

    /**
     * Produces the headers to send with a publish by copying the caller's headers
     * and adding the expectation / message-id headers carried by the options.
     *
     * @param headers caller-supplied headers; may be null
     * @param options publish options; may be null
     * @return a copy of {@code headers} (or null when both arguments are null) when
     *         there are no options; otherwise a new header set with the option values added
     */
    private Headers mergePublishOptions(Headers headers, PublishOptions options) {
        if (options == null) {
            // Nothing to merge: hand back a copy, or null when there were no headers at all.
            return headers == null ? null : new Headers(headers);
        }

        Headers merged = new Headers(headers);

        long expectedSeq = options.getExpectedLastSequence();
        if (expectedSeq > 0L) {
            merged.add("Nats-Expected-Last-Sequence", Long.toString(expectedSeq));
        }

        String expectedMsgId = options.getExpectedLastMsgId();
        if (expectedMsgId != null) {
            merged.add("Nats-Expected-Last-Msg-Id", expectedMsgId);
        }

        String expectedStream = options.getExpectedStream();
        if (expectedStream != null) {
            merged.add("Nats-Expected-Stream", expectedStream);
        }

        String messageId = options.getMessageId();
        if (messageId != null) {
            merged.add("Nats-Msg-Id", messageId);
        }
        return merged;
    }

    /**
     * Tells whether a stream name was supplied.
     *
     * @param streamName the candidate stream name
     * @return {@code true} when {@code streamName} is not null
     */
    private boolean isStreamSpecified(String streamName) {
        boolean specified = streamName != null;
        return specified;
    }

    /**
     * Creates a JetStream subscription, creating the server-side consumer when a
     * durable consumer with the requested name does not already exist.
     *
     * <p>Steps: pick the options (pull when {@code pullSubscribeOptions} is non-null,
     * push otherwise), resolve the stream (looked up by subject when not supplied),
     * reuse an existing durable consumer if one is found, subscribe to the deliver
     * subject or a fresh inbox, then create the consumer if needed and bind the
     * subscription to it.
     *
     * <p>If consumer creation fails for any reason, the subscription made in this
     * call is unsubscribed before the failure is rethrown.
     *
     * @param subject              the subject to subscribe to; also used as the consumer filter subject
     * @param queueName            queue group name; may be null
     * @param dispatcher           dispatcher to subscribe through; null for a direct subscription
     * @param handler              message handler used with the dispatcher
     * @param autoAck              when true and a dispatcher is used, wraps the handler so messages are acked after handling
     * @param pushSubscribeOptions push options; may be null (a default is built in push mode)
     * @param pullSubscribeOptions pull options; non-null selects pull mode
     * @return the configured subscription
     * @throws IllegalArgumentException if an existing consumer has a filter subject different from {@code subject}
     * @throws IOException              if a request gets no response or is interrupted
     * @throws JetStreamApiException    if an API request reports an error
     */
    NatsJetStreamSubscription createSubscription(String subject, String queueName, NatsDispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions pushSubscribeOptions, PullSubscribeOptions pullSubscribeOptions) throws IOException, JetStreamApiException {
        boolean isPullMode = pullSubscribeOptions != null;

        SubscribeOptions so;
        String stream;
        ConsumerConfiguration.Builder ccBuilder;
        if (isPullMode) {
            so = pullSubscribeOptions;
            stream = pullSubscribeOptions.getStream();
            ccBuilder = ConsumerConfiguration.builder(pullSubscribeOptions.getConsumerConfiguration());
            // Pull mode: clear any deliver subject copied from the supplied configuration.
            ccBuilder.deliverSubject(null);
        } else {
            so = pushSubscribeOptions == null ? PushSubscribeOptions.builder().build() : pushSubscribeOptions;
            stream = so.getStream();
            ccBuilder = ConsumerConfiguration.builder(so.getConsumerConfiguration());
        }

        String durable = ccBuilder.getDurable();
        String inbox = ccBuilder.getDeliverSubject();
        boolean createConsumer = true;

        if (stream == null) {
            stream = lookupStreamBySubject(subject);
        }

        if (durable != null) {
            ConsumerInfo consumerInfo = lookupConsumerInfo(stream, durable);
            if (consumerInfo != null) {
                // The durable consumer already exists: reuse it instead of creating one.
                createConsumer = false;
                ConsumerConfiguration cc = consumerInfo.getConsumerConfiguration();
                String filterSub = cc.getFilterSubject();
                if (filterSub != null && !filterSub.equals(subject)) {
                    throw new IllegalArgumentException(String.format("Subject %s mismatches consumer configuration %s.", subject, filterSub));
                }
                inbox = cc.getDeliverSubject();
            }
        }

        if (inbox == null) {
            inbox = conn.createInbox();
        }

        NatsJetStreamSubscription sub;
        if (dispatcher == null) {
            sub = (NatsJetStreamSubscription)conn.createSubscription(inbox, queueName, null, true);
        } else {
            MessageHandler mh = autoAck ? new AutoAckMessageHandler(handler) : handler;
            sub = (NatsJetStreamSubscription)dispatcher.subscribeImpl(inbox, queueName, mh, true);
        }

        if (createConsumer) {
            if (ccBuilder.getMaxAckPending() == 0L && ccBuilder.getAckPolicy() != AckPolicy.None) {
                ccBuilder.maxAckPending(sub.getPendingMessageLimit());
            }
            if (!isPullMode) {
                ccBuilder.deliverSubject(inbox);
            }
            ccBuilder.filterSubject(subject);

            ConsumerInfo ci;
            try {
                ci = addOrUpdateConsumerInternal(stream, ccBuilder.build());
            }
            catch (IOException | JetStreamApiException | RuntimeException e) {
                // Undo the subscription on every failure path (API error, timeout /
                // interruption, or unexpected runtime failure) so it does not leak.
                if (dispatcher == null) {
                    sub.unsubscribe();
                } else {
                    dispatcher.unsubscribe(sub);
                }
                throw e;
            }
            sub.setupJetStream(this, ci.getName(), ci.getStreamName(), inbox, so);
        } else {
            sub.setupJetStream(this, durable, stream, inbox, so);
        }
        return sub;
    }

    /**
     * Creates a push subscription on a subject with default options, no queue and
     * no dispatcher.
     *
     * @param subject the subject to subscribe to
     * @return the subscription
     */
    @Override
    public JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException {
        Validator.validateJsSubscribeSubjectRequired(subject);
        final PushSubscribeOptions noPushOptions = null;
        return createSubscription(subject, null, null, null, false, noPushOptions, null);
    }

    /**
     * Creates a push subscription on a subject with the given options, no queue and
     * no dispatcher.
     *
     * @param subject the subject to subscribe to
     * @param options push subscribe options; may be null
     * @return the subscription
     */
    @Override
    public JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateJsSubscribeSubjectRequired(subject);
        final String noQueue = null;
        return createSubscription(subject, noQueue, null, null, false, options, null);
    }

    /**
     * Creates a push subscription on a subject within a queue group, without a dispatcher.
     *
     * @param subject the subject to subscribe to
     * @param queue   the queue group name
     * @param options push subscribe options; may be null
     * @return the subscription
     */
    @Override
    public JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException {
        // Subject is validated before the queue name, as in every other overload.
        Validator.validateJsSubscribeSubjectRequired(subject);
        Validator.validateQueueNameRequired(queue);

        return createSubscription(subject, queue, null, null, false, options, null);
    }

    /**
     * Creates a push subscription whose messages are delivered through a dispatcher,
     * using default options and no queue.
     *
     * @param subject    the subject to subscribe to
     * @param dispatcher the dispatcher; must not be null and is cast to {@code NatsDispatcher}
     * @param handler    the message handler; must not be null
     * @param autoAck    whether messages are acked automatically after the handler runs
     * @return the subscription
     */
    @Override
    public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException {
        Validator.validateJsSubscribeSubjectRequired(subject);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(handler, "Handler");

        NatsDispatcher natsDispatcher = (NatsDispatcher)dispatcher;
        return createSubscription(subject, null, natsDispatcher, handler, autoAck, null, null);
    }

    /**
     * Creates a push subscription whose messages are delivered through a dispatcher,
     * using the given options and no queue.
     *
     * @param subject    the subject to subscribe to
     * @param dispatcher the dispatcher; must not be null and is cast to {@code NatsDispatcher}
     * @param handler    the message handler; must not be null
     * @param autoAck    whether messages are acked automatically after the handler runs
     * @param options    push subscribe options; may be null
     * @return the subscription
     */
    @Override
    public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateJsSubscribeSubjectRequired(subject);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(handler, "Handler");

        NatsDispatcher natsDispatcher = (NatsDispatcher)dispatcher;
        return createSubscription(subject, null, natsDispatcher, handler, autoAck, options, null);
    }

    /**
     * Creates a push subscription within a queue group whose messages are delivered
     * through a dispatcher.
     *
     * @param subject    the subject to subscribe to
     * @param queue      the queue group name
     * @param dispatcher the dispatcher; must not be null and is cast to {@code NatsDispatcher}
     * @param handler    the message handler; must not be null
     * @param autoAck    whether messages are acked automatically after the handler runs
     * @param options    push subscribe options; may be null
     * @return the subscription
     */
    @Override
    public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateJsSubscribeSubjectRequired(subject);
        Validator.validateQueueNameRequired(queue);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(handler, "Handler");

        NatsDispatcher natsDispatcher = (NatsDispatcher)dispatcher;
        return createSubscription(subject, queue, natsDispatcher, handler, autoAck, options, null);
    }

    /**
     * Creates a pull subscription on a subject.
     *
     * @param subject the subject to subscribe to
     * @param options pull subscribe options; must not be null and must carry a durable name
     * @return the subscription
     */
    @Override
    public JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateJsSubscribeSubjectRequired(subject);
        // Options must be checked before they are dereferenced for the durable name.
        Validator.validateNotNull(options, "Options");
        Validator.validateNotNull(options.getDurable(), "Durable");

        final PushSubscribeOptions noPushOptions = null;
        return createSubscription(subject, null, null, null, false, noPushOptions, options);
    }

    /**
     * Fetches consumer information, treating a "consumer not found" API error as absence.
     *
     * @param stream   the name of the stream
     * @param consumer the name of the consumer
     * @return the consumer information, or {@code null} when the API error has code 404
     *         and a description containing {@code "consumer"}
     * @throws IOException           if no response is received or the request is interrupted
     * @throws JetStreamApiException for any other API error
     */
    ConsumerInfo lookupConsumerInfo(String stream, String consumer) throws IOException, JetStreamApiException {
        try {
            return getConsumerInfo(stream, consumer);
        }
        catch (JetStreamApiException e) {
            // Guard against a missing description so the original API error is
            // rethrown instead of being replaced by a NullPointerException.
            boolean consumerNotFound = e.getErrorCode() == 404L
                && e.getErrorDescription() != null
                && e.getErrorDescription().contains("consumer");
            if (consumerNotFound) {
                return null;
            }
            throw e;
        }
    }

    /**
     * Resolves the single stream that matches a subject by querying the
     * {@code STREAM.NAMES} API subject with a subject filter.
     *
     * @param subject the subject to look up
     * @return the name of the one matching stream
     * @throws IllegalStateException if the reply does not contain exactly one stream name
     * @throws IOException           if no response is received or the request is interrupted
     * @throws JetStreamApiException declared for the reply processing
     */
    private String lookupStreamBySubject(String subject) throws IOException, JetStreamApiException {
        // NOTE(review): the subject is inserted without JSON escaping; confirm callers
        // validate it so that quotes or backslashes cannot reach this point.
        String streamRequest = String.format("{\"subject\":\"%s\"}", subject);
        // Encode explicitly as UTF-8 so non-ASCII subjects do not depend on the platform default charset.
        byte[] payload = streamRequest.getBytes(StandardCharsets.UTF_8);
        StreamNamesReader snr = new StreamNamesReader();
        Message resp = makeRequestResponseRequired("STREAM.NAMES", payload, requestTimeout);
        snr.process(resp);
        if (snr.getStrings().size() != 1) {
            throw new IllegalStateException("No matching streams for subject: " + subject);
        }
        return snr.getStrings().get(0);
    }

    /**
     * Sends a request to a prefixed API subject and returns whatever the connection returns.
     *
     * @param subject the API subject without the prefix
     * @param bytes   the request body; may be null
     * @param timeout how long to wait for the reply
     * @return the reply as returned by the connection (callers check it for null)
     * @throws InterruptedException if the wait is interrupted
     */
    private Message makeRequest(String subject, byte[] bytes, Duration timeout) throws InterruptedException {
        String apiSubject = prependPrefix(subject);
        return conn.request(apiSubject, bytes, timeout);
    }

    /**
     * Sends a request to a prefixed API subject and insists on a reply.
     *
     * @param subject the API subject without the prefix
     * @param bytes   the request body; may be null
     * @param timeout how long to wait for the reply
     * @return the non-null reply
     * @throws IOException if no reply arrives, or if the wait is interrupted (the
     *                     thread's interrupt flag is restored before throwing)
     */
    private Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeout) throws IOException {
        try {
            return responseRequired(conn.request(prependPrefix(subject), bytes, timeout));
        }
        catch (InterruptedException e) {
            // Wrapping alone would hide the interruption from callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Sends a request with headers to the given subject (no prefix is added) and
     * insists on a reply.
     *
     * @param subject  the subject to send to, used as given
     * @param headers  headers to send; may be null
     * @param data     the request body
     * @param utf8mode passed through to the connection request
     * @param timeout  how long to wait for the reply
     * @return the non-null reply
     * @throws IOException if no reply arrives, or if the wait is interrupted (the
     *                     thread's interrupt flag is restored before throwing)
     */
    private Message makeRequestResponseRequired(String subject, Headers headers, byte[] data, boolean utf8mode, Duration timeout) throws IOException {
        try {
            return responseRequired(conn.request(subject, headers, data, utf8mode, timeout));
        }
        catch (InterruptedException e) {
            // Wrapping alone would hide the interruption from callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Passes a reply through, rejecting a missing one.
     *
     * @param respMessage the reply, possibly null
     * @return {@code respMessage} when it is not null
     * @throws IOException when {@code respMessage} is null
     */
    private Message responseRequired(Message respMessage) throws IOException {
        if (respMessage != null) {
            return respMessage;
        }
        throw new IOException("Timeout or no response waiting for NATS JetStream server");
    }

    /**
     * Builds a full API subject by putting the configured prefix in front of a subject.
     *
     * @param subject the subject without the prefix
     * @return the prefix followed by {@code subject}
     */
    String prependPrefix(String subject) {
        String prefixed = prefix + subject;
        return prefixed;
    }

    /**
     * Exposes the request timeout this instance was constructed with.
     *
     * @return the request timeout
     */
    Duration getRequestTimeout() {
        return requestTimeout;
    }

    /**
     * Wraps a user handler so that each message is acked after the handler
     * returns normally.
     */
    private static class AutoAckMessageHandler
    implements MessageHandler {
        /** The user-supplied handler that is invoked before the ack. */
        private final MessageHandler userMH;

        /**
         * @param userMH the handler to delegate to
         */
        AutoAckMessageHandler(MessageHandler userMH) {
            this.userMH = userMH;
        }

        /**
         * Runs the user handler and then acks the message.
         *
         * @param msg the delivered message
         * @throws InterruptedException if the user handler (or the ack) is interrupted;
         *                              the message is not acked in that case
         */
        @Override
        public void onMessage(Message msg) throws InterruptedException {
            try {
                userMH.onMessage(msg);
                msg.ack();
            }
            catch (InterruptedException e) {
                // Interruption is a control signal, not a handler failure: let it
                // propagate just as it would from a handler that is not wrapped.
                throw e;
            }
            catch (Exception ignored) {
                // Best effort by design: a failing handler (or ack) leaves the message
                // un-acked and must not break message delivery for this subscription.
            }
        }
    }
}

