/*
 * Decompiled with CFR 0.152.
 */
package io.nats.bridge.jms;

import io.nats.bridge.MessageBus;
import io.nats.bridge.TimeSource;
import io.nats.bridge.jms.JMSMessageBusException;
import io.nats.bridge.jms.JmsContext;
import io.nats.bridge.jms.support.JMSReply;
import io.nats.bridge.jms.support.JMSRequestResponse;
import io.nats.bridge.messages.Message;
import io.nats.bridge.metrics.Counter;
import io.nats.bridge.metrics.Metrics;
import io.nats.bridge.metrics.MetricsProcessor;
import io.nats.bridge.metrics.TimeTracker;
import io.nats.bridge.util.ExceptionHandler;
import io.nats.bridge.util.FunctionWithException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.slf4j.Logger;

/**
 * {@link MessageBus} implementation that sends and receives messages through JMS.
 *
 * <p>The underlying {@link JmsContext} is obtained lazily from the supplied
 * {@code Supplier} on first use and dropped again by {@link #close()}, so a
 * later call transparently asks the supplier for a new context.
 *
 * <p>Outstanding requests are tracked by JMS correlation id in a plain
 * {@link HashMap}; this class performs no synchronization of its own.
 */
public class JMSMessageBus implements MessageBus {
    private final String name;
    private final TimeSource timeSource;
    /** Pending request callbacks keyed by JMS correlation id; entries are removed when their response is handled. */
    private final Map<String, JMSRequestResponse> requestResponseMap = new HashMap<>();
    private final Metrics metrics;
    private final Counter countRequestResponseErrors;
    private final Counter countReceivedReply;
    private final Counter countReceivedReplyErrors;
    private final Counter countPublish;
    private final Counter countReceived;
    private final Counter countPublishErrors;
    private final Counter messageConvertErrors;
    private final Counter countRequest;
    private final Counter countRequestErrors;
    private final Counter countRequestResponses;
    private final Counter countRequestResponsesMissing;
    private final TimeTracker timerRequestResponse;
    private final TimeTracker timerReceiveReply;
    private final MetricsProcessor metricsProcessor;
    private final ExceptionHandler tryHandler;
    private final Logger logger;
    private final Queue<JMSReply> jmsReplyQueue;
    private final FunctionWithException<javax.jms.Message, Message> jmsMessageConverter;
    /** Lazily created by {@link #jms()}; reset to {@code null} by {@link #close()}. */
    private JmsContext jms;
    private final FunctionWithException<Message, javax.jms.Message> bridgeMessageConverter;
    private final Supplier<JmsContext> jmsSupplier;

