/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.amqp;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.amqp.AmqpClientHelper;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.amqp.AmqpCreditBasedSender;
import io.smallrye.reactive.messaging.amqp.AmqpMessage;
import io.smallrye.reactive.messaging.amqp.ConnectionHolder;
import io.smallrye.reactive.messaging.amqp.fault.AmqpAccept;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailStop;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler;
import io.smallrye.reactive.messaging.amqp.fault.AmqpModifiedFailed;
import io.smallrye.reactive.messaging.amqp.fault.AmqpModifiedFailedAndUndeliverableHere;
import io.smallrye.reactive.messaging.amqp.fault.AmqpReject;
import io.smallrye.reactive.messaging.amqp.fault.AmqpRelease;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPExceptions;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.AmqpSenderOptions;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.amqp.AmqpReceiver;
import io.vertx.mutiny.amqp.AmqpSender;
import io.vertx.mutiny.core.Vertx;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.event.Reception;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;

@ApplicationScoped
@Connector(value="smallrye-amqp")
@ConnectorAttributes(value={@ConnectorAttribute(name="username", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The username used to authenticate to the broker", type="string", alias="amqp-username"), @ConnectorAttribute(name="password", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The password used to authenticate to the broker", type="string", alias="amqp-password"), @ConnectorAttribute(name="host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker hostname", type="string", alias="amqp-host", defaultValue="localhost"), @ConnectorAttribute(name="port", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker port", type="int", alias="amqp-port", defaultValue="5672"), @ConnectorAttribute(name="use-ssl", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether the AMQP connection uses SSL/TLS", type="boolean", alias="amqp-use-ssl", defaultValue="false"), @ConnectorAttribute(name="virtual-host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use)", type="string", alias="amqp-virtual-host"), @ConnectorAttribute(name="sni-server-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="If set, explicitly override the hostname to use for the TLS SNI server name", type="string", alias="amqp-sni-server-name"), @ConnectorAttribute(name="reconnect-attempts", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The number of reconnection attempts", type="int", alias="amqp-reconnect-attempts", defaultValue="100"), @ConnectorAttribute(name="reconnect-interval", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The interval in second between two reconnection attempts", type="int", alias="amqp-reconnect-interval", defaultValue="10"), @ConnectorAttribute(name="connect-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The connection timeout in milliseconds", type="int", alias="amqp-connect-timeout", defaultValue="1000"), @ConnectorAttribute(name="container-id", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP container id", type="string"), @ConnectorAttribute(name="address", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP address. If not set, the channel name is used", type="string"), @ConnectorAttribute(name="link-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the link. If not set, the channel name is used.", type="string"), @ConnectorAttribute(name="client-options-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the AMQP Client Option bean used to customize the AMQP client configuration", type="string", alias="amqp-client-options-name"), @ConnectorAttribute(name="tracing-enabled", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether tracing is enabled (default) or disabled", type="boolean", defaultValue="true"), @ConnectorAttribute(name="health-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed.", type="int", defaultValue="3"), @ConnectorAttribute(name="broadcast", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received AMQP messages must be dispatched to multiple _subscribers_", type="boolean", defaultValue="false"), @ConnectorAttribute(name="durable", direction=ConnectorAttribute.Direction.INCOMING, description="Whether AMQP subscription is durable", type="boolean", defaultValue="false"), @ConnectorAttribute(name="auto-acknowledgement", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received AMQP messages must be acknowledged when received", type="boolean", defaultValue="false"), @ConnectorAttribute(name="failure-strategy", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="Specify the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are `fail` (default), `accept`, `release`, `reject`, `modified-failed`, `modified-failed-undeliverable-here`", defaultValue="fail"), @ConnectorAttribute(name="durable", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether sent AMQP messages are marked durable", type="boolean", defaultValue="false"), @ConnectorAttribute(name="ttl", direction=ConnectorAttribute.Direction.OUTGOING, description="The time-to-live of the send AMQP messages. 0 to disable the TTL", type="long", defaultValue="0"), @ConnectorAttribute(name="credit-retrieval-period", direction=ConnectorAttribute.Direction.OUTGOING, description="The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender run out of credits.", type="int", defaultValue="2000"), @ConnectorAttribute(name="use-anonymous-sender", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether or not the connector should use an anonymous sender. Default value is `true` if the broker supports it, `false` otherwise. If not supported, it is not possible to dynamically change the destination address.", type="boolean"), @ConnectorAttribute(name="merge", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether the connector should allow multiple upstreams", type="boolean", defaultValue="false")})
public class AmqpConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory,
HealthReporter {
    static final String CONNECTOR_NAME = "smallrye-amqp";
    static Tracer TRACER;
    @Inject
    private ExecutionHolder executionHolder;
    @Inject
    @Any
    private Instance<AmqpClientOptions> clientOptions;
    private final List<AmqpClient> clients = new CopyOnWriteArrayList<AmqpClient>();
    private final Map<String, AmqpCreditBasedSender> processors = new ConcurrentHashMap<String, AmqpCreditBasedSender>();
    private final Map<String, Boolean> opened = new ConcurrentHashMap<String, Boolean>();
    private final Map<String, ConnectionHolder> holders = new ConcurrentHashMap<String, ConnectionHolder>();

    void setup(ExecutionHolder executionHolder) {
        this.executionHolder = executionHolder;
    }

    AmqpConnector() {
    }

    @PostConstruct
    void init() {
        TRACER = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging.amqp");
    }

    private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver receiver, ConnectionHolder holder, String address, AmqpFailureHandler onNack, Boolean tracingEnabled) {
        AMQPLogging.log.receiverListeningAddress(address);
        BroadcastProcessor processor = BroadcastProcessor.create();
        receiver.exceptionHandler(t -> {
            AMQPLogging.log.receiverError((Throwable)t);
            processor.onError(t);
        });
        holder.onFailure(arg_0 -> ((BroadcastProcessor)processor).onError(arg_0));
        return Multi.createFrom().deferred(() -> {
            Multi stream = receiver.toMulti().map(m -> new AmqpMessage((io.vertx.mutiny.amqp.AmqpMessage)m, holder.getContext(), onNack, tracingEnabled));
            if (tracingEnabled.booleanValue()) {
                stream = stream.onItem().invoke(this::incomingTrace);
            }
            return Multi.createBy().merging().streams(new Publisher[]{stream, processor});
        });
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        AmqpConnectorIncomingConfiguration ic = new AmqpConnectorIncomingConfiguration(config);
        String address = ic.getAddress().orElseGet(ic::getChannel);
        this.opened.put(ic.getChannel(), false);
        boolean broadcast = ic.getBroadcast();
        boolean durable = ic.getDurable();
        boolean autoAck = ic.getAutoAcknowledgement();
        AmqpClient client = AmqpClientHelper.createClient(this, ic, this.clientOptions);
        String link = ic.getLinkName().orElseGet(ic::getChannel);
        ConnectionHolder holder = new ConnectionHolder(client, ic, this.getVertx());
        this.holders.put(ic.getChannel(), holder);
        AmqpFailureHandler onNack = this.createFailureHandler(ic);
        Multi multi = holder.getOrEstablishConnection().onItem().transformToUni(connection -> connection.createReceiver(address, new AmqpReceiverOptions().setAutoAcknowledgement(autoAck).setDurable(durable).setLinkName(link))).onItem().invoke(r -> this.opened.put(ic.getChannel(), true)).onItem().transformToMulti(r -> this.getStreamOfMessages((AmqpReceiver)r, holder, address, onNack, ic.getTracingEnabled()));
        Integer interval = ic.getReconnectInterval();
        Integer attempts = ic.getReconnectAttempts();
        multi = multi.onFailure().invoke(AMQPLogging.log::retrieveMessagesRetrying).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(interval.intValue())).atMost((long)attempts.intValue()).onFailure().invoke(t -> {
            this.opened.put(ic.getChannel(), false);
            AMQPLogging.log.retrieveMessagesNoMoreRetrying((Throwable)t);
        });
        if (broadcast) {
            multi = multi.broadcast().toAllSubscribers();
        }
        return ReactiveStreams.fromPublisher((Publisher)multi);
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        AmqpConnectorOutgoingConfiguration oc = new AmqpConnectorOutgoingConfiguration(config);
        String configuredAddress = oc.getAddress().orElseGet(oc::getChannel);
        this.opened.put(oc.getChannel(), false);
        AtomicReference sender = new AtomicReference();
        AmqpClient client = AmqpClientHelper.createClient(this, oc, this.clientOptions);
        String link = oc.getLinkName().orElseGet(oc::getChannel);
        ConnectionHolder holder = new ConnectionHolder(client, oc, this.getVertx());
        Uni getSender = Uni.createFrom().item((Object)((AmqpSender)sender.get())).onItem().ifNull().switchTo(() -> {
            AmqpSender current = (AmqpSender)sender.get();
            if (current != null && !current.connection().isDisconnected()) {
                return Uni.createFrom().item((Object)current);
            }
            return holder.getOrEstablishConnection().onItem().transformToUni(connection -> {
                boolean anonymous = oc.getUseAnonymousSender().orElseGet(() -> ConnectionHolder.supportAnonymousRelay(connection));
                if (anonymous) {
                    return connection.createAnonymousSender();
                }
                return connection.createSender(configuredAddress, new AmqpSenderOptions().setLinkName(link));
            }).onItem().invoke(s -> {
                sender.set(s);
                this.opened.put(oc.getChannel(), true);
            });
        }).onFailure().invoke(t -> {
            sender.set(null);
            this.opened.put(oc.getChannel(), false);
        }).onCancellation().invoke(() -> {
            sender.set(null);
            this.opened.put(oc.getChannel(), false);
        });
        AmqpCreditBasedSender processor = new AmqpCreditBasedSender(this, holder, oc, (Uni<AmqpSender>)getSender);
        this.processors.put(oc.getChannel(), processor);
        return ReactiveStreams.builder().via((Processor)processor).onError(t -> {
            AMQPLogging.log.failureReported(oc.getChannel(), (Throwable)t);
            this.opened.put(oc.getChannel(), false);
        }).ignore();
    }

    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=50) @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        this.processors.values().forEach(AmqpCreditBasedSender::cancel);
        this.clients.forEach(AmqpClient::closeAndForget);
        this.clients.clear();
    }

    public Vertx getVertx() {
        return this.executionHolder.vertx();
    }

    public void addClient(AmqpClient client) {
        this.clients.add(client);
    }

    private AmqpFailureHandler createFailureHandler(AmqpConnectorIncomingConfiguration config) {
        String strategy = config.getFailureStrategy();
        AmqpFailureHandler.Strategy actualStrategy = AmqpFailureHandler.Strategy.from(strategy);
        switch (actualStrategy) {
            case FAIL: {
                return new AmqpFailStop(this, config.getChannel());
            }
            case ACCEPT: {
                return new AmqpAccept(config.getChannel());
            }
            case REJECT: {
                return new AmqpReject(config.getChannel());
            }
            case RELEASE: {
                return new AmqpRelease(config.getChannel());
            }
            case MODIFIED_FAILED: {
                return new AmqpModifiedFailed(config.getChannel());
            }
            case MODIFIED_FAILED_UNDELIVERABLE_HERE: {
                return new AmqpModifiedFailedAndUndeliverableHere(config.getChannel());
            }
        }
        throw AMQPExceptions.ex.illegalArgumentInvalidFailureStrategy(strategy);
    }

    public List<AmqpClient> getClients() {
        return this.clients;
    }

    public HealthReport getReadiness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (Map.Entry<String, ConnectionHolder> entry : this.holders.entrySet()) {
            try {
                builder.add(entry.getKey(), ((Boolean)entry.getValue().isConnected().await().atMost(Duration.ofSeconds(entry.getValue().getHealthTimeout()))).booleanValue());
            }
            catch (Exception e) {
                builder.add(entry.getKey(), false, e.getMessage());
            }
        }
        for (Map.Entry<String, Object> entry : this.processors.entrySet()) {
            try {
                builder.add(entry.getKey(), ((Boolean)((AmqpCreditBasedSender)entry.getValue()).isConnected().await().atMost(Duration.ofSeconds(((AmqpCreditBasedSender)entry.getValue()).getHealthTimeout()))).booleanValue());
            }
            catch (Exception e) {
                builder.add(entry.getKey(), false, e.getMessage());
            }
        }
        return builder.build();
    }

    public HealthReport getLiveness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (Map.Entry<String, Boolean> entry : this.opened.entrySet()) {
            builder.add(entry.getKey(), entry.getValue().booleanValue());
        }
        return builder.build();
    }

    public HealthReport getStartup() {
        return this.getLiveness();
    }

    public void reportFailure(String channel, Throwable reason) {
        AMQPLogging.log.failureReported(channel, reason);
        this.opened.put(channel, false);
        this.terminate(null);
    }

    private void incomingTrace(AmqpMessage<?> message) {
        TracingMetadata tracingMetadata = TracingMetadata.fromMessage(message).orElse(TracingMetadata.empty());
        SpanBuilder spanBuilder = TRACER.spanBuilder(message.getAddress() + " receive").setSpanKind(SpanKind.CONSUMER);
        Context parentSpanContext = tracingMetadata.getPreviousContext();
        if (parentSpanContext != null) {
            spanBuilder.setParent(parentSpanContext);
        } else {
            spanBuilder.setNoParent();
        }
        Span span = spanBuilder.startSpan();
        span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, (Object)"AMQP 1.0");
        span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, (Object)message.getAddress());
        span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, (Object)"queue");
        span.makeCurrent();
        message.injectTracingMetadata(tracingMetadata.withSpan(span));
        span.end();
    }
}

