/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamOptions;
import io.nats.client.Message;
import io.nats.client.NUID;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerCreateRequest;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStreamSubscription;
import io.nats.client.impl.StreamInfoReader;
import io.nats.client.impl.StreamNamesReader;
import io.nats.client.support.NatsJetStreamClientError;
import io.nats.client.support.NatsJetStreamConstants;
import io.nats.client.support.NatsRequestCompletableFuture;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class NatsJetStreamImpl
implements NatsJetStreamConstants {
    private static final ConcurrentHashMap<String, CachedStreamInfo> CACHED_STREAM_INFO_MAP = new ConcurrentHashMap();
    final NatsConnection conn;
    final JetStreamOptions jso;
    final boolean consumerCreate290Available;

    NatsJetStreamImpl(NatsConnection connection, JetStreamOptions jsOptions) throws IOException {
        this.conn = connection;
        Duration rt = jsOptions == null || jsOptions.getRequestTimeout() == null ? this.conn.getOptions().getConnectionTimeout() : jsOptions.getRequestTimeout();
        this.jso = JetStreamOptions.builder(jsOptions).requestTimeout(rt).build();
        this.consumerCreate290Available = this.conn.getInfo().isSameOrNewerThanVersion("2.9.0") && !this.jso.isOptOut290ConsumerCreate();
    }

    ConsumerInfo _getConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException {
        String subj = String.format("CONSUMER.INFO.%s.%s", streamName, consumerName);
        Message resp = this.makeRequestResponseRequired(subj, null, this.jso.getRequestTimeout());
        return (ConsumerInfo)new ConsumerInfo(resp).throwOnHasError();
    }

    ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
        String subj;
        String consumerName = config.getName();
        if (consumerName != null && !this.consumerCreate290Available) {
            throw NatsJetStreamClientError.JsConsumerCreate290NotAvailable.instance();
        }
        String durable = config.getDurable();
        if (this.consumerCreate290Available) {
            String fs;
            if (consumerName == null) {
                String string = consumerName = durable == null ? this.generateConsumerName() : durable;
            }
            subj = (fs = config.getFilterSubject()) == null || fs.equals(">") ? String.format("CONSUMER.CREATE.%s.%s", streamName, consumerName) : String.format("CONSUMER.CREATE.%s.%s.%s", streamName, consumerName, fs);
        } else {
            subj = durable == null ? String.format("CONSUMER.CREATE.%s", streamName) : String.format("CONSUMER.DURABLE.CREATE.%s.%s", streamName, durable);
        }
        ConsumerCreateRequest ccr = new ConsumerCreateRequest(streamName, config);
        Message resp = this.makeRequestResponseRequired(subj, ccr.serialize(), this.jso.getRequestTimeout());
        return (ConsumerInfo)new ConsumerInfo(resp).throwOnHasError();
    }

    void _createConsumerUnsubscribeOnException(String stream, ConsumerConfiguration cc, NatsJetStreamSubscription sub) throws IOException, JetStreamApiException {
        try {
            ConsumerInfo ci = this._createConsumer(stream, cc);
            sub.setConsumerName(ci.getName());
        }
        catch (JetStreamApiException | IOException e) {
            if (sub.getDispatcher() == null) {
                sub.unsubscribe();
            } else {
                sub.getDispatcher().unsubscribe(sub);
            }
            throw e;
        }
    }

    StreamInfo _getStreamInfo(String streamName, StreamInfoOptions options) throws IOException, JetStreamApiException {
        String subj = String.format("STREAM.INFO.%s", streamName);
        StreamInfoReader sir = new StreamInfoReader();
        while (sir.hasMore()) {
            Message resp = this.makeRequestResponseRequired(subj, sir.nextJson(options), this.jso.getRequestTimeout());
            sir.process(resp);
        }
        return this.cacheStreamInfo(streamName, sir.getStreamInfo());
    }

    StreamInfo createAndCacheStreamInfoThrowOnError(String streamName, Message resp) throws JetStreamApiException {
        return this.cacheStreamInfo(streamName, (StreamInfo)new StreamInfo(resp).throwOnHasError());
    }

    StreamInfo cacheStreamInfo(String streamName, StreamInfo si) {
        CACHED_STREAM_INFO_MAP.put(streamName, new CachedStreamInfo(si));
        return si;
    }

    List<StreamInfo> cacheStreamInfo(List<StreamInfo> list) {
        list.forEach(si -> CACHED_STREAM_INFO_MAP.put(si.getConfiguration().getName(), new CachedStreamInfo((StreamInfo)si)));
        return list;
    }

    List<String> _getStreamNames(String subjectFilter) throws IOException, JetStreamApiException {
        StreamNamesReader snr = new StreamNamesReader();
        while (snr.hasMore()) {
            Message resp = this.makeRequestResponseRequired("STREAM.NAMES", snr.nextJson(subjectFilter), this.jso.getRequestTimeout());
            snr.process(resp);
        }
        return snr.getStrings();
    }

    String generateConsumerName() {
        return NUID.nextGlobalSequence();
    }

    ConsumerConfiguration nextOrderedConsumerConfiguration(ConsumerConfiguration originalCc, long lastStreamSeq, String newDeliverSubject) {
        return ConsumerConfiguration.builder(originalCc).deliverPolicy(DeliverPolicy.ByStartSequence).deliverSubject(newDeliverSubject).startSequence(Math.max(1L, lastStreamSeq + 1L)).startTime(null).build();
    }

    ConsumerInfo lookupConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException {
        try {
            return this._getConsumerInfo(streamName, consumerName);
        }
        catch (JetStreamApiException e) {
            if (e.getApiErrorCode() == 10014 || e.getErrorCode() == 404 && e.getErrorDescription().contains("consumer")) {
                return null;
            }
            throw e;
        }
    }

    String lookupStreamBySubject(String subject) throws IOException, JetStreamApiException {
        List<String> list = this._getStreamNames(subject);
        return list.size() == 1 ? list.get(0) : null;
    }

    Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeout) throws IOException {
        try {
            return this.responseRequired(this.conn.request(this.prependPrefix(subject), bytes, timeout));
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, NatsRequestCompletableFuture.CancelAction cancelAction) throws IOException {
        try {
            return this.responseRequired(this.conn.requestInternal(subject, headers, data, timeout, cancelAction));
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    Message responseRequired(Message respMessage) throws IOException {
        if (respMessage == null) {
            throw new IOException("Timeout or no response waiting for NATS JetStream server");
        }
        return respMessage;
    }

    String prependPrefix(String subject) {
        return this.jso.getPrefix() + subject;
    }

    CachedStreamInfo getCachedStreamInfo(String streamName) throws IOException, JetStreamApiException {
        CachedStreamInfo csi = CACHED_STREAM_INFO_MAP.get(streamName);
        if (csi != null) {
            return csi;
        }
        this._getStreamInfo(streamName, null);
        return CACHED_STREAM_INFO_MAP.get(streamName);
    }

    static class CachedStreamInfo {
        public final boolean allowDirect;

        public CachedStreamInfo(StreamInfo si) {
            this.allowDirect = si.getConfiguration().getAllowDirect();
        }
    }
}

