/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.rabbitmq;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorCommonConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQMessageSender;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQAccept;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.tracing.TracingUtils;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.mutiny.rabbitmq.RabbitMQMessage;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQOptions;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.event.Reception;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

@ApplicationScoped
@Connector(value="smallrye-rabbitmq")
@ConnectorAttributes(value={@ConnectorAttribute(name="username", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The username used to authenticate to the broker", type="string", alias="rabbitmq-username"), @ConnectorAttribute(name="password", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The password used to authenticate to the broker", type="string", alias="rabbitmq-password"), @ConnectorAttribute(name="host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker hostname", type="string", alias="rabbitmq-host", defaultValue="localhost"), @ConnectorAttribute(name="port", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker port", type="int", alias="rabbitmq-port", defaultValue="5672"), @ConnectorAttribute(name="ssl", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether or not the connection should use SSL", type="boolean", alias="rabbitmq-ssl", defaultValue="false"), @ConnectorAttribute(name="trust-all", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether to skip trust certificate verification", type="boolean", alias="rabbitmq-trust-all", defaultValue="false"), @ConnectorAttribute(name="trust-store-path", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The path to a JKS trust store", type="string", alias="rabbitmq-trust-store-path"), @ConnectorAttribute(name="trust-store-password", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The password of the JKS trust store", type="string", alias="rabbitmq-trust-store-password"), @ConnectorAttribute(name="connection-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The TCP connection timeout (ms); 0 is interpreted as no timeout", type="int", defaultValue="60000"), @ConnectorAttribute(name="handshake-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP 0-9-1 protocol handshake timeout (ms)", type="int", defaultValue="10000"), @ConnectorAttribute(name="automatic-recovery-enabled", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether automatic connection recovery is enabled", type="boolean", defaultValue="false"), @ConnectorAttribute(name="automatic-recovery-on-initial-connection", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether automatic recovery on initial connections is enabled", type="boolean", defaultValue="true"), @ConnectorAttribute(name="reconnect-attempts", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The number of reconnection attempts", type="int", alias="rabbitmq-reconnect-attempts", defaultValue="100"), @ConnectorAttribute(name="reconnect-interval", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The interval (in seconds) between two reconnection attempts", type="int", alias="rabbitmq-reconnect-interval", defaultValue="10"), @ConnectorAttribute(name="network-recovery-interval", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="How long (ms) will automatic recovery wait before attempting to reconnect", type="int", defaultValue="5000"), @ConnectorAttribute(name="user", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP user name to use when connecting to the broker", type="string", defaultValue="guest"), @ConnectorAttribute(name="include-properties", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether to include properties when a broker message is passed on the event bus", type="boolean", defaultValue="false"), @ConnectorAttribute(name="requested-channel-max", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The initially requested maximum channel number", type="int", defaultValue="2047"), @ConnectorAttribute(name="requested-heartbeat", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The initially requested heartbeat interval (seconds), zero for none", type="int", defaultValue="60"), @ConnectorAttribute(name="use-nio", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether usage of NIO Sockets is enabled", type="boolean", defaultValue="false"), @ConnectorAttribute(name="virtual-host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The virtual host to use when connecting to the broker", type="string", defaultValue="/", alias="rabbitmq-virtual-host"), @ConnectorAttribute(name="exchange.name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The exchange that messages are published to or consumed from. If not set, the channel name is used", type="string"), @ConnectorAttribute(name="exchange.durable", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether the exchange is durable", type="boolean", defaultValue="true"), @ConnectorAttribute(name="exchange.auto-delete", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether the exchange should be deleted after use", type="boolean", defaultValue="false"), @ConnectorAttribute(name="exchange.type", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The exchange type: direct, fanout, headers or topic (default)", type="string", defaultValue="topic"), @ConnectorAttribute(name="exchange.declare", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether to declare the exchange; set to false if the exchange is expected to be set up independently", type="boolean", defaultValue="true"), @ConnectorAttribute(name="queue.name", direction=ConnectorAttribute.Direction.INCOMING, description="The queue from which messages are consumed.", type="string", mandatory=true), @ConnectorAttribute(name="queue.durable", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the queue is durable", type="boolean", defaultValue="true"), @ConnectorAttribute(name="queue.exclusive", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the queue is for exclusive use", type="boolean", defaultValue="false"), @ConnectorAttribute(name="queue.auto-delete", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the queue should be deleted after use", type="boolean", defaultValue="false"), @ConnectorAttribute(name="queue.declare", direction=ConnectorAttribute.Direction.INCOMING, description="Whether to declare the queue and binding; set to false if these are expected to be set up independently", type="boolean", defaultValue="true"), @ConnectorAttribute(name="queue.ttl", direction=ConnectorAttribute.Direction.INCOMING, description="If specified, the time (ms) for which a message can remain in the queue undelivered before it is dead", type="long"), @ConnectorAttribute(name="max-outgoing-internal-queue-size", direction=ConnectorAttribute.Direction.OUTGOING, description="The maximum size of the outgoing internal queue", type="int"), @ConnectorAttribute(name="max-incoming-internal-queue-size", direction=ConnectorAttribute.Direction.INCOMING, description="The maximum size of the incoming internal queue", type="int"), @ConnectorAttribute(name="auto-bind-dlq", direction=ConnectorAttribute.Direction.INCOMING, description="Whether to automatically declare the DLQ and bind it to the binder DLX", type="boolean", defaultValue="false"), @ConnectorAttribute(name="dead-letter-queue-name", direction=ConnectorAttribute.Direction.INCOMING, description="The name of the DLQ; if not supplied will default to the queue name with '.dlq' appended", type="string"), @ConnectorAttribute(name="dead-letter-exchange", direction=ConnectorAttribute.Direction.INCOMING, description="A DLX to assign to the queue. Relevant only if auto-bind-dlq is true", type="string", defaultValue="DLX"), @ConnectorAttribute(name="dead-letter-exchange-type", direction=ConnectorAttribute.Direction.INCOMING, description="The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true", type="string", defaultValue="direct"), @ConnectorAttribute(name="dead-letter-routing-key", direction=ConnectorAttribute.Direction.INCOMING, description="A dead letter routing key to assign to the queue; if not supplied will default to the queue name", type="string"), @ConnectorAttribute(name="dlx.declare", direction=ConnectorAttribute.Direction.INCOMING, description="Whether to declare the dead letter exchange binding. Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently", type="boolean", defaultValue="false"), @ConnectorAttribute(name="failure-strategy", direction=ConnectorAttribute.Direction.INCOMING, description="The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default)", type="string", defaultValue="reject"), @ConnectorAttribute(name="broadcast", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type="boolean", defaultValue="false"), @ConnectorAttribute(name="auto-acknowledgement", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type="boolean", defaultValue="false"), @ConnectorAttribute(name="keep-most-recent", direction=ConnectorAttribute.Direction.INCOMING, description="Whether to discard old messages instead of recent ones", type="boolean", defaultValue="false"), @ConnectorAttribute(name="routing-keys", direction=ConnectorAttribute.Direction.INCOMING, description="A comma-separated list of routing keys to bind the queue to the exchange", type="string", defaultValue="#"), @ConnectorAttribute(name="max-inflight-messages", direction=ConnectorAttribute.Direction.OUTGOING, description="The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number", type="long", defaultValue="1024"), @ConnectorAttribute(name="default-routing-key", direction=ConnectorAttribute.Direction.OUTGOING, description="The default routing key to use when sending messages to the exchange", type="string", defaultValue=""), @ConnectorAttribute(name="default-ttl", direction=ConnectorAttribute.Direction.OUTGOING, description="If specified, the time (ms) sent messages can remain in queues undelivered before they are dead", type="long"), @ConnectorAttribute(name="tracing.enabled", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether tracing is enabled (default) or disabled", type="boolean", defaultValue="true"), @ConnectorAttribute(name="tracing.attribute-headers", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true", type="string", defaultValue="")})
public class RabbitMQConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory,
HealthReporter {
    static final String CONNECTOR_NAME = "smallrye-rabbitmq";
    private final List<RabbitMQClient> clients = new CopyOnWriteArrayList<RabbitMQClient>();
    private final Map<String, ChannelStatus> incomingChannelStatus = new ConcurrentHashMap<String, ChannelStatus>();
    private final Map<String, ChannelStatus> outgoingChannelStatus = new ConcurrentHashMap<String, ChannelStatus>();
    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();
    @Inject
    ExecutionHolder executionHolder;

    RabbitMQConnector() {
    }

    private static String getExchangeName(RabbitMQConnectorCommonConfiguration config) {
        return config.getExchangeName().orElse(config.getChannel());
    }

    @PostConstruct
    void init() {
        TracingUtils.initialise();
    }

    private Multi<? extends Message<?>> getStreamOfMessages(RabbitMQConsumer receiver, ConnectionHolder holder, RabbitMQConnectorIncomingConfiguration ic, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) {
        String queueName = ic.getQueueName();
        boolean isTracingEnabled = ic.getTracingEnabled();
        List attributeHeaders = Arrays.stream(ic.getTracingAttributeHeaders().split(",")).map(String::trim).collect(Collectors.toList());
        RabbitMQLogging.log.receiverListeningAddress(queueName);
        BroadcastProcessor processor = BroadcastProcessor.create();
        receiver.exceptionHandler(t -> {
            RabbitMQLogging.log.receiverError((Throwable)t);
            processor.onError(t);
        });
        holder.onFailure(arg_0 -> ((BroadcastProcessor)processor).onError(arg_0));
        return Multi.createFrom().deferred(() -> {
            Multi stream = receiver.toMulti().map(m -> new IncomingRabbitMQMessage((RabbitMQMessage)m, holder, isTracingEnabled, onNack, onAck)).map(m -> isTracingEnabled ? TracingUtils.addIncomingTrace(m, queueName, attributeHeaders) : m);
            return Multi.createBy().merging().streams(new Publisher[]{stream, processor});
        });
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        RabbitMQConnectorIncomingConfiguration ic = new RabbitMQConnectorIncomingConfiguration(config);
        this.incomingChannelStatus.put(ic.getChannel(), ChannelStatus.INITIALISING);
        RabbitMQClient client = this.createClient(new RabbitMQConnectorCommonConfiguration(config));
        ConnectionHolder holder = new ConnectionHolder(client, ic, this.getVertx());
        RabbitMQFailureHandler onNack = this.createFailureHandler(ic);
        RabbitMQAckHandler onAck = this.createAckHandler(ic);
        Uni uniQueue = holder.getOrEstablishConnection().onItem().call(connection -> this.establishQueue((RabbitMQClient)connection, ic)).onItem().call(connection -> this.establishDLQ((RabbitMQClient)connection, ic)).onItem().invoke(connection -> this.incomingChannelStatus.put(ic.getChannel(), ChannelStatus.CONNECTED));
        Integer interval = ic.getReconnectInterval();
        Integer attempts = ic.getReconnectAttempts();
        Multi multi = uniQueue.onItem().transformToUni(connection -> client.basicConsumer(ic.getQueueName(), new QueueOptions().setAutoAck(ic.getAutoAcknowledgement().booleanValue()).setMaxInternalQueueSize(ic.getMaxIncomingInternalQueueSize().orElse(Integer.MAX_VALUE).intValue()).setKeepMostRecent(ic.getKeepMostRecent().booleanValue()))).onItem().transformToMulti(consumer -> this.getStreamOfMessages((RabbitMQConsumer)consumer, holder, ic, onNack, onAck)).plug(m -> {
            if (attempts > 0) {
                return m.onFailure().invoke(RabbitMQLogging.log::retrieveMessagesRetrying).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(interval.intValue())).atMost((long)attempts.intValue()).onFailure().invoke(t -> {
                    this.incomingChannelStatus.put(ic.getChannel(), ChannelStatus.NOT_CONNECTED);
                    RabbitMQLogging.log.retrieveMessagesNoMoreRetrying((Throwable)t);
                });
            }
            return m;
        });
        if (Boolean.TRUE.equals(ic.getBroadcast())) {
            multi = multi.broadcast().toAllSubscribers();
        }
        return ReactiveStreams.fromPublisher((Publisher)multi);
    }

    private Uni<?> establishDLQ(RabbitMQClient client, RabbitMQConnectorIncomingConfiguration ic) {
        String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", ic.getQueueName()));
        String deadLetterExchangeName = ic.getDeadLetterExchange();
        String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(ic.getQueueName());
        Uni dlxFlow = Uni.createFrom().item(() -> ic.getAutoBindDlq() != false && ic.getDlxDeclare() != false ? null : deadLetterExchangeName).onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(), true, false).onItem().invoke(() -> RabbitMQLogging.log.dlxEstablished(deadLetterExchangeName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishDlx(deadLetterExchangeName, (Throwable)ex)).onItem().transform(v -> deadLetterExchangeName));
        return dlxFlow.onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName).onItem().ifNull().switchTo(() -> client.queueDeclare(deadLetterQueueName, true, false, false).onItem().invoke(() -> RabbitMQLogging.log.queueEstablished(deadLetterQueueName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishQueue(deadLetterQueueName, (Throwable)ex)).onItem().call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey)).onItem().invoke(() -> RabbitMQLogging.log.bindingEstablished(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, (Throwable)ex)).onItem().transform(v -> deadLetterQueueName));
    }

    private RabbitMQClient createClient(RabbitMQConnectorCommonConfiguration config) {
        RabbitMQOptions rabbitMQClientConfig = new RabbitMQOptions().setHost(config.getHost()).setPort(config.getPort().intValue()).setSsl(config.getSsl().booleanValue()).setTrustAll(config.getTrustAll().booleanValue()).setAutomaticRecoveryEnabled(config.getAutomaticRecoveryEnabled().booleanValue()).setAutomaticRecoveryOnInitialConnection(config.getAutomaticRecoveryOnInitialConnection().booleanValue()).setReconnectAttempts(config.getReconnectAttempts().intValue()).setReconnectInterval((long)config.getReconnectInterval().intValue()).setUser(config.getUser()).setConnectionTimeout(config.getConnectionTimeout().intValue()).setHandshakeTimeout(config.getHandshakeTimeout().intValue()).setIncludeProperties(config.getIncludeProperties().booleanValue()).setNetworkRecoveryInterval((long)config.getNetworkRecoveryInterval().intValue()).setRequestedChannelMax(config.getRequestedChannelMax().intValue()).setRequestedHeartbeat(config.getRequestedHeartbeat().intValue()).setUseNio(config.getUseNio().booleanValue()).setVirtualHost(config.getVirtualHost());
        Optional<String> trustStorePath = config.getTrustStorePath();
        if (trustStorePath.isPresent()) {
            JksOptions jks = new JksOptions();
            jks.setPath(trustStorePath.get());
            config.getTrustStorePassword().ifPresent(arg_0 -> ((JksOptions)jks).setPassword(arg_0));
            rabbitMQClientConfig.setTrustStoreOptions(jks);
        }
        config.getUsername().ifPresent(arg_0 -> ((RabbitMQOptions)rabbitMQClientConfig).setUser(arg_0));
        config.getPassword().ifPresent(arg_0 -> ((RabbitMQOptions)rabbitMQClientConfig).setPassword(arg_0));
        RabbitMQClient client = RabbitMQClient.create((Vertx)this.getVertx(), (RabbitMQOptions)rabbitMQClientConfig);
        this.addClient(client);
        return client;
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        RabbitMQConnectorOutgoingConfiguration oc = new RabbitMQConnectorOutgoingConfiguration(config);
        this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.INITIALISING);
        RabbitMQClient client = this.createClient(new RabbitMQConnectorCommonConfiguration(config));
        AtomicReference sender = new AtomicReference();
        ConnectionHolder holder = new ConnectionHolder(client, oc, this.getVertx());
        Uni getSender = Uni.createFrom().item((Object)((RabbitMQPublisher)sender.get())).onItem().ifNull().switchTo(() -> {
            RabbitMQPublisher current = (RabbitMQPublisher)sender.get();
            if (current != null && client.isConnected()) {
                return Uni.createFrom().item((Object)current);
            }
            return holder.getOrEstablishConnection().onItem().call(connection -> this.establishExchange((RabbitMQClient)connection, oc)).onItem().transformToUni(connection -> Uni.createFrom().item((Object)RabbitMQPublisher.create((Vertx)this.getVertx(), (RabbitMQClient)connection, (RabbitMQPublisherOptions)new RabbitMQPublisherOptions().setReconnectAttempts(oc.getReconnectAttempts()).setReconnectInterval((long)oc.getReconnectInterval().intValue()).setMaxInternalQueueSize(oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE).intValue())))).onItem().call(RabbitMQPublisher::start).invoke(s -> {
                sender.set(s);
                this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.CONNECTED);
            });
        }).onFailure().invoke(t -> {
            sender.set(null);
            this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED);
        }).onCancellation().invoke(() -> {
            sender.set(null);
            this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED);
        });
        RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, (Uni<RabbitMQPublisher>)getSender);
        this.subscriptions.add(processor);
        return ReactiveStreams.builder().via((Processor)processor).onError(t -> {
            RabbitMQLogging.log.error(oc.getChannel(), (Throwable)t);
            this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED);
        }).ignore();
    }

    public HealthReport getReadiness() {
        return this.getHealth(false);
    }

    public HealthReport getLiveness() {
        return this.getHealth(false);
    }

    public HealthReport getHealth(boolean strict) {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        this.incomingChannelStatus.forEach((channel, status) -> builder.add(channel, status == ChannelStatus.CONNECTED));
        this.outgoingChannelStatus.forEach((channel, status) -> builder.add(channel, strict ? status == ChannelStatus.CONNECTED : status != ChannelStatus.NOT_CONNECTED));
        return builder.build();
    }

    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=50) @BeforeDestroyed(value=ApplicationScoped.class) Object ignored) {
        this.subscriptions.forEach(Subscription::cancel);
        this.clients.forEach(RabbitMQClient::stopAndAwait);
        this.clients.clear();
    }

    private Vertx getVertx() {
        return this.executionHolder.vertx();
    }

    private void addClient(RabbitMQClient client) {
        this.clients.add(client);
    }

    private Uni<String> establishExchange(RabbitMQClient client, RabbitMQConnectorCommonConfiguration config) {
        String exchangeName = RabbitMQConnector.getExchangeName(config);
        return Uni.createFrom().item(() -> Boolean.TRUE.equals(config.getExchangeDeclare()) ? null : exchangeName).onItem().ifNull().switchTo(() -> client.exchangeDeclare(exchangeName, config.getExchangeType(), config.getExchangeDurable().booleanValue(), config.getExchangeAutoDelete().booleanValue()).onItem().invoke(() -> RabbitMQLogging.log.exchangeEstablished(exchangeName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishExchange(exchangeName, (Throwable)ex)).onItem().transform(v -> exchangeName));
    }

    private Uni<String> establishQueue(RabbitMQClient client, RabbitMQConnectorIncomingConfiguration ic) {
        String queueName = ic.getQueueName();
        JsonObject queueArgs = new JsonObject();
        queueArgs.put("x-dead-letter-exchange", (Object)ic.getDeadLetterExchange());
        queueArgs.put("x-dead-letter-routing-key", (Object)ic.getDeadLetterRoutingKey().orElse(queueName));
        ic.getQueueTtl().ifPresent(queueTtl -> {
            if (queueTtl < 0L) {
                throw RabbitMQExceptions.ex.illegalArgumentInvalidQueueTtl();
            }
            queueArgs.put("x-message-ttl", queueTtl);
        });
        return this.establishExchange(client, ic).onItem().transform(v -> Boolean.TRUE.equals(ic.getQueueDeclare()) ? null : queueName).onItem().ifNull().switchTo(() -> client.queueDeclare(queueName, ic.getQueueDurable().booleanValue(), ic.getQueueExclusive().booleanValue(), ic.getQueueAutoDelete().booleanValue(), queueArgs).onItem().invoke(() -> RabbitMQLogging.log.queueEstablished(queueName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishQueue(queueName, (Throwable)ex)).onItem().transformToMulti(v -> this.establishBindings(client, ic)).onCompletion().invoke(() -> Multi.createFrom().item((Object)"ignore")).onItem().ignoreAsUni().onItem().transform(v -> queueName));
    }

    private Multi<String> establishBindings(RabbitMQClient client, RabbitMQConnectorIncomingConfiguration ic) {
        String exchangeName = RabbitMQConnector.getExchangeName(ic);
        String queueName = ic.getQueueName();
        List routingKeys = Arrays.stream(ic.getRoutingKeys().split(",")).map(String::trim).collect(Collectors.toList());
        return Multi.createFrom().iterable(routingKeys).onItem().call(routingKey -> client.queueBind(queueName, exchangeName, routingKey)).onItem().invoke(routingKey -> RabbitMQLogging.log.bindingEstablished(queueName, exchangeName, (String)routingKey)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishBinding(queueName, exchangeName, (Throwable)ex));
    }

    private RabbitMQFailureHandler createFailureHandler(RabbitMQConnectorIncomingConfiguration config) {
        String strategy = config.getFailureStrategy();
        RabbitMQFailureHandler.Strategy actualStrategy = RabbitMQFailureHandler.Strategy.from(strategy);
        switch (actualStrategy) {
            case FAIL: {
                return new RabbitMQFailStop(this, config.getChannel());
            }
            case ACCEPT: {
                return new RabbitMQAccept(config.getChannel());
            }
            case REJECT: {
                return new RabbitMQReject(config.getChannel());
            }
        }
        throw RabbitMQExceptions.ex.illegalArgumentInvalidFailureStrategy(strategy);
    }

    private RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) {
        return Boolean.TRUE.equals(ic.getAutoAcknowledgement()) ? new RabbitMQAutoAck(ic.getChannel()) : new RabbitMQAck(ic.getChannel());
    }

    public void reportIncomingFailure(String channel, Throwable reason) {
        RabbitMQLogging.log.failureReported(channel, reason);
        this.incomingChannelStatus.put(channel, ChannelStatus.NOT_CONNECTED);
        this.terminate(null);
    }

    private static enum ChannelStatus {
        CONNECTED,
        NOT_CONNECTED,
        INITIALISING;

    }
}