    /**
     * Creates the bus and registers its counters and timers with {@code metrics}.
     *
     * @param name                   bus name; it is lower-cased and every '.', ' ' and '-' is replaced by '_'
     * @param timeSource             clock used to time request/response and reply round trips
     * @param metrics                registry the counters and time trackers are created in
     * @param metricsProcessor       processor invoked at the start of every {@link #process()} call
     * @param tryHandler             helper that runs the JMS operations and applies the error handling
     * @param logger                 logger used by this bus
     * @param jmsReplyQueue          queue of replies that {@link #process()} sends back over JMS
     * @param jmsMessageConverter    converts a received JMS message into a bridge message
     * @param bridgeMessageConverter converts a bridge message into a JMS message
     * @param destinationName        value of the {@code dst} metric tag
     * @param jmsSupplier            supplies the {@link JmsContext} when one is first needed
     */
    public JMSMessageBus(String name, TimeSource timeSource, Metrics metrics, MetricsProcessor metricsProcessor, ExceptionHandler tryHandler, Logger logger, Queue<JMSReply> jmsReplyQueue, FunctionWithException<javax.jms.Message, Message> jmsMessageConverter, FunctionWithException<Message, javax.jms.Message> bridgeMessageConverter, String destinationName, Supplier<JmsContext> jmsSupplier) {
        // Locale.ROOT keeps the normalized name (and the metric tag derived from it)
        // independent of the JVM's default locale.
        this.name = name.toLowerCase(Locale.ROOT).replace(".", "_").replace(" ", "_").replace("-", "_");
        this.timeSource = timeSource;
        this.tryHandler = tryHandler;
        this.metrics = metrics;
        this.jmsSupplier = jmsSupplier;
        String[] tags = Metrics.tags("name", "name_" + this.name, "mb_type", "jms_mb", "dst", destinationName);
        this.countPublish = metrics.createCounter("publish_count", tags);
        this.countPublishErrors = metrics.createCounter("publish_count_errors", tags);
        this.countRequest = metrics.createCounter("request_count", tags);
        this.countRequestErrors = metrics.createCounter("request_count_errors", tags);
        this.countRequestResponses = metrics.createCounter("request_response_count", tags);
        this.countRequestResponseErrors = metrics.createCounter("request_response_count_errors", tags);
        this.countRequestResponsesMissing = metrics.createCounter("request_response_missing_count", tags);
        this.timerRequestResponse = metrics.createTimeTracker("request_response_timing", tags);
        this.countReceived = metrics.createCounter("received_count", tags);
        this.countReceivedReply = metrics.createCounter("received_reply_count", tags);
        this.timerReceiveReply = metrics.createTimeTracker("receive_reply_timing", tags);
        this.countReceivedReplyErrors = metrics.createCounter("received_reply_count_errors", tags);
        this.messageConvertErrors = metrics.createCounter("message_convert_count_errors", tags);
        this.metricsProcessor = metricsProcessor;
        this.logger = logger;
        this.jmsReplyQueue = jmsReplyQueue;
        this.jmsMessageConverter = jmsMessageConverter;
        this.bridgeMessageConverter = bridgeMessageConverter;
    }

    /**
     * Returns the current JMS context, asking the supplier for one if none is held.
     */
    private JmsContext jms() {
        if (this.jms == null) {
            this.jms = this.jmsSupplier.get();
        }
        return this.jms;
    }

    /** Returns the normalized bus name computed in the constructor. */
    @Override
    public String name() {
        return this.name;
    }

    /** Returns the metrics registry passed to the constructor. */
    @Override
    public Metrics metrics() {
        return this.metrics;
    }

