/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.jms;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.jms.JmsConnectorCommonConfiguration;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.jms.JmsConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.jms.JmsResourceHolder;
import io.smallrye.reactive.messaging.jms.JmsSink;
import io.smallrye.reactive.messaging.jms.JmsSource;
import io.smallrye.reactive.messaging.jms.i18n.JmsExceptions;
import io.smallrye.reactive.messaging.jms.i18n.JmsLogging;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import jakarta.inject.Inject;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSProducer;
import java.lang.annotation.Annotation;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(value="smallrye-jms")
@ConnectorAttributes(value={@ConnectorAttribute(name="connection-factory-name", description="The name of the JMS connection factory  (`jakarta.jms.ConnectionFactory`) to be used. If not set, it uses any exposed JMS connection factory", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="username", description="The username to connect to to the JMS server", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="password", description="The password to connect to to the JMS server", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="session-mode", description="The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String", defaultValue="AUTO_ACKNOWLEDGE"), @ConnectorAttribute(name="client-id", description="The client id", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="destination", description="The name of the JMS destination. 
If not set the name of the channel is used", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="selector", description="The JMS selector", direction=ConnectorAttribute.Direction.INCOMING, type="String"), @ConnectorAttribute(name="no-local", description="Enable or disable local delivery", direction=ConnectorAttribute.Direction.INCOMING, type="boolean", defaultValue="false"), @ConnectorAttribute(name="broadcast", description="Whether or not the JMS message should be dispatched to multiple consumers", direction=ConnectorAttribute.Direction.INCOMING, type="boolean", defaultValue="false"), @ConnectorAttribute(name="durable", description="Set to `true` to use a durable subscription", direction=ConnectorAttribute.Direction.INCOMING, type="boolean", defaultValue="false"), @ConnectorAttribute(name="destination-type", description="The type of destination. It can be either `queue` or `topic`", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string", defaultValue="queue"), @ConnectorAttribute(name="tracing-enabled", type="boolean", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether tracing is enabled (default) or disabled", defaultValue="true"), @ConnectorAttribute(name="disable-message-id", description="Omit the message id in the outbound JMS message", direction=ConnectorAttribute.Direction.OUTGOING, type="boolean"), @ConnectorAttribute(name="disable-message-timestamp", description="Omit the message timestamp in the outbound JMS message", direction=ConnectorAttribute.Direction.OUTGOING, type="boolean"), @ConnectorAttribute(name="delivery-mode", description="The delivery mode. 
Either `persistent` or `non_persistent`", direction=ConnectorAttribute.Direction.OUTGOING, type="string"), @ConnectorAttribute(name="delivery-delay", description="The delivery delay", direction=ConnectorAttribute.Direction.OUTGOING, type="long"), @ConnectorAttribute(name="ttl", description="The JMS Message time-to-live", direction=ConnectorAttribute.Direction.OUTGOING, type="long"), @ConnectorAttribute(name="correlation-id", description="The JMS Message correlation id", direction=ConnectorAttribute.Direction.OUTGOING, type="string"), @ConnectorAttribute(name="priority", description="The JMS Message priority", direction=ConnectorAttribute.Direction.OUTGOING, type="int"), @ConnectorAttribute(name="reply-to", description="The reply to destination if any", direction=ConnectorAttribute.Direction.OUTGOING, type="string"), @ConnectorAttribute(name="reply-to-destination-type", description="The type of destination for the response. It can be either `queue` or `topic`", direction=ConnectorAttribute.Direction.OUTGOING, type="string", defaultValue="queue"), @ConnectorAttribute(name="merge", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether the connector should allow multiple upstreams", type="boolean", defaultValue="false"), @ConnectorAttribute(name="retry", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether to retry on terminal stream errors.", type="boolean", defaultValue="true"), @ConnectorAttribute(name="retry.max-retries", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Maximum number of retries for terminal stream errors.", type="int", defaultValue="3"), @ConnectorAttribute(name="retry.initial-delay", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The initial delay for the retry.", type="string", defaultValue="PT1S"), @ConnectorAttribute(name="retry.max-delay", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The maximum delay", 
type="string", defaultValue="PT10S"), @ConnectorAttribute(name="retry.jitter", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="How much the delay jitters as a multiplier between 0 and 1. The formula is current delay * jitter. For example, with a current delay of 2H, a jitter of 0.5 will result in an actual delay somewhere between 1H and 3H.", type="double", defaultValue="0.5")})
/*
 * Connector bean that builds a JmsSource for every configuration handed to
 * getPublisher(Config) and a JmsSink for every configuration handed to
 * getSubscriber(Config). All sources and sinks share one executor created in
 * init(); every JMS resource holder created here is closed in cleanup().
 */
