/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.jms;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.jms.JmsConnectorCommonConfiguration;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.jms.JmsConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.jms.JmsSink;
import io.smallrye.reactive.messaging.jms.JmsSource;
import io.smallrye.reactive.messaging.jms.i18n.JmsExceptions;
import io.smallrye.reactive.messaging.jms.i18n.JmsLogging;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import java.lang.annotation.Annotation;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

@ApplicationScoped
@Connector(value="smallrye-jms")
@ConnectorAttributes(value={@ConnectorAttribute(name="connection-factory-name", description="The name of the JMS connection factory  (`javax.jms.ConnectionFactory`) to be used. If not set, it uses any exposed JMS connection factory", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="username", description="The username to connect to to the JMS server", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="password", description="The password to connect to to the JMS server", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="session-mode", description="The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String", defaultValue="AUTO_ACKNOWLEDGE"), @ConnectorAttribute(name="client-id", description="The client id", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="destination", description="The name of the JMS destination. 
If not set the name of the channel is used", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="selector", description="The JMS selector", direction=ConnectorAttribute.Direction.INCOMING, type="String"), @ConnectorAttribute(name="no-local", description="Enable or disable local delivery", direction=ConnectorAttribute.Direction.INCOMING, type="boolean", defaultValue="false"), @ConnectorAttribute(name="broadcast", description="Whether or not the JMS message should be dispatched to multiple consumers", direction=ConnectorAttribute.Direction.INCOMING, type="boolean", defaultValue="false"), @ConnectorAttribute(name="durable", description="Set to `true` to use a durable subscription", direction=ConnectorAttribute.Direction.INCOMING, type="boolean", defaultValue="false"), @ConnectorAttribute(name="destination-type", description="The type of destination. It can be either `queue` or `topic`", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string", defaultValue="queue"), @ConnectorAttribute(name="disable-message-id", description="Omit the message id in the outbound JMS message", direction=ConnectorAttribute.Direction.OUTGOING, type="boolean"), @ConnectorAttribute(name="disable-message-timestamp", description="Omit the message timestamp in the outbound JMS message", direction=ConnectorAttribute.Direction.OUTGOING, type="boolean"), @ConnectorAttribute(name="delivery-mode", description="The delivery mode. 
Either `persistent` or `non_persistent`", direction=ConnectorAttribute.Direction.OUTGOING, type="string"), @ConnectorAttribute(name="delivery-delay", description="The delivery delay", direction=ConnectorAttribute.Direction.OUTGOING, type="long"), @ConnectorAttribute(name="ttl", description="The JMS Message time-to-live", direction=ConnectorAttribute.Direction.OUTGOING, type="long"), @ConnectorAttribute(name="correlation-id", description="The JMS Message correlation id", direction=ConnectorAttribute.Direction.OUTGOING, type="string"), @ConnectorAttribute(name="priority", description="The JMS Message priority", direction=ConnectorAttribute.Direction.OUTGOING, type="int"), @ConnectorAttribute(name="reply-to", description="The reply to destination if any", direction=ConnectorAttribute.Direction.OUTGOING, type="string"), @ConnectorAttribute(name="reply-to-destination-type", description="The type of destination for the response. It can be either `queue` or `topic`", direction=ConnectorAttribute.Direction.OUTGOING, type="string", defaultValue="queue"), @ConnectorAttribute(name="merge", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether the connector should allow multiple upstreams", type="boolean", defaultValue="false")})
public class JmsConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory {
    public static final String CONNECTOR_NAME = "smallrye-jms";
    static final String DEFAULT_MAX_POOL_SIZE = "10";
    static final String DEFAULT_THREAD_TTL = "60";
    @Inject
    @Any
    Instance<ConnectionFactory> factories;
    @Inject
    Instance<JsonMapping> jsonMapper;
    @Inject
    @ConfigProperty(name="smallrye.jms.threads.max-pool-size", defaultValue="10")
    int maxPoolSize;
    @Inject
    @ConfigProperty(name="smallrye.jms.threads.ttl", defaultValue="60")
    int ttl;
    private ExecutorService executor;
    private JsonMapping jsonMapping;
    private final List<JmsSource> sources = new CopyOnWriteArrayList<JmsSource>();
    private final List<JMSContext> contexts = new CopyOnWriteArrayList<JMSContext>();

