/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.messaging.connectors.jms;

import io.helidon.common.Builder;
import io.helidon.common.configurable.ScheduledThreadPoolConfig;
import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
import io.helidon.common.configurable.ThreadPoolConfig;
import io.helidon.common.configurable.ThreadPoolSupplier;
import io.helidon.common.reactive.BufferedEmittingPublisher;
import io.helidon.common.reactive.Multi;
import io.helidon.config.Config;
import io.helidon.config.ConfigValue;
import io.helidon.config.mp.MpConfig;
import io.helidon.messaging.MessagingException;
import io.helidon.messaging.NackHandler;
import io.helidon.messaging.Stoppable;
import io.helidon.messaging.connectors.jms.AbstractJmsMessage;
import io.helidon.messaging.connectors.jms.AcknowledgeMode;
import io.helidon.messaging.connectors.jms.ConfigHelper;
import io.helidon.messaging.connectors.jms.ConnectionContext;
import io.helidon.messaging.connectors.jms.JmsBytesMessage;
import io.helidon.messaging.connectors.jms.JmsConfigBuilder;
import io.helidon.messaging.connectors.jms.JmsMessage;
import io.helidon.messaging.connectors.jms.JmsNackHandler;
import io.helidon.messaging.connectors.jms.JmsTextMessage;
import io.helidon.messaging.connectors.jms.MessageMapper;
import io.helidon.messaging.connectors.jms.MessageMappers;
import io.helidon.messaging.connectors.jms.OutgoingJmsMessage;
import io.helidon.messaging.connectors.jms.SessionMetadata;
import io.helidon.messaging.connectors.jms.shim.JakartaJms;
import io.helidon.messaging.connectors.jms.shim.JakartaWrapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import jakarta.inject.Inject;
import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.jms.ConnectionFactory;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttribute;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttributes;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

