/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.rabbitmq;

import com.rabbitmq.client.impl.CredentialsProvider;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQClientHelper;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorCommonConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQMessageSender;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQAccept;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.mutiny.rabbitmq.RabbitMQMessage;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQOptions;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(value="smallrye-rabbitmq")
@ConnectorAttributes(value={@ConnectorAttribute(name="username", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The username used to authenticate to the broker", type="string", alias="rabbitmq-username"), @ConnectorAttribute(name="password", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The password used to authenticate to the broker", type="string", alias="rabbitmq-password"), @ConnectorAttribute(name="host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker hostname", type="string", alias="rabbitmq-host", defaultValue="localhost"), @ConnectorAttribute(name="port", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker port", type="int", alias="rabbitmq-port", defaultValue="5672"), @ConnectorAttribute(name="ssl", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether or not the connection should use SSL", type="boolean", alias="rabbitmq-ssl", defaultValue="false"), @ConnectorAttribute(name="trust-all", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether to skip trust certificate verification", type="boolean", alias="rabbitmq-trust-all", defaultValue="false"), @ConnectorAttribute(name="trust-store-path", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The path to a JKS trust store", type="string", alias="rabbitmq-trust-store-path"), @ConnectorAttribute(name="trust-store-password", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The password of the JKS trust store", type="string", alias="rabbitmq-trust-store-password"), @ConnectorAttribute(name="connection-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The TCP connection timeout (ms); 0 is interpreted as no timeout", type="int", defaultValue="60000"), @ConnectorAttribute(name="handshake-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP 0-9-1 protocol handshake timeout (ms)", type="int", defaultValue="10000"), @ConnectorAttribute(name="automatic-recovery-enabled", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether automatic connection recovery is enabled", type="boolean", defaultValue="false"), @ConnectorAttribute(name="automatic-recovery-on-initial-connection", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether automatic recovery on initial connections is enabled", type="boolean", defaultValue="true"), @ConnectorAttribute(name="reconnect-attempts", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The number of reconnection attempts", type="int", alias="rabbitmq-reconnect-attempts", defaultValue="100"), @ConnectorAttribute(name="reconnect-interval", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The interval (in seconds) between two reconnection attempts", type="int", alias="rabbitmq-reconnect-interval", defaultValue="10"), @ConnectorAttribute(name="network-recovery-interval", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="How long (ms) will automatic recovery wait before attempting to reconnect", type="int", defaultValue="5000"), @ConnectorAttribute(name="user", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The user name to use when connecting to the broker", type="string", defaultValue="guest"), @ConnectorAttribute(name="include-properties", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether to include properties when a broker message is passed on the event bus", type="boolean", defaultValue="false"), @ConnectorAttribute(name="requested-channel-max", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The initially requested maximum channel number", type="int", defaultValue="2047"), @ConnectorAttribute(name="requested-heartbeat", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The initially requested heartbeat interval (seconds), zero for none", type="int", defaultValue="60"), @ConnectorAttribute(name="use-nio", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether usage of NIO Sockets is enabled", type="boolean", defaultValue="false"), @ConnectorAttribute(name="virtual-host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The virtual host to use when connecting to the broker", type="string", defaultValue="/", alias="rabbitmq-virtual-host"), @ConnectorAttribute(name="client-options-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the RabbitMQ Client Option bean used to customize the RabbitMQ client configuration", type="string", alias="rabbitmq-client-options-name"), @ConnectorAttribute(name="credentials-provider-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client", type="string", alias="rabbitmq-credentials-provider-name"), @ConnectorAttribute(name="exchange.name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to \"\", the default exchange is used.", type="string"), @ConnectorAttribute(name="exchange.durable", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether the exchange is durable", type="boolean", defaultValue="true"), @ConnectorAttribute(name="exchange.auto-delete", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether the exchange should be deleted after use", type="boolean", defaultValue="false"), @ConnectorAttribute(name="exchange.type", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The exchange type: direct, fanout, headers or topic (default)", type="string", defaultValue="topic"), @ConnectorAttribute(name="exchange.declare", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether to declare the exchange; set to false if the exchange is expected to be set up independently", type="boolean", defaultValue="true"), @ConnectorAttribute(name="queue.name", direction=ConnectorAttribute.Direction.INCOMING, description="The queue from which messages are consumed.", type="string", mandatory=true), @ConnectorAttribute(name="queue.durable", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the queue is durable", type="boolean", defaultValue="true"), @ConnectorAttribute(name="queue.exclusive", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the queue is for exclusive use", type="boolean", defaultValue="false"), @ConnectorAttribute(name="queue.auto-delete", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the queue should be deleted after use", type="boolean", defaultValue="false"), @ConnectorAttribute(name="queue.declare", direction=ConnectorAttribute.Direction.INCOMING, description="Whether to declare the queue and binding; set to false if these are expected to be set up independently", type="boolean", defaultValue="true"), @ConnectorAttribute(name="queue.ttl", direction=ConnectorAttribute.Direction.INCOMING, description="If specified, the time (ms) for which a message can remain in the queue undelivered before it is dead", type="long"), @ConnectorAttribute(name="queue.single-active-consumer", direction=ConnectorAttribute.Direction.INCOMING, description="If set to true, only one consumer can actively consume messages", type="boolean"), @ConnectorAttribute(name="queue.x-queue-type", direction=ConnectorAttribute.Direction.INCOMING, description="If automatically declare queue, we can choose different types of queue [quorum, classic, stream]", type="string"), @ConnectorAttribute(name="queue.x-queue-mode", direction=ConnectorAttribute.Direction.INCOMING, description="If automatically declare queue, we can choose different modes of queue [lazy, default]", type="string"), @ConnectorAttribute(name="max-outgoing-internal-queue-size", direction=ConnectorAttribute.Direction.OUTGOING, description="The maximum size of the outgoing internal queue", type="int"), @ConnectorAttribute(name="max-incoming-internal-queue-size", direction=ConnectorAttribute.Direction.INCOMING, description="The maximum size of the incoming internal queue", type="int", defaultValue="500000"), @ConnectorAttribute(name="connection-count", direction=ConnectorAttribute.Direction.INCOMING, description="The number of RabbitMQ connections to create for consuming from this queue. This might be necessary to consume from a sharded queue with a single client.", type="int", defaultValue="1"), @ConnectorAttribute(name="queue.x-max-priority", direction=ConnectorAttribute.Direction.INCOMING, description="Define priority level queue consumer", type="int"), @ConnectorAttribute(name="auto-bind-dlq", direction=ConnectorAttribute.Direction.INCOMING, description="Whether to automatically declare the DLQ and bind it to the binder DLX", type="boolean", defaultValue="false"), @ConnectorAttribute(name="dead-letter-queue-name", direction=ConnectorAttribute.Direction.INCOMING, description="The name of the DLQ; if not supplied will default to the queue name with '.dlq' appended", type="string"), @ConnectorAttribute(name="dead-letter-exchange", direction=ConnectorAttribute.Direction.INCOMING, description="A DLX to assign to the queue. Relevant only if auto-bind-dlq is true", type="string", defaultValue="DLX"), @ConnectorAttribute(name="dead-letter-exchange-type", direction=ConnectorAttribute.Direction.INCOMING, description="The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true", type="string", defaultValue="direct"), @ConnectorAttribute(name="dead-letter-routing-key", direction=ConnectorAttribute.Direction.INCOMING, description="A dead letter routing key to assign to the queue; if not supplied will default to the queue name", type="string"), @ConnectorAttribute(name="dlx.declare", direction=ConnectorAttribute.Direction.INCOMING, description="Whether to declare the dead letter exchange binding. Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently", type="boolean", defaultValue="false"), @ConnectorAttribute(name="dead-letter-queue-type", direction=ConnectorAttribute.Direction.INCOMING, description="If automatically declare DLQ, we can choose different types of DLQ [quorum, classic, stream]", type="string"), @ConnectorAttribute(name="dead-letter-queue-mode", direction=ConnectorAttribute.Direction.INCOMING, description="If automatically declare DLQ, we can choose different modes of DLQ [lazy, default]", type="string"), @ConnectorAttribute(name="failure-strategy", direction=ConnectorAttribute.Direction.INCOMING, description="The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default)", type="string", defaultValue="reject"), @ConnectorAttribute(name="broadcast", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type="boolean", defaultValue="false"), @ConnectorAttribute(name="auto-acknowledgement", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type="boolean", defaultValue="false"), @ConnectorAttribute(name="keep-most-recent", direction=ConnectorAttribute.Direction.INCOMING, description="Whether to discard old messages instead of recent ones", type="boolean", defaultValue="false"), @ConnectorAttribute(name="routing-keys", direction=ConnectorAttribute.Direction.INCOMING, description="A comma-separated list of routing keys to bind the queue to the exchange. Relevant only if 'exchange.type' is topic or direct", type="string", defaultValue="#"), @ConnectorAttribute(name="arguments", direction=ConnectorAttribute.Direction.INCOMING, description="A comma-separated list of arguments [key1:value1,key2:value2,...] to bind the queue to the exchange. Relevant only if 'exchange.type' is headers", type="string"), @ConnectorAttribute(name="content-type-override", direction=ConnectorAttribute.Direction.INCOMING, description="Override the content_type attribute of the incoming message, should be a valid MINE type", type="string"), @ConnectorAttribute(name="max-outstanding-messages", direction=ConnectorAttribute.Direction.INCOMING, description="The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number", type="int"), @ConnectorAttribute(name="max-inflight-messages", direction=ConnectorAttribute.Direction.OUTGOING, description="The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number", type="long", defaultValue="1024"), @ConnectorAttribute(name="default-routing-key", direction=ConnectorAttribute.Direction.OUTGOING, description="The default routing key to use when sending messages to the exchange", type="string", defaultValue=""), @ConnectorAttribute(name="default-ttl", direction=ConnectorAttribute.Direction.OUTGOING, description="If specified, the time (ms) sent messages can remain in queues undelivered before they are dead", type="long"), @ConnectorAttribute(name="tracing.enabled", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether tracing is enabled (default) or disabled", type="boolean", defaultValue="true"), @ConnectorAttribute(name="tracing.attribute-headers", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true", type="string", defaultValue="")})
public class RabbitMQConnector
implements InboundConnector,
OutboundConnector,
HealthReporter {
    public static final String CONNECTOR_NAME = "smallrye-rabbitmq";
    private final Map<String, RabbitMQClient> clients = new ConcurrentHashMap<String, RabbitMQClient>();
    private final Map<String, ChannelStatus> incomingChannelStatus = new ConcurrentHashMap<String, ChannelStatus>();
    private final Map<String, ChannelStatus> outgoingChannelStatus = new ConcurrentHashMap<String, ChannelStatus>();
    private final Map<String, Flow.Subscription> subscriptions = new ConcurrentHashMap<String, Flow.Subscription>();
    @Inject
    ExecutionHolder executionHolder;
    @Inject
    @Any
    Instance<RabbitMQOptions> clientOptions;
    @Inject
    @Any
    Instance<CredentialsProvider> credentialsProviders;
    private volatile RabbitMQOpenTelemetryInstrumenter instrumenter;

    RabbitMQConnector() {
    }

    public static String getExchangeName(RabbitMQConnectorCommonConfiguration config) {
        return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel());
    }

    private Multi<? extends Message<?>> getStreamOfMessages(RabbitMQConsumer receiver, ConnectionHolder holder, RabbitMQConnectorIncomingConfiguration ic, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) {
        String queueName = ic.getQueueName();
        boolean isTracingEnabled = ic.getTracingEnabled();
        String contentTypeOverride = ic.getContentTypeOverride().orElse(null);
        RabbitMQLogging.log.receiverListeningAddress(queueName);
        if (isTracingEnabled) {
            return receiver.toMulti().map(m -> new IncomingRabbitMQMessage((RabbitMQMessage)m, holder, onNack, onAck, contentTypeOverride)).map(msg -> this.instrumenter.traceIncoming((Message<?>)msg, RabbitMQTrace.traceQueue(queueName, msg.message.envelope().getRoutingKey(), msg.getHeaders())));
        }
        return receiver.toMulti().map(m -> new IncomingRabbitMQMessage((RabbitMQMessage)m, holder, onNack, onAck, contentTypeOverride));
    }

    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        RabbitMQConnectorIncomingConfiguration ic = new RabbitMQConnectorIncomingConfiguration(config);
        if (ic.getTracingEnabled().booleanValue() && this.instrumenter == null) {
            this.instrumenter = RabbitMQOpenTelemetryInstrumenter.createForConnector();
        }
        this.incomingChannelStatus.put(ic.getChannel(), ChannelStatus.INITIALISING);
        RabbitMQFailureHandler onNack = this.createFailureHandler(ic);
        RabbitMQAckHandler onAck = this.createAckHandler(ic);
        Integer connectionCount = ic.getConnectionCount();
        Multi multi = Multi.createFrom().range(0, connectionCount.intValue()).onItem().transformToUniAndMerge(connectionIdx -> {
            RabbitMQClient client = RabbitMQClientHelper.createClient(this, ic, this.clientOptions, this.credentialsProviders);
            client.getDelegate().addConnectionEstablishedCallback(promise -> Uni.createFrom().nullItem().onItem().call(ignored -> {
                if (ic.getMaxOutstandingMessages().isPresent()) {
                    return client.basicQos(ic.getMaxOutstandingMessages().get().intValue(), false);
                }
                return Uni.createFrom().nullItem();
            }).onItem().call(() -> this.establishQueue(client, ic)).onItem().call(() -> this.establishDLQ(client, ic)).subscribe().with(ignored -> promise.complete(), arg_0 -> ((Promise)promise).fail(arg_0)));
            ConnectionHolder holder = new ConnectionHolder(client, ic, this.getVertx());
            return holder.getOrEstablishConnection().invoke(() -> RabbitMQLogging.log.connectionEstablished((int)connectionIdx, ic.getChannel())).flatMap(connection -> this.createConsumer(ic, (RabbitMQClient)connection).map(consumer -> Tuple2.of((Object)holder, (Object)consumer)));
        }).collect().asList().onItem().invoke(() -> this.incomingChannelStatus.put(ic.getChannel(), ChannelStatus.CONNECTED)).onItem().transformToMulti(tuples -> Multi.createFrom().iterable((Iterable)tuples)).flatMap(tuple -> this.getStreamOfMessages((RabbitMQConsumer)tuple.getItem2(), (ConnectionHolder)tuple.getItem1(), ic, onNack, onAck));
        if (Boolean.TRUE.equals(ic.getBroadcast())) {
            multi = multi.broadcast().toAllSubscribers();
        }
        return multi;
    }

    private Uni<RabbitMQConsumer> createConsumer(RabbitMQConnectorIncomingConfiguration ic, RabbitMQClient client) {
        return Uni.createFrom().nullItem().onItem().transformToUni(ignored -> client.basicConsumer(this.serverQueueName(ic.getQueueName()), new QueueOptions().setAutoAck(ic.getAutoAcknowledgement().booleanValue()).setMaxInternalQueueSize(ic.getMaxIncomingInternalQueueSize().intValue()).setKeepMostRecent(ic.getKeepMostRecent().booleanValue())));
    }

    private Uni<?> establishDLQ(RabbitMQClient client, RabbitMQConnectorIncomingConfiguration ic) {
        String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", ic.getQueueName()));
        String deadLetterExchangeName = ic.getDeadLetterExchange();
        String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(ic.getQueueName());
        Uni dlxFlow = Uni.createFrom().item(() -> ic.getAutoBindDlq() != false && ic.getDlxDeclare() != false ? null : deadLetterExchangeName).onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(), true, false).onItem().invoke(() -> RabbitMQLogging.log.dlxEstablished(deadLetterExchangeName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishDlx(deadLetterExchangeName, (Throwable)ex)).onItem().transform(v -> deadLetterExchangeName));
        JsonObject queueArgs = new JsonObject();
        ic.getDeadLetterQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType));
        ic.getDeadLetterQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode));
        return dlxFlow.onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName).onItem().ifNull().switchTo(() -> client.queueDeclare(deadLetterQueueName, true, false, false, queueArgs).onItem().invoke(() -> RabbitMQLogging.log.queueEstablished(deadLetterQueueName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishQueue(deadLetterQueueName, (Throwable)ex)).onItem().call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey)).onItem().invoke(() -> RabbitMQLogging.log.deadLetterBindingEstablished(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, (Throwable)ex)).onItem().transform(v -> deadLetterQueueName));
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        RabbitMQConnectorOutgoingConfiguration oc = new RabbitMQConnectorOutgoingConfiguration(config);
        this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.INITIALISING);
        RabbitMQClient client = RabbitMQClientHelper.createClient(this, oc, this.clientOptions, this.credentialsProviders);
        client.getDelegate().addConnectionEstablishedCallback(promise -> this.establishExchange(client, oc).subscribe().with(ignored -> promise.complete(), arg_0 -> ((Promise)promise).fail(arg_0)));
        ConnectionHolder holder = new ConnectionHolder(client, oc, this.getVertx());
        Uni getSender = holder.getOrEstablishConnection().onItem().transformToUni(connection -> Uni.createFrom().item((Object)RabbitMQPublisher.create((Vertx)this.getVertx(), (RabbitMQClient)connection, (RabbitMQPublisherOptions)new RabbitMQPublisherOptions().setReconnectAttempts(oc.getReconnectAttempts()).setReconnectInterval(Duration.ofSeconds(oc.getReconnectInterval().intValue()).toMillis()).setMaxInternalQueueSize(oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE).intValue())))).onItem().call(RabbitMQPublisher::start).invoke(s -> this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.CONNECTED)).onFailure().invoke(t -> this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED)).onFailure().recoverWithNull().memoize().indefinitely().onCancellation().invoke(() -> this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED));
        RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, (Uni<RabbitMQPublisher>)getSender);
        this.subscriptions.put(oc.getChannel(), processor);
        return MultiUtils.via((Flow.Processor)processor, m -> m.onFailure().invoke(t -> {
            RabbitMQLogging.log.error(oc.getChannel(), (Throwable)t);
            this.outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED);
        }));
    }

    public HealthReport getReadiness() {
        return this.getHealth(false);
    }

    public HealthReport getLiveness() {
        return this.getHealth(false);
    }

    public HealthReport getHealth(boolean strict) {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        this.incomingChannelStatus.forEach((channel, status) -> builder.add(channel, status == ChannelStatus.CONNECTED));
        this.outgoingChannelStatus.forEach((channel, status) -> builder.add(channel, strict ? status == ChannelStatus.CONNECTED : status != ChannelStatus.NOT_CONNECTED));
        return builder.build();
    }

    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=50) @BeforeDestroyed(value=ApplicationScoped.class) Object ignored) {
        this.subscriptions.forEach((channel, subscription) -> subscription.cancel());
        this.clients.forEach((channel, rabbitMQClient) -> rabbitMQClient.stopAndAwait());
        this.clients.clear();
    }

    public Vertx getVertx() {
        return this.executionHolder.vertx();
    }

    public void addClient(String channel, RabbitMQClient client) {
        this.clients.put(channel, client);
    }

    private Uni<String> establishExchange(RabbitMQClient client, RabbitMQConnectorCommonConfiguration config) {
        boolean declareExchange;
        String exchangeName = RabbitMQConnector.getExchangeName(config);
        boolean bl = declareExchange = Boolean.TRUE.equals(config.getExchangeDeclare()) && exchangeName.length() != 0;
        if (declareExchange) {
            return client.exchangeDeclare(exchangeName, config.getExchangeType(), config.getExchangeDurable().booleanValue(), config.getExchangeAutoDelete().booleanValue()).onItem().invoke(() -> RabbitMQLogging.log.exchangeEstablished(exchangeName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishExchange(exchangeName, (Throwable)ex)).onItem().transform(v -> exchangeName);
        }
        return Uni.createFrom().item((Object)exchangeName);
    }

    private Uni<String> establishQueue(RabbitMQClient client, RabbitMQConnectorIncomingConfiguration ic) {
        String queueName = ic.getQueueName();
        JsonObject queueArgs = new JsonObject();
        if (ic.getAutoBindDlq().booleanValue()) {
            queueArgs.put("x-dead-letter-exchange", (Object)ic.getDeadLetterExchange());
            queueArgs.put("x-dead-letter-routing-key", (Object)ic.getDeadLetterRoutingKey().orElse(queueName));
        }
        ic.getQueueSingleActiveConsumer().ifPresent(sac -> queueArgs.put("x-single-active-consumer", sac));
        ic.getQueueXQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType));
        ic.getQueueXQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode));
        ic.getQueueTtl().ifPresent(queueTtl -> {
            if (queueTtl < 0L) {
                throw RabbitMQExceptions.ex.illegalArgumentInvalidQueueTtl();
            }
            queueArgs.put("x-message-ttl", queueTtl);
        });
        ic.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", ic.getQueueXMaxPriority()));
        return this.establishExchange(client, ic).onItem().transform(v -> Boolean.TRUE.equals(ic.getQueueDeclare()) ? null : queueName).onItem().ifNotNull().call(name -> client.messageCount(name).onFailure().invoke(RabbitMQLogging.log::unableToConnectToBroker)).onItem().ifNull().switchTo(() -> {
            String serverQueueName = this.serverQueueName(queueName);
            Uni declare = serverQueueName.isEmpty() ? client.queueDeclare(serverQueueName, false, true, true) : client.queueDeclare(serverQueueName, ic.getQueueDurable().booleanValue(), ic.getQueueExclusive().booleanValue(), ic.getQueueAutoDelete().booleanValue(), queueArgs);
            return declare.onItem().invoke(() -> RabbitMQLogging.log.queueEstablished(queueName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishQueue(queueName, (Throwable)ex)).onItem().transformToMulti(v -> this.establishBindings(client, ic)).onItem().ignoreAsUni().onItem().transform(v -> queueName);
        });
    }

    private Multi<String> establishBindings(RabbitMQClient client, RabbitMQConnectorIncomingConfiguration ic) {
        String exchangeName = RabbitMQConnector.getExchangeName(ic);
        String queueName = ic.getQueueName();
        List routingKeys = Arrays.stream(ic.getRoutingKeys().split(",")).map(String::trim).collect(Collectors.toList());
        Map<String, Object> arguments = this.parseArguments(ic.getArguments());
        if (exchangeName.isEmpty()) {
            return Multi.createFrom().empty();
        }
        return Multi.createFrom().iterable(routingKeys).onItem().call(routingKey -> client.queueBind(this.serverQueueName(queueName), exchangeName, routingKey, arguments)).onItem().invoke(routingKey -> RabbitMQLogging.log.bindingEstablished(queueName, exchangeName, (String)routingKey, arguments.toString())).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishBinding(queueName, exchangeName, (Throwable)ex));
    }

    private Map<String, Object> parseArguments(Optional<String> argumentsConfig) {
        HashMap<String, Object> argumentsBinding = new HashMap<String, Object>();
        argumentsConfig.ifPresent(args -> Arrays.stream(args.split(",")).map(String::trim).forEach(argumentKeyValue -> {
            String[] argumentKeyValueSplit = argumentKeyValue.split(":");
            if (argumentKeyValueSplit.length == 2) {
                String key = argumentKeyValueSplit[0];
                String value = argumentKeyValueSplit[1];
                argumentsBinding.put(key, value);
            }
        }));
        return argumentsBinding;
    }

    private RabbitMQFailureHandler createFailureHandler(RabbitMQConnectorIncomingConfiguration config) {
        String strategy = config.getFailureStrategy();
        RabbitMQFailureHandler.Strategy actualStrategy = RabbitMQFailureHandler.Strategy.from(strategy);
        switch (actualStrategy) {
            case FAIL: {
                return new RabbitMQFailStop(this, config.getChannel());
            }
            case ACCEPT: {
                return new RabbitMQAccept(config.getChannel());
            }
            case REJECT: {
                return new RabbitMQReject(config.getChannel());
            }
        }
        throw RabbitMQExceptions.ex.illegalArgumentInvalidFailureStrategy(strategy);
    }

    private RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) {
        return Boolean.TRUE.equals(ic.getAutoAcknowledgement()) ? new RabbitMQAutoAck(ic.getChannel()) : new RabbitMQAck(ic.getChannel());
    }

    private String serverQueueName(String name) {
        if (name.equals("(server.auto)")) {
            return "";
        }
        return name;
    }

    public void reportIncomingFailure(String channel, Throwable reason) {
        RabbitMQClient client;
        RabbitMQLogging.log.failureReported(channel, reason);
        this.incomingChannelStatus.put(channel, ChannelStatus.NOT_CONNECTED);
        Flow.Subscription subscription = this.subscriptions.remove(channel);
        if (subscription != null) {
            subscription.cancel();
        }
        if ((client = this.clients.remove(channel)) != null) {
            client.stopAndForget();
        }
    }

    private static enum ChannelStatus {
        CONNECTED,
        NOT_CONNECTED,
        INITIALISING;

    }
}

