/*
 * Decompiled with CFR 0.152.
 */
package io.nats.bridge.nats;

import io.nats.bridge.MessageBus;
import io.nats.bridge.TimeSource;
import io.nats.bridge.messages.MessageBuilder;
import io.nats.bridge.metrics.Counter;
import io.nats.bridge.metrics.Gauge;
import io.nats.bridge.metrics.Metrics;
import io.nats.bridge.metrics.MetricsProcessor;
import io.nats.bridge.metrics.TimeTracker;
import io.nats.bridge.nats.NatsMessageBusException;
import io.nats.bridge.util.ExceptionHandler;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Statistics;
import io.nats.client.Subscription;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedTransferQueue;
import java.util.function.Consumer;

public class NatsMessageBus
implements MessageBus {
    final Duration NOW = Duration.ofMillis(1L);
    private final Connection connection;
    private final String subject;
    private final Subscription subscription;
    private final ExceptionHandler tryHandler;
    private final Queue<NatsReply> replyQueue;
    private final Queue<NatsReply> replyQueueNotDone;
    private final Queue<ReplyTo> replyToQueue;
    private final Metrics metrics;
    private final Counter countReceivedReply;
    private final Counter countReceivedReplyErrors;
    private final Counter countPublish;
    private final Counter countReceived;
    private final Counter countRequest;
    private final Counter countRequestResponses;
    private final TimeTracker timerRequestResponse;
    private final TimeTracker timerReceiveReply;
    private final Duration durationConnectionsMetrics;
    private final Gauge countInMsgs;
    private final Gauge countOutMsgs;
    private final Gauge countInBytes;
    private final Gauge countOutBytes;
    private final Gauge countReconnects;
    private final Gauge countDropped;
    private boolean stopped = false;
    private final MetricsProcessor metricsProcessor;
    private final TimeSource timeSource;
    private final String name;
    private long lastConnectionMetricsCollection;
    private int checkEvery = 0;

    public NatsMessageBus(String name, String subject, Connection connection, String queueGroup, ExceptionHandler tryHandler, Queue<NatsReply> replyQueue, Queue<NatsReply> replyQueueNotDone, TimeSource timeSource, Metrics metrics, MetricsProcessor metricsProcessor, Duration durationConnectionsMetrics) {
        this.connection = connection;
        this.durationConnectionsMetrics = durationConnectionsMetrics;
        this.subject = subject;
        this.subscription = connection.subscribe(subject, queueGroup);
        this.tryHandler = tryHandler;
        this.replyQueue = replyQueue;
        this.timeSource = timeSource;
        this.replyQueueNotDone = replyQueueNotDone;
        this.metrics = metrics;
        this.metricsProcessor = metricsProcessor;
        this.name = name.toLowerCase().replace(".", "_").replace(" ", "_").replace("-", "_");
        String[] tags = Metrics.tags("name", "name_" + this.name, "mb_type", "nats_mb", "subject", subject);
        this.countPublish = metrics.createCounter("publish_count", tags);
        this.countRequest = metrics.createCounter("request_count", tags);
        this.countRequestResponses = metrics.createCounter("request_response_count", tags);
        this.timerRequestResponse = metrics.createTimeTracker("request_response_timing", tags);
        this.countReceived = metrics.createCounter("received_count", tags);
        this.countReceivedReply = metrics.createCounter("received_reply_count", tags);
        this.timerReceiveReply = metrics.createTimeTracker("receive_reply_timing", tags);
        this.countReceivedReplyErrors = metrics.createCounter("received_reply_count_errors", tags);
        this.countInMsgs = metrics.createGauge("conn_get_in_msgs_count", new String[0]);
        this.countOutMsgs = metrics.createGauge("conn_get_out_msgs_count", new String[0]);
        this.countInBytes = metrics.createGauge("conn_get_in_bytes", new String[0]);
        this.countOutBytes = metrics.createGauge("conn_get_out_bytes", new String[0]);
        this.countReconnects = metrics.createGauge("conn_reconnects", new String[0]);
        this.countDropped = metrics.createGauge("conn_dropped_count", new String[0]);
        this.replyToQueue = new LinkedTransferQueue<ReplyTo>();
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public Metrics metrics() {
        return this.metrics;
    }

    @Override
    public void publish(io.nats.bridge.messages.Message message) {
        this.countPublish.increment();
        this.connection.publish(this.subject, message.getMessageBytes());
    }

    @Override
    public void publish(String message) {
        this.countPublish.increment();
        this.connection.publish(this.subject, message.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void request(io.nats.bridge.messages.Message message, Consumer<io.nats.bridge.messages.Message> replyCallback) {
        this.countRequest.increment();
        CompletableFuture future = this.connection.request(this.subject, message.getMessageBytes());
        if (!this.replyQueue.add(new NatsReply(this.timeSource.getTime(), replyCallback, future))) {
            throw new NatsMessageBusException("Unable to add to reply queue");
        }
    }

    @Override
    public Optional<io.nats.bridge.messages.Message> receive(Duration duration) {
        if (this.stopped) {
            return Optional.empty();
        }
        return this.doReceive(duration);
    }

    @Override
    public Optional<io.nats.bridge.messages.Message> receive() {
        return this.doReceive(this.NOW);
    }

    private Optional<io.nats.bridge.messages.Message> doReceive(Duration duration) {
        return this.tryHandler.tryReturnOrRethrow(() -> {
            Message message = this.subscription.nextMessage(duration);
            if (message != null) {
                this.countReceived.increment();
                return this.convertMessage(message);
            }
            return Optional.empty();
        }, e -> {
            throw new NatsMessageBusException("unable to get next message from nats bus", (Throwable)e);
        });
    }

    private void replyUsingReplyTo(long startTime, String replyTo, io.nats.bridge.messages.Message reply) {
        this.replyToQueue.add(new ReplyTo(startTime, replyTo, reply));
    }

    private Optional<io.nats.bridge.messages.Message> convertMessage(Message message) {
        String replyTo = message.getReplyTo();
        if (replyTo != null) {
            long startTime = this.timeSource.getTime();
            return Optional.of(MessageBuilder.builder().withReplyHandler(reply -> this.replyUsingReplyTo(startTime, replyTo, (io.nats.bridge.messages.Message)reply)).withCreator(this.name).buildFromBytes(message.getData()));
        }
        io.nats.bridge.messages.Message bridgeMessage = MessageBuilder.builder().withNoReplyHandler("NATS MESSAGE BUS NO REPLY TO CONVERT MESSAGE NATS TO BRIDGE").withCreator(this.name).buildFromBytes(message.getData());
        return Optional.of(bridgeMessage);
    }

    @Override
    public void close() {
        this.tryHandler.tryWithLog(() -> {
            this.stopped = true;
            this.connection.drain(Duration.ofSeconds(30L)).get();
        }, "Can't drain and close nats connection " + this.subject);
    }

    @Override
    public int process() {
        ++this.checkEvery;
        if (this.checkEvery % 10 == 0 && this.timeSource.getTime() - this.lastConnectionMetricsCollection > this.durationConnectionsMetrics.toMillis()) {
            this.lastConnectionMetricsCollection = this.timeSource.getTime();
            Statistics statistics = this.connection.getStatistics();
            this.countDropped.recordLevel(statistics.getDroppedCount());
            this.countInBytes.recordLevel(statistics.getInBytes());
            this.countOutBytes.recordLevel(statistics.getOutBytes());
            this.countOutMsgs.recordLevel(statistics.getOutMsgs());
            this.countInMsgs.recordLevel(statistics.getInMsgs());
            this.countReconnects.recordLevel(statistics.getReconnects());
        }
        this.metricsProcessor.process();
        int count = this.processReplyToQueue();
        return count + this.processResponses();
    }

    private int processReplyToQueue() {
        int count = 0;
        ReplyTo reply = this.replyToQueue.poll();
        while (reply != null) {
            this.connection.publish(reply.replyTo, reply.reply.getMessageBytes());
            ++count;
            this.timerReceiveReply.recordTiming(this.timeSource.getTime() - reply.startTime);
            this.countReceivedReply.increment();
            reply = this.replyToQueue.poll();
        }
        return count;
    }

    private int processResponses() {
        int[] countHolder = new int[1];
        this.tryHandler.tryWithErrorCount(() -> {
            NatsReply reply = null;
            int count = 0;
            do {
                if ((reply = this.replyQueue.poll()) == null) continue;
                if (reply.future.isDone()) {
                    ++count;
                    Message replyMessage = (Message)reply.future.get();
                    reply.replyCallback.accept(MessageBuilder.builder().withCreator(this.name).withNoReplyHandler("NATS MESSAGE BUS PROCESS RESPONSE").buildFromBytes(replyMessage.getData()));
                    this.timerRequestResponse.recordTiming(this.timeSource.getTime() - reply.requestTime);
                    this.countRequestResponses.increment();
                    continue;
                }
                if (this.replyQueueNotDone.add(reply)) continue;
                throw new NatsMessageBusException("Unable to add to reply queue");
            } while (reply != null);
            do {
                if ((reply = this.replyQueueNotDone.poll()) == null || this.replyQueue.add(reply)) continue;
                throw new NatsMessageBusException("Unable to add to reply queue");
            } while (reply != null);
            this.replyQueueNotDone.clear();
            countHolder[0] = count;
        }, this.countReceivedReplyErrors, "error processing NATS receive queue for replies");
        return countHolder[0];
    }

    public static class NatsReply {
        private final long requestTime;
        private final Consumer<io.nats.bridge.messages.Message> replyCallback;
        private final CompletableFuture<Message> future;

        NatsReply(long requestTime, Consumer<io.nats.bridge.messages.Message> replyCallback, CompletableFuture<Message> future) {
            this.requestTime = requestTime;
            this.replyCallback = replyCallback;
            this.future = future;
        }
    }

    private static class ReplyTo {
        private final String replyTo;
        private final io.nats.bridge.messages.Message reply;
        private final long startTime;

        ReplyTo(long startTime, String replyTo, io.nats.bridge.messages.Message reply) {
            this.replyTo = replyTo;
            this.reply = reply;
            this.startTime = startTime;
        }
    }
}

