/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.amqp;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.amqp.AmqpClientHelper;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorCommonConfiguration;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.amqp.IncomingAmqpChannel;
import io.smallrye.reactive.messaging.amqp.OutgoingAmqpChannel;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.core.Vertx;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(value="smallrye-amqp")
@ConnectorAttributes(value={@ConnectorAttribute(name="username", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The username used to authenticate to the broker", type="string", alias="amqp-username"), @ConnectorAttribute(name="password", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The password used to authenticate to the broker", type="string", alias="amqp-password"), @ConnectorAttribute(name="host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker hostname", type="string", alias="amqp-host", defaultValue="localhost"), @ConnectorAttribute(name="port", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker port", type="int", alias="amqp-port", defaultValue="5672"), @ConnectorAttribute(name="use-ssl", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether the AMQP connection uses SSL/TLS", type="boolean", alias="amqp-use-ssl", defaultValue="false"), @ConnectorAttribute(name="virtual-host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use)", type="string", alias="amqp-virtual-host"), @ConnectorAttribute(name="sni-server-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="If set, explicitly override the hostname to use for the TLS SNI server name", type="string", alias="amqp-sni-server-name"), @ConnectorAttribute(name="reconnect-attempts", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The number of reconnection attempts", type="int", alias="amqp-reconnect-attempts", defaultValue="100"), @ConnectorAttribute(name="reconnect-interval", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The interval in second between two reconnection attempts", type="int", alias="amqp-reconnect-interval", defaultValue="10"), @ConnectorAttribute(name="connect-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The connection timeout in milliseconds", type="int", alias="amqp-connect-timeout", defaultValue="1000"), @ConnectorAttribute(name="container-id", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP container id", type="string"), @ConnectorAttribute(name="address", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP address. If not set, the channel name is used", type="string"), @ConnectorAttribute(name="link-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the link. If not set, the channel name is used.", type="string"), @ConnectorAttribute(name="client-options-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the AMQP Client Option bean used to customize the AMQP client configuration", type="string", alias="amqp-client-options-name"), @ConnectorAttribute(name="client-ssl-context-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of an SSLContext bean to use for connecting to AMQP when SSL is used", type="string", alias="amqp-client-ssl-context-name", hiddenFromDocumentation=true), @ConnectorAttribute(name="tracing-enabled", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether tracing is enabled (default) or disabled", type="boolean", defaultValue="true"), @ConnectorAttribute(name="health-enabled", type="boolean", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether health reporting is enabled (default) or disabled", defaultValue="true"), @ConnectorAttribute(name="health-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed.", type="int", defaultValue="3"), @ConnectorAttribute(name="cloud-events", type="boolean", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Enables (default) or disables the Cloud Event support. If enabled on an _incoming_ channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an _outgoing_, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.", defaultValue="true"), @ConnectorAttribute(name="capabilities", type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description=" A comma-separated list of capabilities proposed by the sender or receiver client."), @ConnectorAttribute(name="retry-on-fail-attempts", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The number of tentative to retry on failure", type="int", defaultValue="6"), @ConnectorAttribute(name="retry-on-fail-interval", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The interval (in seconds) between two sending attempts", type="int", defaultValue="5"), @ConnectorAttribute(name="broadcast", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received AMQP messages must be dispatched to multiple _subscribers_", type="boolean", defaultValue="false"), @ConnectorAttribute(name="durable", direction=ConnectorAttribute.Direction.INCOMING, description="Whether AMQP subscription is durable", type="boolean", defaultValue="false"), @ConnectorAttribute(name="auto-acknowledgement", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received AMQP messages must be acknowledged when received", type="boolean", defaultValue="false"), @ConnectorAttribute(name="failure-strategy", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="Specify the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are `fail` (default), `accept`, `release`, `reject`, `modified-failed`, `modified-failed-undeliverable-here`", defaultValue="fail"), @ConnectorAttribute(name="selector", direction=ConnectorAttribute.Direction.INCOMING, description="Sets a message selector. This attribute is used to define an `apache.org:selector-filter:string` filter on the source terminus, using SQL-based syntax to request the server filters which messages are delivered to the receiver (if supported by the server in question). Precise functionality supported and syntax needed can vary depending on the server.", type="string"), @ConnectorAttribute(name="durable", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether sent AMQP messages are marked durable", type="boolean", defaultValue="false"), @ConnectorAttribute(name="ttl", direction=ConnectorAttribute.Direction.OUTGOING, description="The time-to-live of the send AMQP messages. 0 to disable the TTL", type="long", defaultValue="0"), @ConnectorAttribute(name="credit-retrieval-period", direction=ConnectorAttribute.Direction.OUTGOING, description="The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender run out of credits.", type="int", defaultValue="2000"), @ConnectorAttribute(name="max-inflight-messages", type="long", direction=ConnectorAttribute.Direction.OUTGOING, description="The maximum number of messages to be written to the broker concurrently. The number of sent messages waiting to be acknowledged by the broker are limited by this value and credits granted by the broker. The default value `0` means only credits apply.", defaultValue="0"), @ConnectorAttribute(name="use-anonymous-sender", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether or not the connector should use an anonymous sender. Default value is `true` if the broker supports it, `false` otherwise. If not supported, it is not possible to dynamically change the destination address.", type="boolean"), @ConnectorAttribute(name="merge", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether the connector should allow multiple upstreams", type="boolean", defaultValue="false"), @ConnectorAttribute(name="cloud-events-source", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Configure the default `source` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `source` attribute itself", alias="cloud-events-default-source"), @ConnectorAttribute(name="cloud-events-type", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Configure the default `type` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `type` attribute itself", alias="cloud-events-default-type"), @ConnectorAttribute(name="cloud-events-subject", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Configure the default `subject` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `subject` attribute itself", alias="cloud-events-default-subject"), @ConnectorAttribute(name="cloud-events-data-content-type", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Configure the default `datacontenttype` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `datacontenttype` attribute itself", alias="cloud-events-default-data-content-type"), @ConnectorAttribute(name="cloud-events-data-schema", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Configure the default `dataschema` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `dataschema` attribute itself", alias="cloud-events-default-data-schema"), @ConnectorAttribute(name="cloud-events-insert-timestamp", type="boolean", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether or not the connector should insert automatically the `time` attribute into the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `time` attribute itself", alias="cloud-events-default-timestamp", defaultValue="true"), @ConnectorAttribute(name="cloud-events-mode", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="The Cloud Event mode (`structured` or `binary` (default)). Indicates how are written the cloud events in the outgoing record", defaultValue="binary"), @ConnectorAttribute(name="lazy-client", type="boolean", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether to create the connection and sender at startup or at first send request", defaultValue="true")})
public class AmqpConnector
implements InboundConnector,
OutboundConnector,
HealthReporter {
    public static final String CONNECTOR_NAME = "smallrye-amqp";
    @Inject
    private ExecutionHolder executionHolder;
    @Inject
    @Any
    private Instance<AmqpClientOptions> clientOptions;
    @Inject
    @Any
    private Instance<ClientCustomizer<AmqpClientOptions>> configCustomizers;
    @Inject
    private Instance<OpenTelemetry> openTelemetryInstance;
    private final List<AmqpClient> clients = new CopyOnWriteArrayList<AmqpClient>();
    private final Map<String, IncomingAmqpChannel> incomingChannels = new ConcurrentHashMap<String, IncomingAmqpChannel>();
    private final Map<String, OutgoingAmqpChannel> outgoingChannels = new ConcurrentHashMap<String, OutgoingAmqpChannel>();

    void setup(ExecutionHolder executionHolder) {
        this.executionHolder = executionHolder;
    }

    AmqpConnector() {
    }

    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        AmqpConnectorIncomingConfiguration ic = new AmqpConnectorIncomingConfiguration(config);
        AmqpClient client = AmqpClientHelper.createClient(this.executionHolder.vertx(), (AmqpConnectorCommonConfiguration)ic, this.clientOptions, this.configCustomizers);
        this.addClient(client);
        IncomingAmqpChannel incoming = new IncomingAmqpChannel(ic, client, this.executionHolder.vertx(), this.openTelemetryInstance, this::reportFailure);
        this.incomingChannels.put(ic.getChannel(), incoming);
        return incoming.getPublisher();
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        AmqpConnectorOutgoingConfiguration oc = new AmqpConnectorOutgoingConfiguration(config);
        AmqpClient client = AmqpClientHelper.createClient(this.executionHolder.vertx(), (AmqpConnectorCommonConfiguration)oc, this.clientOptions, this.configCustomizers);
        this.addClient(client);
        OutgoingAmqpChannel outgoing = new OutgoingAmqpChannel(oc, client, this.executionHolder.vertx(), this.openTelemetryInstance, this::reportFailure);
        this.outgoingChannels.put(oc.getChannel(), outgoing);
        return outgoing.getSubscriber();
    }

    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=50) @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        this.outgoingChannels.values().forEach(OutgoingAmqpChannel::close);
        this.incomingChannels.values().forEach(IncomingAmqpChannel::close);
        this.clients.forEach(c -> c.close().subscribeAsCompletionStage());
        this.clients.clear();
    }

    public Vertx getVertx() {
        return this.executionHolder.vertx();
    }

    public void addClient(AmqpClient client) {
        this.clients.add(client);
    }

    public List<AmqpClient> getClients() {
        return this.clients;
    }

    public HealthReport getReadiness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (Map.Entry<String, IncomingAmqpChannel> entry : this.incomingChannels.entrySet()) {
            if (!entry.getValue().isHealthEnabled()) continue;
            try {
                builder.add(entry.getKey(), ((Boolean)entry.getValue().isConnected().await().atMost(Duration.ofSeconds(entry.getValue().getHealthTimeout()))).booleanValue());
            }
            catch (Exception e) {
                builder.add(entry.getKey(), false, e.getMessage());
            }
        }
        for (Map.Entry<String, Object> entry : this.outgoingChannels.entrySet()) {
            if (!((OutgoingAmqpChannel)entry.getValue()).isHealthEnabled()) continue;
            try {
                builder.add(entry.getKey(), ((Boolean)((OutgoingAmqpChannel)entry.getValue()).isConnected().await().atMost(Duration.ofSeconds(((OutgoingAmqpChannel)entry.getValue()).getHealthTimeout()))).booleanValue());
            }
            catch (Exception e) {
                builder.add(entry.getKey(), false, e.getMessage());
            }
        }
        return builder.build();
    }

    public HealthReport getLiveness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (Map.Entry<String, IncomingAmqpChannel> entry : this.incomingChannels.entrySet()) {
            if (!entry.getValue().isHealthEnabled()) continue;
            builder.add(entry.getKey(), entry.getValue().isOpen());
        }
        for (Map.Entry<String, Object> entry : this.outgoingChannels.entrySet()) {
            if (!((OutgoingAmqpChannel)entry.getValue()).isHealthEnabled()) continue;
            builder.add(entry.getKey(), ((OutgoingAmqpChannel)entry.getValue()).isOpen());
        }
        return builder.build();
    }

    public HealthReport getStartup() {
        return this.getLiveness();
    }

    public void reportFailure(String channel, Throwable reason) {
        AMQPLogging.log.failureReported(channel, reason);
        this.terminate(null);
    }
}