public class JmsConnector
        implements InboundConnector, OutboundConnector {

    /** Name under which this connector is registered. */
    public static final String CONNECTOR_NAME = "smallrye-jms";

    /** Default for {@code smallrye.jms.threads.max-pool-size}. */
    static final String DEFAULT_MAX_POOL_SIZE = "10";

    /** Default for {@code smallrye.jms.threads.ttl}. */
    static final String DEFAULT_THREAD_TTL = "60";

    /** Every available {@link ConnectionFactory} bean, regardless of qualifier. */
    @Inject
    @Any
    Instance<ConnectionFactory> factories;

    /** JSON mapping candidates; may be unsatisfied or ambiguous, see {@link #init()}. */
    @Inject
    Instance<JsonMapping> jsonMapper;

    /** Gives access to the Vert.x instance handed to each {@link JmsSource}. */
    @Inject
    ExecutionHolder executionHolders;

    /** Number of threads of the executor shared by all sources and sinks. */
    @Inject
    @ConfigProperty(name = "smallrye.jms.threads.max-pool-size", defaultValue = DEFAULT_MAX_POOL_SIZE)
    int maxPoolSize;

    /**
     * Value of {@code smallrye.jms.threads.ttl}.
     * NOTE(review): injected but not read by any method of this class.
     */
    @Inject
    @ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL)
    int ttl;

    /** Passed through unchanged to every source and sink. */
    @Inject
    Instance<OpenTelemetry> openTelemetryInstance;

    private ExecutorService executor;

    /** Selected JSON mapping; stays {@code null} when no mapping bean is available. */
    private JsonMapping jsonMapping;

    /** Sources created by {@link #getPublisher(Config)}, closed in {@link #cleanup()}. */
    private final List<JmsSource> sources = new CopyOnWriteArrayList<>();

    /** Resource holders of all sources and sinks, closed in {@link #cleanup()}. */
    private final List<JmsResourceHolder<?>> contexts = new CopyOnWriteArrayList<>();

    /**
     * Creates the shared fixed-size executor and selects the JSON mapping.
     * <p>
     * When no mapping bean exists a warning is logged and the mapping stays {@code null};
     * when several exist a warning is logged and the first one is used.
     */
    @PostConstruct
    public void init() {
        this.executor = Executors.newFixedThreadPool(this.maxPoolSize);
        if (this.jsonMapper.isUnsatisfied()) {
            JmsLogging.log.warn("Please add one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
        } else if (this.jsonMapper.isAmbiguous()) {
            JmsLogging.log.warn("Please select only one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
            this.jsonMapping = this.jsonMapper.stream()
                    .findFirst()
                    .orElseThrow(() -> new RuntimeException("Unable to find JSON Mapper"));
        } else {
            this.jsonMapping = this.jsonMapper.get();
        }
    }

    /**
     * Closes every source and resource holder, then shuts the executor down.
     * <p>
     * The steps are chained with {@code finally} so that a failure while closing
     * the sources still lets the holders be closed and the executor be shut down.
     */
    @PreDestroy
    public void cleanup() {
        try {
            this.sources.forEach(JmsSource::close);
        } finally {
            try {
                this.contexts.forEach(JmsResourceHolder::close);
            } finally {
                // The executor is only assigned in init(); guard against a bean that was never initialised.
                if (this.executor != null) {
                    this.executor.shutdown();
                }
            }
        }
    }

    /**
     * Builds the inbound side of a channel.
     *
     * @param config the channel configuration
     * @return the publisher exposed by the newly created {@link JmsSource}
     */
    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
        // The JMS context is created through the supplier handed to the holder, not here.
        JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> this.createJmsContext(ic));
        this.contexts.add(holder);
        JmsSource source = new JmsSource(this.executionHolders.vertx(), holder, ic, this.openTelemetryInstance,
                this.jsonMapping, this.executor);
        this.sources.add(source);
        return source.getSource();
    }

    /**
     * Creates a JMS context from the connection factory, credentials, session mode
     * and optional client id found in the given configuration.
     * <p>
     * If applying the client id fails, the freshly created context is closed before
     * the exception is rethrown so that it does not leak.
     *
     * @param config the incoming or outgoing channel configuration
     * @return the configured context
     */
    private JMSContext createJmsContext(JmsConnectorCommonConfiguration config) {
        ConnectionFactory factory = this.pickTheFactory(config.getConnectionFactoryName().orElse(null));
        JMSContext context = this.createContext(factory,
                config.getUsername().orElse(null),
                config.getPassword().orElse(null),
                config.getSessionMode());
        try {
            config.getClientId().ifPresent(context::setClientID);
        } catch (RuntimeException failure) {
            try {
                context.close();
            } catch (RuntimeException closeFailure) {
                // Keep the original failure as the primary error.
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
        return context;
    }

    /**
     * Builds the outbound side of a channel.
     *
     * @param config the channel configuration
     * @return the subscriber exposed by the newly created {@link JmsSink}
     */
    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config);
        JmsResourceHolder<JMSProducer> holder = new JmsResourceHolder<>(oc.getChannel(), () -> this.createJmsContext(oc));
        this.contexts.add(holder);
        return new JmsSink(holder, oc, this.openTelemetryInstance, this.jsonMapping, this.executor).getSink();
    }

    /**
     * Selects the connection factory to use.
     * <p>
     * Without a name, the first available factory is returned. With a name, a factory
     * qualified with {@code @Identifier(name)} is looked up first; if none exists, the
     * lookup falls back to {@code @Named(name)} and logs a deprecation notice when that
     * fallback finds something.
     *
     * @param factoryName the configured factory name, or {@code null} when not set
     * @return the selected factory
     * @throws RuntimeException the exception built by {@link JmsExceptions} when no factory matches
     */
    private ConnectionFactory pickTheFactory(String factoryName) {
        if (this.factories.isUnsatisfied()) {
            throw factoryNotFound(factoryName);
        }

        Iterator<ConnectionFactory> candidates;
        if (factoryName == null) {
            candidates = this.factories.iterator();
        } else {
            Instance<ConnectionFactory> matching = this.factories.select(Identifier.Literal.of(factoryName));
            if (matching.isUnsatisfied()) {
                matching = this.factories.select(NamedLiteral.of(factoryName));
                if (!matching.isUnsatisfied()) {
                    ProviderLogging.log.deprecatedNamed();
                }
            }
            candidates = matching.iterator();
        }

        if (!candidates.hasNext()) {
            throw factoryNotFound(factoryName);
        }
        return candidates.next();
    }

    /** Builds the "factory not found" exception matching whether a name was requested. */
    private static RuntimeException factoryNotFound(String factoryName) {
        if (factoryName == null) {
            return JmsExceptions.ex.illegalStateCannotFindFactory();
        }
        return JmsExceptions.ex.illegalStateCannotFindNamedFactory(factoryName);
    }

    /**
     * Translates the textual session mode and creates the context.
     *
     * @param factory the factory creating the context
     * @param username the user name, or {@code null} to create the context without credentials
     * @param password the password, only used when {@code username} is not {@code null}
     * @param mode one of AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE or
     *        DUPS_OK_ACKNOWLEDGE, in any letter case
     * @return the new context
     * @throws RuntimeException the exception built by {@link JmsExceptions} when the mode is unknown
     */
    private JMSContext createContext(ConnectionFactory factory, String username, String password, String mode) {
        // Locale.ROOT keeps the match independent of the JVM default locale
        // (a Turkish default locale would upper-case 'i' to a dotted capital I).
        int sessionMode = switch (mode.toUpperCase(Locale.ROOT)) {
            case "AUTO_ACKNOWLEDGE" -> JMSContext.AUTO_ACKNOWLEDGE;
            case "SESSION_TRANSACTED" -> JMSContext.SESSION_TRANSACTED;
            case "CLIENT_ACKNOWLEDGE" -> JMSContext.CLIENT_ACKNOWLEDGE;
            case "DUPS_OK_ACKNOWLEDGE" -> JMSContext.DUPS_OK_ACKNOWLEDGE;
            default -> throw JmsExceptions.ex.illegalStateUnknowSessionMode(mode);
        };
        if (username != null) {
            return factory.createContext(username, password, sessionMode);
        }
        return factory.createContext(sessionMode);
    }
}