@ApplicationScoped
@Connector(value="helidon-jms")
@ConnectorAttributes(value={@ConnectorAttribute(name="username", description="User name used to connect JMS session", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string"), @ConnectorAttribute(name="password", description="Password to connect JMS session", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string"), @ConnectorAttribute(name="type", description="Possible values are: queue, topic", defaultValue="queue", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string"), @ConnectorAttribute(name="destination", description="Queue or topic name", mandatory=true, direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string"), @ConnectorAttribute(name="acknowledge-mode", description="Possible values are: AUTO_ACKNOWLEDGE- session automatically acknowledges a client\u2019s receipt of a message, CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually, DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages.", defaultValue="AUTO_ACKNOWLEDGE", direction=ConnectorAttribute.Direction.INCOMING, type="io.helidon.messaging.connectors.jms.AcknowledgeMode"), @ConnectorAttribute(name="transacted", description="Indicates whether the session will use a local transaction.", mandatory=false, defaultValue="false", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="boolean"), @ConnectorAttribute(name="await-ack", description="Wait for the acknowledgement of previous message before pulling next one.", mandatory=false, defaultValue="false", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="boolean"), @ConnectorAttribute(name="message-selector", description="JMS API message selector expression based on a subset of the SQL92. 
Expression can only access headers and properties, not the payload.", mandatory=false, direction=ConnectorAttribute.Direction.INCOMING, type="string"), @ConnectorAttribute(name="client-id", description="Client identifier for JMS connection.", mandatory=false, direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string"), @ConnectorAttribute(name="durable", description="True for creating durable consumer (only for topic).", mandatory=false, defaultValue="false", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="boolean"), @ConnectorAttribute(name="subscriber-name", description="Subscriber name for durable consumer used to identify subscription.", mandatory=false, direction=ConnectorAttribute.Direction.INCOMING, type="string"), @ConnectorAttribute(name="non-local", description="If true then any messages published to the topic using this session\u2019s connection, or any other connection with the same client identifier, will not be added to the durable subscription.", mandatory=false, defaultValue="false", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="boolean"), @ConnectorAttribute(name="named-factory", description="Select in case factory is injected as a named bean or configured with name.", mandatory=false, direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string"), @ConnectorAttribute(name="poll-timeout", description="Timeout for polling for next message in every poll cycle in millis. 
Default value: 50", mandatory=false, defaultValue="50", direction=ConnectorAttribute.Direction.INCOMING, type="long"), @ConnectorAttribute(name="period-executions", description="Period for executing poll cycles in millis.", mandatory=false, defaultValue="100", direction=ConnectorAttribute.Direction.INCOMING, type="long"), @ConnectorAttribute(name="session-group-id", description="When multiple channels share same session-group-id, they share same JMS session and same JDBC connection as well.", mandatory=false, direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string"), @ConnectorAttribute(name="jndi.jms-factory", description="JNDI name of JMS factory.", mandatory=false, direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="string"), @ConnectorAttribute(name="jndi.env-properties", description="Environment properties used for creating initial context java.naming.factory.initial, java.naming.provider.url \u2026", mandatory=false, direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="properties")})
public class JmsConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory,
Stoppable {
    /** Logger shared by all instances of this connector. */
    private static final System.Logger LOGGER = System.getLogger(JmsConnector.class.getName());
    /** Connector name; same value as the {@code @Connector} annotation on this class. */
    public static final String CONNECTOR_NAME = "helidon-jms";

    // Channel configuration keys. Each value matches the name of a
    // @ConnectorAttribute declared on this class.
    public static final String NAMED_FACTORY_ATTRIBUTE = "named-factory";
    public static final String USERNAME_ATTRIBUTE = "username";
    public static final String PASSWORD_ATTRIBUTE = "password";
    public static final String CLIENT_ID_ATTRIBUTE = "client-id";
    public static final String DURABLE_ATTRIBUTE = "durable";
    public static final String SUBSCRIBER_NAME_ATTRIBUTE = "subscriber-name";
    public static final String NON_LOCAL_ATTRIBUTE = "non-local";
    public static final String ACK_MODE_ATTRIBUTE = "acknowledge-mode";
    public static final String TRANSACTED_ATTRIBUTE = "transacted";
    public static final String AWAIT_ACK_ATTRIBUTE = "await-ack";
    public static final String MESSAGE_SELECTOR_ATTRIBUTE = "message-selector";
    public static final String POLL_TIMEOUT_ATTRIBUTE = "poll-timeout";
    public static final String PERIOD_EXECUTIONS_ATTRIBUTE = "period-executions";
    public static final String TYPE_ATTRIBUTE = "type";
    public static final String DESTINATION_ATTRIBUTE = "destination";
    public static final String SESSION_GROUP_ID_ATTRIBUTE = "session-group-id";

    // JNDI related keys. Not referenced in this class;
    // presumably consumed by ConnectionContext - TODO confirm.
    static final String JNDI_ATTRIBUTE = "jndi";
    static final String JNDI_PROPS_ATTRIBUTE = "env-properties";
    static final String JNDI_JMS_FACTORY_ATTRIBUTE = "jms-factory";
    static final String JNDI_DESTINATION_ATTRIBUTE = "destination";

    // Defaults; the values mirror the defaultValue entries of the matching
    // @ConnectorAttribute declarations on this class.
    static final AcknowledgeMode ACK_MODE_DEFAULT = AcknowledgeMode.AUTO_ACKNOWLEDGE;
    static final boolean TRANSACTED_DEFAULT = false;
    static final boolean AWAIT_ACK_DEFAULT = false;
    static final long POLL_TIMEOUT_DEFAULT = 50L;
    static final long PERIOD_EXECUTIONS_DEFAULT = 100L;
    static final String TYPE_PROP_DEFAULT = "queue";
    static final String JNDI_JMS_FACTORY_DEFAULT = "ConnectionFactory";

    // Thread name prefixes handed to the scheduler / executor pool builders.
    static final String SCHEDULER_THREAD_NAME_PREFIX = "jms-poll-";
    static final String EXECUTOR_THREAD_NAME_PREFIX = "jms-";

    /** Jakarta connection factory beans; {@code null} when the connector is created through the builder constructor. */
    private final Instance<jakarta.jms.ConnectionFactory> jakartaConnectionFactories;
    /** Runs the periodic poll task scheduled in {@code getPublisherBuilder}. */
    private final ScheduledExecutorService scheduler;
    /** Runs outgoing sends (see {@code consume}) and is handed to created messages. */
    private final ExecutorService executor;
    /** Sessions keyed by session-group-id, or by a random UUID when no group id is configured; closed in {@code stop()}. */
    private final Map<String, SessionMetadata> sessionRegister = new HashMap<String, SessionMetadata>();
    /** Explicitly registered factories by name; empty for the injected constructor. */
    private final Map<String, jakarta.jms.ConnectionFactory> connectionFactoryMap;
    /** Legacy javax connection factory beans; set to {@code null} by the builder constructor. */
    @Inject
    private Instance<ConnectionFactory> javaxConnectionFactories;

    /**
     * Creates a fresh builder for assembling a {@link JmsConnector} programmatically.
     *
     * @return new, empty connector builder
     */
    public static JmsConnectorBuilder builder() {
        JmsConnectorBuilder connectorBuilder = new JmsConnectorBuilder();
        return connectorBuilder;
    }

    /**
     * Creates a connector from an empty configuration, using the builder defaults
     * for everything else.
     *
     * @return new connector instance
     */
    public static JmsConnector create() {
        JmsConnectorBuilder connectorBuilder = JmsConnector.builder();
        connectorBuilder.config(Config.empty());
        return connectorBuilder.build();
    }

    /**
     * Creates a builder for JMS channel configuration.
     *
     * @return new {@link JmsConfigBuilder}
     */
    public static JmsConfigBuilder configBuilder() {
        JmsConfigBuilder channelConfigBuilder = new JmsConfigBuilder();
        return channelConfigBuilder;
    }

    @Inject
    protected JmsConnector(Config config, Instance<jakarta.jms.ConnectionFactory> jakartaConnectionFactories) {
        this.jakartaConnectionFactories = jakartaConnectionFactories;
        this.connectionFactoryMap = Map.of();
        this.scheduler = ((ScheduledThreadPoolConfig.Builder)((ScheduledThreadPoolConfig.Builder)ScheduledThreadPoolSupplier.builder().threadNamePrefix(SCHEDULER_THREAD_NAME_PREFIX)).config((io.helidon.common.config.Config)config)).build().get();
        this.executor = ((ThreadPoolConfig.Builder)((ThreadPoolConfig.Builder)ThreadPoolSupplier.builder().threadNamePrefix(EXECUTOR_THREAD_NAME_PREFIX)).config((io.helidon.common.config.Config)config)).build().get();
    }

    /**
     * Constructor used outside of injection (see {@code JmsConnectorBuilder.build()}).
     * Both bean {@code Instance} fields are left {@code null}; factories come from the
     * supplied map.
     *
     * @param connectionFactoryMap connection factories by name
     * @param scheduler scheduler used for the polling tasks
     * @param executor executor used for sending and message callbacks
     */
    protected JmsConnector(Map<String, jakarta.jms.ConnectionFactory> connectionFactoryMap, ScheduledExecutorService scheduler, ExecutorService executor) {
        this.connectionFactoryMap = connectionFactoryMap;
        this.scheduler = scheduler;
        this.executor = executor;
        // No bean lookup is available in this mode.
        this.jakartaConnectionFactories = null;
        this.javaxConnectionFactories = null;
    }

    /**
     * Stops the connector: shuts down the scheduler and the executor (waiting up to one
     * second for each before forcing shutdown) and then closes every registered JMS
     * session and its connection.
     *
     * <p>If the calling thread is interrupted while waiting, both pools are shut down
     * immediately and the interrupt status is restored once the JMS resources have been
     * closed. A failure to close a session does not prevent closing its connection.
     */
    public void stop() {
        boolean interrupted = false;
        this.scheduler.shutdown();
        this.executor.shutdown();
        try {
            if (!this.scheduler.awaitTermination(1L, TimeUnit.SECONDS)) {
                this.scheduler.shutdownNow();
            }
            if (!this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                this.executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOGGER.log(System.Logger.Level.ERROR, () -> "Error when awaiting scheduler termination.", e);
            this.scheduler.shutdownNow();
            this.executor.shutdownNow();
            // Remember the interrupt; it is restored after the JMS resources are closed
            // so that closing itself is not disturbed by the pending interrupt.
            interrupted = true;
        }
        for (SessionMetadata metadata : this.sessionRegister.values()) {
            try {
                metadata.session().close();
            } catch (JMSException jmsException) {
                LOGGER.log(System.Logger.Level.ERROR, () -> "Error when stopping JMS sessions.", jmsException);
            }
            // Closed in its own try block so that a failing session close cannot leak the connection.
            try {
                metadata.connection().close();
            } catch (JMSException jmsException) {
                LOGGER.log(System.Logger.Level.ERROR, () -> "Error when stopping JMS sessions.", jmsException);
            }
        }
        LOGGER.log(System.Logger.Level.INFO, "JMS Connector gracefully stopped.");
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * CDI observer for the {@code @BeforeDestroyed(ApplicationScoped.class)} event;
     * delegates to {@link #stop()}.
     *
     * @param event observed event payload, not used
     */
    void terminate(@Observes @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        stop();
    }

    /**
     * Wraps a received JMS message into the matching {@link JmsMessage} implementation:
     * text and bytes messages get their dedicated wrappers, anything else is wrapped
     * generically with the JMS message itself as the payload.
     *
     * @param nackHandler handler passed on to the created message
     * @param message received JMS message
     * @param executor executor passed on to the created message
     * @param sessionMetadata metadata of the session the message came from
     * @return wrapped message
     */
    protected JmsMessage<?> createMessage(NackHandler nackHandler, final Message message, Executor executor, SessionMetadata sessionMetadata) {
        final JmsMessage<?> wrapped;
        if (message instanceof TextMessage) {
            wrapped = new JmsTextMessage(nackHandler, (TextMessage) message, executor, sessionMetadata);
        } else if (message instanceof BytesMessage) {
            wrapped = new JmsBytesMessage(nackHandler, (BytesMessage) message, executor, sessionMetadata);
        } else {
            // NOTE(review): the leading `this` argument is kept exactly as found in the
            // decompiled source - confirm against AbstractJmsMessage's constructor.
            wrapped = new AbstractJmsMessage<Message>(this, nackHandler, executor, sessionMetadata) {

                @Override
                public Message getJmsMessage() {
                    return message;
                }

                public Message getPayload() {
                    return message;
                }
            };
        }
        return wrapped;
    }

    /**
     * Resolves the connection factory for a channel.
     *
     * <ol>
     *   <li>With JNDI configured, the factory is looked up through the context.</li>
     *   <li>With {@code named-factory} configured, the explicitly registered factories
     *       are searched by name first, then the named beans.</li>
     *   <li>Otherwise the first explicitly registered factory is used, falling back to
     *       any available connection factory bean.</li>
     * </ol>
     *
     * @param ctx connection context of the channel
     * @return resolved factory, or empty when none is available
     */
    protected Optional<? extends jakarta.jms.ConnectionFactory> getFactory(ConnectionContext ctx) {
        if (ctx.isJndi()) {
            return ctx.lookupFactory();
        }
        ConfigValue<String> factoryName = ctx.config().get(NAMED_FACTORY_ATTRIBUTE).asString();
        if (factoryName.isPresent()) {
            String name = factoryName.get();
            return Optional.ofNullable(this.connectionFactoryMap.get(name))
                    .or(() -> this.getConnectionFactoryBean(name));
        }
        // No name configured: the previous fallback dereferenced the absent name and
        // therefore always failed; pick any available factory instead.
        return this.connectionFactoryMap.values().stream().findFirst()
                .or(this::findAnyConnectionFactoryBean);
    }

    /** Returns the first available connection factory bean (Jakarta first, then javax), if any. */
    private Optional<jakarta.jms.ConnectionFactory> findAnyConnectionFactoryBean() {
        // Both Instance fields are null when the connector was created via the builder.
        Optional<jakarta.jms.ConnectionFactory> jakartaFactory = this.jakartaConnectionFactories == null
                ? Optional.empty()
                : this.jakartaConnectionFactories.stream().findFirst();
        if (jakartaFactory.isPresent() || this.javaxConnectionFactories == null) {
            return jakartaFactory;
        }
        return this.javaxConnectionFactories.stream()
                .<jakarta.jms.ConnectionFactory>map(JakartaJms::create)
                .findFirst();
    }

    /**
     * Looks up a connection factory bean qualified with the given name. Jakarta
     * factories are preferred; javax factories are wrapped through {@code JakartaJms}.
     *
     * <p>Both bean sources are {@code null} when the connector was created through the
     * builder constructor; in that case the result is simply empty instead of a
     * {@code NullPointerException}.
     *
     * @param name bean name to select
     * @return matching connection factory, or empty when none is found
     */
    private <T> Optional<jakarta.jms.ConnectionFactory> getConnectionFactoryBean(String name) {
        NamedLiteral literal = NamedLiteral.of(name);
        Optional<jakarta.jms.ConnectionFactory> jakartaFactory = this.jakartaConnectionFactories == null
                ? Optional.empty()
                : this.jakartaConnectionFactories.select(new Annotation[]{literal}).stream().findFirst();
        if (jakartaFactory.isPresent() || this.javaxConnectionFactories == null) {
            return jakartaFactory;
        }
        return this.javaxConnectionFactories.select(new Annotation[]{literal}).stream()
                .<jakarta.jms.ConnectionFactory>map(JakartaJms::create)
                .findFirst();
    }

    /**
     * Creates the publisher for an incoming channel. A session, destination and consumer
     * are prepared, and a task is scheduled at a fixed rate that polls the consumer and
     * pushes received messages to a buffered emitter backing the returned publisher.
     *
     * <p>A poll cycle is skipped when downstream has no outstanding requests, or when
     * {@code await-ack} is enabled (with a non-auto acknowledge mode) and the previously
     * emitted message has not been acknowledged yet.
     *
     * @param mpConfig channel configuration
     * @return publisher builder of received messages, or a failed one when JMS set-up fails
     * @throws MessagingException when no connection factory can be resolved
     */
    public PublisherBuilder<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> getPublisherBuilder(org.eclipse.microprofile.config.Config mpConfig) {
        Config config = MpConfig.toHelidonConfig(mpConfig);
        AcknowledgeMode ackMode = config.get(ACK_MODE_ATTRIBUTE)
                .asString()
                .map(AcknowledgeMode::parse)
                .orElse(ACK_MODE_DEFAULT);
        Boolean awaitAck = config.get(AWAIT_ACK_ATTRIBUTE).asBoolean().orElse(AWAIT_ACK_DEFAULT);
        ConnectionContext ctx = new ConnectionContext(config);
        jakarta.jms.ConnectionFactory factory = this.getFactory(ctx)
                .orElseThrow(() -> new MessagingException("No ConnectionFactory found."));
        try {
            SessionMetadata sessionEntry = this.prepareSession(config, factory);
            Destination destination = this.createDestination(sessionEntry.session(), ctx);
            MessageConsumer consumer = this.createConsumer(config, destination, sessionEntry);
            BufferedEmittingPublisher<org.eclipse.microprofile.reactive.messaging.Message<?>> emitter =
                    BufferedEmittingPublisher.create();
            JmsNackHandler nackHandler = JmsNackHandler.create(emitter, config, this);
            Long pollTimeout = config.get(POLL_TIMEOUT_ATTRIBUTE).asLong().orElse(POLL_TIMEOUT_DEFAULT);
            Long periodExecutions = config.get(PERIOD_EXECUTIONS_ATTRIBUTE).asLong().orElse(PERIOD_EXECUTIONS_DEFAULT);
            AtomicReference<JmsMessage<?>> lastMessage = new AtomicReference<>();

            Runnable pollCycle = () -> {
                // Nothing requested downstream, nothing to poll for.
                if (!emitter.hasRequests()) {
                    return;
                }
                JmsMessage<?> previous = lastMessage.get();
                boolean waitingForAck = ackMode != AcknowledgeMode.AUTO_ACKNOWLEDGE
                        && awaitAck
                        && previous != null
                        && !previous.isAck();
                if (!waitingForAck) {
                    this.produce(emitter, sessionEntry, consumer, nackHandler, pollTimeout)
                            .ifPresent(lastMessage::set);
                }
            };
            this.scheduler.scheduleAtFixedRate(pollCycle, 0L, periodExecutions, TimeUnit.MILLISECONDS);

            sessionEntry.connection().start();
            Flow.Publisher<org.eclipse.microprofile.reactive.messaging.Message<?>> flowPublisher = Multi.create(emitter);
            return ReactiveStreams.fromPublisher(FlowAdapters.toPublisher(flowPublisher));
        } catch (JMSException e) {
            LOGGER.log(System.Logger.Level.ERROR, () -> "Error during JMS publisher preparation", e);
            return ReactiveStreams.failed(e);
        }
    }

    /**
     * Creates the subscriber for an outgoing channel. A session, destination and producer
     * are prepared; every message received from the stream is passed to
     * {@code consume}. Stream errors are logged together with the channel name.
     *
     * @param mpConfig channel configuration
     * @return subscriber builder sending received messages over JMS
     * @throws MessagingException when no connection factory can be resolved or the JMS
     *         producer cannot be created
     */
    public SubscriberBuilder<? extends org.eclipse.microprofile.reactive.messaging.Message<?>, Void> getSubscriberBuilder(org.eclipse.microprofile.config.Config mpConfig) {
        Config config = MpConfig.toHelidonConfig(mpConfig);
        ConnectionContext ctx = new ConnectionContext(config);
        jakarta.jms.ConnectionFactory factory = this.getFactory(ctx)
                .orElseThrow(() -> new MessagingException("No ConnectionFactory found."));

        final Session session;
        final MessageProducer producer;
        try {
            SessionMetadata sessionEntry = this.prepareSession(config, factory);
            session = sessionEntry.session();
            Destination destination = this.createDestination(session, ctx);
            producer = this.createProducer(destination, ctx, sessionEntry);
        } catch (JMSException e) {
            throw new MessagingException("Error when creating JMS producer.", e);
        }

        // The mapper is resolved lazily from the first message, see consume(...).
        AtomicReference<MessageMapper> mapper = new AtomicReference<>();
        return ReactiveStreams.<org.eclipse.microprofile.reactive.messaging.Message<?>>builder()
                .flatMapCompletionStage(m -> this.consume(m, session, mapper, producer, config))
                .onError(t -> LOGGER.log(
                        System.Logger.Level.ERROR,
                        () -> "Error intercepted from channel " + config.get("channel-name").asString().orElse("unknown"),
                        t))
                .ignore();
    }

    /**
     * Applies the optional {@code producer} configuration node to the JMS producer by
     * reflection. Every configured key is converted from kebab-case to camelCase and
     * matched against the single-argument methods declared on the producer's class
     * (the wrapped instance when the producer is a {@link JakartaWrapper}).
     *
     * <p>Unknown keys and failing invocations are logged as warnings and otherwise
     * ignored. When several single-argument methods map to the same property name
     * (overloads), one of them is kept; which one is unspecified because reflection
     * does not guarantee method order.
     *
     * @param producer producer to configure
     * @param ctx connection context providing the channel configuration
     */
    private void configureProducer(MessageProducer producer, ConnectionContext ctx) {
        Config config = ctx.config().get("producer");
        if (!config.exists()) {
            return;
        }
        Object instance = producer instanceof JakartaWrapper ? ((JakartaWrapper) producer).unwrap() : producer;
        Class<?> clazz = instance.getClass();
        // The merge function prevents an IllegalStateException on overloaded
        // single-argument methods sharing the same property name.
        Map<String, Method> setterMethods = Arrays.stream(clazz.getDeclaredMethods())
                .filter(m -> m.getParameterCount() == 1)
                .collect(Collectors.toMap(
                        m -> ConfigHelper.stripSet(m.getName()),
                        Function.identity(),
                        (first, second) -> first));
        config.detach().traverse().forEach(c -> {
            String key = c.key().name();
            String normalizedKey = ConfigHelper.kebabCase2CamelCase(key);
            Method m = setterMethods.get(normalizedKey);
            if (m == null) {
                LOGGER.log(System.Logger.Level.WARNING, "JMS producer property " + key + " can't be set for producer " + clazz.getName());
                return;
            }
            try {
                m.invoke(instance, c.as(m.getParameterTypes()[0]).get());
            } catch (Throwable e) {
                // Best effort: a property that cannot be applied must not break channel set-up.
                LOGGER.log(System.Logger.Level.WARNING, "Error when setting JMS producer property " + key + " on " + clazz.getName() + "." + m.getName(), e);
            }
        });
    }

    /**
     * Runs a single poll: waits up to {@code pollTimeout} milliseconds for a message,
     * wraps it and emits it downstream.
     *
     * @param emitter emitter receiving the wrapped message, or the failure
     * @param sessionEntry metadata of the session the consumer belongs to
     * @param consumer consumer to poll
     * @param nackHandler handler passed to the wrapped message
     * @param pollTimeout receive timeout in milliseconds
     * @return the emitted message, or empty when nothing arrived or the poll failed
     */
    private Optional<JmsMessage<?>> produce(BufferedEmittingPublisher<org.eclipse.microprofile.reactive.messaging.Message<?>> emitter, SessionMetadata sessionEntry, MessageConsumer consumer, JmsNackHandler nackHandler, Long pollTimeout) {
        Optional<JmsMessage<?>> emitted = Optional.empty();
        try {
            Message received = consumer.receive(pollTimeout);
            if (received != null) {
                LOGGER.log(System.Logger.Level.DEBUG, () -> "Received message: " + received);
                JmsMessage<?> wrapped = this.createMessage(nackHandler, received, this.executor, sessionEntry);
                emitter.emit(wrapped);
                emitted = Optional.of(wrapped);
            }
        } catch (Throwable failure) {
            // Any failure is signalled downstream; the poll itself reports "nothing emitted".
            emitter.fail(failure);
        }
        return emitted;
    }

    /**
     * Sends one outgoing message on the connector's executor. The message mapper is
     * resolved from the first message seen and cached in {@code mapper}.
     *
     * @param m message to send
     * @param session session used to create the JMS message
     * @param mapper holder of the lazily resolved mapper
     * @param producer producer used for sending
     * @param config channel configuration
     * @return stage completed with {@code m} once {@code consumeAsync} has returned
     */
    CompletionStage<?> consume(org.eclipse.microprofile.reactive.messaging.Message<?> m, Session session, AtomicReference<MessageMapper> mapper, MessageProducer producer, Config config) {
        boolean mapperMissing = mapper.get() == null;
        if (mapperMissing) {
            mapper.set(MessageMappers.getJmsMessageMapper(m));
        }
        CompletableFuture<CompletionStage<?>> sendTask = CompletableFuture.supplyAsync(
                () -> this.consumeAsync(m, session, mapper, producer, config),
                this.executor);
        return sendTask.thenApply(ignored -> m);
    }

    /**
     * Converts the message to a JMS message, sends it and acknowledges it. An
     * {@link OutgoingJmsMessage} converts itself; any other message goes through the mapper.
     *
     * @param m message to send
     * @param session session used to create the JMS message
     * @param mapper holder of the mapper to use
     * @param producer producer used for sending
     * @param config channel configuration, passed to the error handler
     * @return the acknowledgement stage of {@code m}; an already completed stage when the
     *         error handler returns normally after a {@link JMSException}
     */
    protected CompletionStage<?> consumeAsync(org.eclipse.microprofile.reactive.messaging.Message<?> m, Session session, AtomicReference<MessageMapper> mapper, MessageProducer producer, Config config) {
        try {
            MessageMapper messageMapper = mapper.get();
            Message jmsMessage;
            if (m instanceof OutgoingJmsMessage) {
                jmsMessage = ((OutgoingJmsMessage) m).toJmsMessage(session, messageMapper);
            } else {
                jmsMessage = messageMapper.apply(session, m);
            }
            producer.send(jmsMessage);
            return m.ack();
        } catch (JMSException sendFailure) {
            this.sendingErrorHandler(config).accept(m, sendFailure);
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Supplies the handler invoked when sending a message fails. This implementation
     * nacks the message with the failure and then throws a {@link MessagingException}
     * wrapping it.
     *
     * @param config channel configuration, not used by this implementation
     * @return handler accepting the failed message and the send failure
     */
    protected BiConsumer<org.eclipse.microprofile.reactive.messaging.Message<?>, JMSException> sendingErrorHandler(Config config) {
        return (failedMessage, sendFailure) -> {
            failedMessage.nack(sendFailure);
            throw new MessagingException("Error during sending JMS message.", sendFailure);
        };
    }

    /**
     * Returns the session for a channel. Channels configured with the same
     * {@code session-group-id} share one already registered session; otherwise a new
     * connection and session are created and registered (under the group id, or under a
     * random UUID when no group id is configured).
     *
     * <p>All configuration values are read before the connection is opened, and the
     * connection is closed again if setting the client id or creating the session
     * fails, so a failed set-up does not leak a connection.
     *
     * @param config channel configuration
     * @param factory connection factory used to open a new connection
     * @return metadata of the shared or newly created session
     * @throws JMSException when the connection or session cannot be created
     */
    protected SessionMetadata prepareSession(Config config, jakarta.jms.ConnectionFactory factory) throws JMSException {
        Optional<String> sessionGroupId = config.get(SESSION_GROUP_ID_ATTRIBUTE).asString().asOptional();
        if (sessionGroupId.isPresent()) {
            SessionMetadata shared = this.sessionRegister.get(sessionGroupId.get());
            if (shared != null) {
                return shared;
            }
        }
        Optional<String> user = config.get(USERNAME_ATTRIBUTE).asString().asOptional();
        Optional<String> password = config.get(PASSWORD_ATTRIBUTE).asString().asOptional();
        Optional<String> clientId = config.get(CLIENT_ID_ATTRIBUTE).asString().asOptional();
        // Parsed up front: a malformed value must fail before any connection is opened.
        boolean transacted = config.get(TRANSACTED_ATTRIBUTE).asBoolean().orElse(TRANSACTED_DEFAULT);
        int acknowledgeMode = config.get(ACK_MODE_ATTRIBUTE)
                .asString()
                .map(AcknowledgeMode::parse)
                .orElse(ACK_MODE_DEFAULT)
                .getAckMode();

        Connection connection = user.isPresent() && password.isPresent()
                ? factory.createConnection(user.get(), password.get())
                : factory.createConnection();
        try {
            if (clientId.isPresent()) {
                connection.setClientID(clientId.get());
            }
            Session session = connection.createSession(transacted, acknowledgeMode);
            SessionMetadata sharedSessionEntry = new SessionMetadata(session, connection, factory);
            this.sessionRegister.put(sessionGroupId.orElseGet(() -> UUID.randomUUID().toString()), sharedSessionEntry);
            return sharedSessionEntry;
        } catch (JMSException | RuntimeException e) {
            // The connection is not registered yet, so stop() would never close it.
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Resolves the destination of a channel. A destination found through JNDI wins;
     * otherwise a queue or topic named by the {@code destination} attribute is created
     * on the session, depending on the {@code type} attribute (default {@code queue},
     * compared case-insensitively).
     *
     * @param session session used to create the queue or topic
     * @param ctx connection context providing configuration and JNDI lookup
     * @return resolved destination
     * @throws MessagingException when the destination is not configured, the type is
     *         unknown, or the JMS provider fails to create the destination
     */
    protected Destination createDestination(Session session, ConnectionContext ctx) {
        Config config = ctx.config();
        if (ctx.isJndi()) {
            Optional<? extends Destination> jndiDestination = ctx.lookupDestination();
            if (jndiDestination.isPresent()) {
                return jndiDestination.get();
            }
        }
        // Locale.ROOT keeps the comparison independent of the default locale
        // (e.g. "TOPIC" lower-cases to "top\u0131c" under a Turkish locale).
        String type = config.get(TYPE_ATTRIBUTE)
                .asString()
                .map(value -> value.toLowerCase(java.util.Locale.ROOT))
                .orElse(TYPE_PROP_DEFAULT);
        // "unknown" fallback: a missing channel name must not hide the real problem.
        String destination = config.get(DESTINATION_ATTRIBUTE)
                .asString()
                .orElseThrow(() -> new MessagingException("Destination for channel "
                        + config.get("channel-name").asString().orElse("unknown")
                        + " not specified!"));
        try {
            if (TYPE_PROP_DEFAULT.equals(type)) {
                return session.createQueue(destination);
            }
            if ("topic".equals(type)) {
                return session.createTopic(destination);
            }
            throw new MessagingException("Unknown type");
        } catch (JMSException jmsException) {
            throw new MessagingException("Error when creating destination.", jmsException);
        }
    }

    /**
     * Creates the consumer of an incoming channel. With {@code durable} enabled a
     * durable subscriber is created, which requires the destination to be a topic;
     * otherwise a plain consumer is created. The optional message selector is applied
     * in both cases.
     *
     * @param config channel configuration
     * @param destination destination to consume from
     * @param sessionEntry metadata of the session creating the consumer
     * @return created consumer
     * @throws JMSException when the JMS provider fails to create the consumer
     * @throws MessagingException when a durable consumer is requested for a non-topic destination
     */
    protected MessageConsumer createConsumer(Config config, Destination destination, SessionMetadata sessionEntry) throws JMSException {
        String messageSelector = config.get(MESSAGE_SELECTOR_ATTRIBUTE).asString().orElse(null);
        String subscriberName = config.get(SUBSCRIBER_NAME_ATTRIBUTE).asString().orElse(null);
        boolean durable = config.get(DURABLE_ATTRIBUTE).asBoolean().orElse(false);

        if (!durable) {
            return sessionEntry.session().createConsumer(destination, messageSelector);
        }
        if (!(destination instanceof Topic)) {
            throw new MessagingException("Can't create durable consumer. Only topic can be durable!");
        }
        Session session = sessionEntry.session();
        Topic topic = (Topic) destination;
        boolean nonLocal = config.get(NON_LOCAL_ATTRIBUTE).asBoolean().orElse(false);
        return session.createDurableSubscriber(topic, subscriberName, messageSelector, nonLocal);
    }

    /**
     * Creates a producer for the destination and applies the optional {@code producer}
     * configuration to it.
     *
     * @param destination destination to send to
     * @param ctx connection context providing the channel configuration
     * @param sessionEntry metadata of the session creating the producer
     * @return configured producer
     * @throws JMSException when the JMS provider fails to create the producer
     */
    protected MessageProducer createProducer(Destination destination, ConnectionContext ctx, SessionMetadata sessionEntry) throws JMSException {
        Session session = sessionEntry.session();
        MessageProducer created = session.createProducer(destination);
        configureProducer(created, ctx);
        return created;
    }

    /**
     * Builder for creating a {@link JmsConnector} without injection. Executor and
     * scheduler that are not supplied explicitly are created from the configuration
     * when {@link #build()} is called.
     */
    public static class JmsConnectorBuilder implements Builder<JmsConnectorBuilder, JmsConnector> {
        /** Connection factories registered by name; handed to the connector as-is. */
        private final Map<String, jakarta.jms.ConnectionFactory> connectionFactoryMap = new HashMap<String, jakarta.jms.ConnectionFactory>();
        private ScheduledExecutorService scheduler;
        private ExecutorService executor;
        private Config config;

        /**
         * Registers a connection factory under the given name.
         *
         * @param name name the factory can be referenced by
         * @param connectionFactory factory to register
         * @return this builder
         */
        public JmsConnectorBuilder connectionFactory(String name, jakarta.jms.ConnectionFactory connectionFactory) {
            connectionFactoryMap.put(name, connectionFactory);
            return this;
        }

        /**
         * Sets the configuration used for creating the default thread pools.
         *
         * @param config configuration to use
         * @return this builder
         */
        public JmsConnectorBuilder config(Config config) {
            this.config = config;
            return this;
        }

        /**
         * Sets the executor used by the connector.
         *
         * @param executor executor to use
         * @return this builder
         */
        public JmsConnectorBuilder executor(ExecutorService executor) {
            this.executor = executor;
            return this;
        }

        /**
         * Sets the scheduler used by the connector.
         *
         * @param scheduler scheduler to use
         * @return this builder
         */
        public JmsConnectorBuilder scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        /**
         * Sets the executor obtained from the given supplier.
         *
         * @param executorSupplier supplier queried immediately for the executor
         * @return this builder
         */
        public JmsConnectorBuilder executor(ThreadPoolSupplier executorSupplier) {
            ExecutorService supplied = executorSupplier.get();
            this.executor = supplied;
            return this;
        }

        /**
         * Sets the scheduler obtained from the given supplier.
         *
         * @param schedulerPoolSupplier supplier queried immediately for the scheduler
         * @return this builder
         */
        public JmsConnectorBuilder scheduler(ScheduledThreadPoolSupplier schedulerPoolSupplier) {
            ScheduledExecutorService supplied = schedulerPoolSupplier.get();
            this.scheduler = supplied;
            return this;
        }

        /**
         * Builds the connector. Missing configuration defaults to an empty one; a missing
         * executor and scheduler are created (in that order) from the configuration and
         * remembered by this builder.
         *
         * @return new connector
         */
        public JmsConnector build() {
            if (config == null) {
                config = Config.empty();
            }
            if (executor == null) {
                ThreadPoolConfig.Builder executorBuilder =
                        (ThreadPoolConfig.Builder) ThreadPoolSupplier.builder()
                                .threadNamePrefix(JmsConnector.EXECUTOR_THREAD_NAME_PREFIX);
                executorBuilder = (ThreadPoolConfig.Builder) executorBuilder
                        .config((io.helidon.common.config.Config) config);
                executor = executorBuilder.build().get();
            }
            if (scheduler == null) {
                ScheduledThreadPoolConfig.Builder schedulerBuilder =
                        (ScheduledThreadPoolConfig.Builder) ScheduledThreadPoolSupplier.builder()
                                .threadNamePrefix(JmsConnector.SCHEDULER_THREAD_NAME_PREFIX);
                schedulerBuilder = (ScheduledThreadPoolConfig.Builder) schedulerBuilder
                        .config((io.helidon.common.config.Config) config);
                scheduler = schedulerBuilder.build().get();
            }
            return new JmsConnector(connectionFactoryMap, scheduler, executor);
        }
    }
}