    @PostConstruct
    public void init() {
        this.executor = new ThreadPoolExecutor(0, this.maxPoolSize, this.ttl, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        if (this.jsonMapper.isUnsatisfied()) {
            JmsLogging.log.warn("Please add one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
        } else if (this.jsonMapper.isAmbiguous()) {
            JmsLogging.log.warn("Please select only one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
            this.jsonMapping = (JsonMapping)this.jsonMapper.stream().findFirst().orElseThrow(() -> new RuntimeException("Unable to find JSON Mapper"));
        } else {
            this.jsonMapping = (JsonMapping)this.jsonMapper.get();
        }
    }

    @PreDestroy
    public void cleanup() {
        this.sources.forEach(JmsSource::close);
        this.contexts.forEach(JMSContext::close);
        this.executor.shutdown();
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
        JMSContext context = this.createJmsContext(ic);
        this.contexts.add(context);
        JmsSource source = new JmsSource(context, ic, this.jsonMapping, this.executor);
        this.sources.add(source);
        return source.getSource();
    }

    private JMSContext createJmsContext(JmsConnectorCommonConfiguration config) {
        String factoryName = config.getConnectionFactoryName().orElse(null);
        ConnectionFactory factory = this.pickTheFactory(factoryName);
        JMSContext context = this.createContext(factory, config.getUsername().orElse(null), config.getPassword().orElse(null), config.getSessionMode());
        config.getClientId().ifPresent(arg_0 -> ((JMSContext)context).setClientID(arg_0));
        return context;
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config);
        JMSContext context = this.createJmsContext(oc);
        this.contexts.add(context);
        return new JmsSink(context, oc, this.jsonMapping, this.executor).getSink();
    }

    private ConnectionFactory pickTheFactory(String factoryName) {
        Iterator iterator;
        if (this.factories.isUnsatisfied()) {
            if (factoryName == null) {
                throw JmsExceptions.ex.illegalStateCannotFindFactory();
            }
            throw JmsExceptions.ex.illegalStateCannotFindNamedFactory(factoryName);
        }
        if (factoryName == null) {
            iterator = this.factories.iterator();
        } else {
            Instance matching = this.factories.select(new Annotation[]{Identifier.Literal.of((String)factoryName)});
            if (matching.isUnsatisfied() && !(matching = this.factories.select(new Annotation[]{NamedLiteral.of((String)factoryName)})).isUnsatisfied()) {
                ProviderLogging.log.deprecatedNamed();
            }
            iterator = matching.iterator();
        }
        if (!iterator.hasNext()) {
            if (factoryName == null) {
                throw JmsExceptions.ex.illegalStateCannotFindFactory();
            }
            throw JmsExceptions.ex.illegalStateCannotFindNamedFactory(factoryName);
        }
        return (ConnectionFactory)iterator.next();
    }

    private JMSContext createContext(ConnectionFactory factory, String username, String password, String mode) {
        int sessionMode;
        switch (mode.toUpperCase()) {
            case "AUTO_ACKNOWLEDGE": {
                sessionMode = 1;
                break;
            }
            case "SESSION_TRANSACTED": {
                sessionMode = 0;
                break;
            }
            case "CLIENT_ACKNOWLEDGE": {
                sessionMode = 2;
                break;
            }
            case "DUPS_OK_ACKNOWLEDGE": {
                sessionMode = 3;
                break;
            }
            default: {
                throw JmsExceptions.ex.illegalStateUnknowSessionMode(mode);
            }
        }
        if (username != null) {
            return factory.createContext(username, password, sessionMode);
        }
        return factory.createContext(sessionMode);
    }
}