    /**
     * Converts {@code message} to a JMS message and sends it with the context's producer.
     * The work runs through the exception handler with the publish error counter.
     *
     * @param message bridge message to publish
     */
    @Override
    public void publish(Message message) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("publish called " + message);
        }
        this.tryHandler.tryWithErrorCount(() -> {
            javax.jms.Message jmsMessage = this.convertToJMSMessage(message);
            this.jms().producer().send(jmsMessage);
            this.countPublish.increment();
        }, this.countPublishErrors, "Unable to send the message to the producer");
    }

    /**
     * Sends {@code message} as a request and registers {@code replyCallback} under the
     * message's correlation id so that {@link #process()} can deliver the response.
     *
     * <p>A random UUID is used as correlation id when the converted JMS message does
     * not already carry one.
     *
     * @param message       bridge message to send
     * @param replyCallback invoked with the converted response message
     * @throws IllegalStateException if the JMS context has no response destination
     */
    @Override
    public void request(Message message, Consumer<Message> replyCallback) {
        if (this.jms().getResponseDestination() == null) {
            throw new IllegalStateException("Response destination is not set so request/reply is not possible");
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("request called " + message);
        }
        javax.jms.Message jmsMessage = this.convertToJMSMessage(message);
        this.tryHandler.tryWithRethrow(() -> {
            Destination responseDestination = this.jms().getResponseDestination();
            String existingCorrelationID = jmsMessage.getJMSCorrelationID();
            String correlationID = existingCorrelationID == null ? UUID.randomUUID().toString() : existingCorrelationID;
            jmsMessage.setJMSReplyTo(responseDestination);
            if (existingCorrelationID == null) {
                jmsMessage.setJMSCorrelationID(correlationID);
            }
            this.jms().producer().send(jmsMessage);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("REQUEST BODY " + message.toString());
                this.logger.debug(String.format("CORRELATION ID: %s %s\n", correlationID, responseDestination));
            }
            this.requestResponseMap.put(correlationID, new JMSRequestResponse(replyCallback, this.timeSource.getTime()));
            this.countRequest.increment();
        }, this.countRequestErrors, e -> new JMSMessageBusException("unable to send JMS request", (Throwable)e));
    }

    /** Converts a bridge message to a JMS message; failures become {@link JMSMessageBusException}. */
    private javax.jms.Message convertToJMSMessage(Message message) {
        return this.tryHandler.tryFunctionOrRethrow(message, m -> this.bridgeMessageConverter.apply(message), e -> new JMSMessageBusException("Unable to create JMS message", (Throwable)e));
    }

    /**
     * Converts a JMS message to a bridge message; failures bump the conversion error
     * counter and become {@link JMSMessageBusException}.
     */
    private Message convertToBusMessage(javax.jms.Message jmsMessage) {
        return this.tryHandler.tryFunctionOrRethrow(jmsMessage, m -> this.jmsMessageConverter.apply(jmsMessage), e -> {
            this.messageConvertErrors.increment();
            return new JMSMessageBusException("Unable to convert JMS message to Bridge Message", (Throwable)e);
        });
    }

    /**
     * Polls the consumer with {@code receiveNoWait()}.
     *
     * @return the converted message, or an empty {@code Optional} if the consumer returned none
     */
    @Override
    public Optional<Message> receive() {
        return this.tryHandler.tryReturnOrRethrow(() -> {
            javax.jms.Message message = this.jms().consumer().receiveNoWait();
            if (message != null) {
                this.countReceived.increment();
                return Optional.of(this.convertToBusMessage(message));
            }
            return Optional.empty();
        }, e -> {
            throw new JMSMessageBusException("Error receiving message", (Throwable)e);
        });
    }

    /**
     * Polls the consumer with {@code receive(timeout)}, using {@code duration} in milliseconds.
     *
     * @param duration timeout passed to the JMS consumer
     * @return the converted message, or an empty {@code Optional} if the consumer returned none
     */
    @Override
    public Optional<Message> receive(Duration duration) {
        return this.tryHandler.tryReturnOrRethrow(() -> {
            javax.jms.Message message = this.jms().consumer().receive(duration.toMillis());
            if (message != null) {
                this.countReceived.increment();
                return Optional.of(this.convertToBusMessage(message));
            }
            return Optional.empty();
        }, e -> {
            throw new JMSMessageBusException("Error receiving message", (Throwable)e);
        });
    }

    /** Forces creation of the JMS context if it does not exist yet. */
    @Override
    public void init() {
        this.jms();
    }

    /**
     * Closes the JMS context if one is held (a failure is logged, not thrown), forgets it,
     * and clears the reply queue.
     */
    @Override
    public void close() {
        try {
            if (this.jms != null) {
                this.jms.close();
            }
        }
        catch (Exception ex) {
            this.logger.warn("Unable to close JMS Context", (Throwable)ex);
        }
        this.jms = null;
        this.jmsReplyQueue.clear();
    }

    /**
     * Drains the response consumer and hands each response to the callback registered
     * for its correlation id.
     *
     * <p>The registration is removed as it is looked up, so a handled request no longer
     * occupies the map. A response without a registration only increments the
     * "missing" counter.
     *
     * @return number of response messages taken from the consumer
     */
    private int processResponses() {
        JmsContext context = this.jms();
        if (context == null) {
            return 0;
        }
        MessageConsumer responseConsumer = context.getResponseConsumer();
        if (responseConsumer == null) {
            return 0;
        }
        int[] countHolder = new int[1];
        this.tryHandler.tryWithRethrow(() -> {
            javax.jms.Message message;
            while ((message = responseConsumer.receiveNoWait()) != null) {
                ++countHolder[0];
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(this.name + "::: RESPONSE FROM JMS  ");
                }
                String correlationID = message.getJMSCorrelationID();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(String.format("%s ::: Process JMS Message Consumer %s \n", this.name, correlationID));
                }
                // remove (not get): each registration is consumed by its response,
                // otherwise the map would keep every request ever sent.
                JMSRequestResponse pending = this.requestResponseMap.remove(correlationID);
                if (pending == null) {
                    this.countRequestResponsesMissing.increment();
                    continue;
                }
                pending.getReplyCallback().accept(this.convertToBusMessage(message));
                this.timerRequestResponse.recordTiming(this.timeSource.getTime() - pending.getSentTime());
                this.countRequestResponses.increment();
            }
        }, this.countRequestResponseErrors, e -> new JMSMessageBusException("unable to call JMS receiveNoWait", (Throwable)e));
        return countHolder[0];
    }

    /**
     * Runs the metrics processor, sends queued replies, then dispatches received responses.
     *
     * @return number of replies taken from the reply queue plus number of responses received
     */
    @Override
    public int process() {
        this.metricsProcessor.process();
        int count = this.processReplies();
        return count + this.processResponses();
    }

    /**
     * Drains the reply queue, sending each reply body as a JMS {@link BytesMessage} to the
     * reply's {@code JMSReplyTo} destination with the reply's correlation id.
     *
     * <p>If a producer cannot be created for a reply, that reply is dropped (logged at
     * debug level) and draining stops for this call; replies still queued are left for a
     * later call.
     *
     * @return number of replies taken from the queue, including one dropped as described above
     */
    private int processReplies() {
        int[] countHolder = new int[1];
        this.tryHandler.tryWithRethrow(() -> {
            Session session = this.jms().getSession();
            JMSReply reply;
            while ((reply = this.jmsReplyQueue.poll()) != null) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(this.name + "::: REPLY FROM SERVER IN JMS MESSAGE BUS " + reply.getReply().bodyAsString());
                }
                // Count directly in the holder so the early return below still reports
                // the replies handled so far.
                ++countHolder[0];
                byte[] messageBody = reply.getReply().getBodyBytes();
                String correlationId = reply.getCorrelationID();
                Destination jmsReplyTo = reply.getJmsReplyTo();
                MessageProducer replyProducer;
                try {
                    replyProducer = session.createProducer(jmsReplyTo);
                }
                catch (Exception ex) {
                    this.logger.debug("Unable to handle creating a producer", (Throwable)ex);
                    return;
                }
                try {
                    BytesMessage jmsReplyMessage = session.createBytesMessage();
                    jmsReplyMessage.writeBytes(messageBody);
                    this.timerReceiveReply.recordTiming(this.timeSource.getTime() - reply.getSentTime());
                    this.countReceivedReply.increment();
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug(String.format("%s ::: Reply handler - %s %s %s\n", this.name, reply.getReply().bodyAsString(), correlationId, replyProducer.getDestination().toString()));
                    }
                    jmsReplyMessage.setJMSCorrelationID(correlationId);
                    replyProducer.send((javax.jms.Message)jmsReplyMessage);
                }
                catch (Exception sendFailure) {
                    // Do not leak the per-reply producer when building or sending fails;
                    // keep the original failure as the primary exception.
                    try {
                        replyProducer.close();
                    }
                    catch (Exception closeFailure) {
                        sendFailure.addSuppressed(closeFailure);
                    }
                    throw sendFailure;
                }
                replyProducer.close();
            }
        }, this.countReceivedReplyErrors, e -> new JMSMessageBusException("unable to process JMS Replies", (Throwable)e));
        return countHolder[0];
    }
}

