/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.rabbitmq.internals;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig;
import io.smallrye.reactive.messaging.rabbitmq.ClientHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace;
import io.vertx.core.Context;
import io.vertx.core.Promise;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.mutiny.rabbitmq.RabbitMQMessage;
import io.vertx.rabbitmq.QueueOptions;
import jakarta.enterprise.inject.Instance;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;

public class IncomingRabbitMQChannel {
    private final RabbitMQOpenTelemetryInstrumenter instrumenter;
    private final AtomicReference<Flow.Subscription> subscription = new AtomicReference();
    private volatile RabbitMQClient client;
    private final RabbitMQConnectorIncomingConfiguration config;
    private final Multi<? extends Message<?>> stream;
    private final RabbitMQConnector connector;

    public IncomingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration ic, Instance<OpenTelemetry> openTelemetryInstance) {
        this.instrumenter = ic.getTracingEnabled() != false ? RabbitMQOpenTelemetryInstrumenter.createForConnector(openTelemetryInstance) : null;
        this.config = ic;
        this.connector = connector;
        RabbitMQFailureHandler onNack = this.createFailureHandler(connector.failureHandlerFactories(), ic);
        RabbitMQAckHandler onAck = this.createAckHandler(ic);
        Multi multi = this.createConsumer(connector, ic).invoke(tuple -> {
            this.client = ((ClientHolder)tuple.getItem1()).client();
        }).onItem().transformToMulti(tuple -> this.getStreamOfMessages((RabbitMQConsumer)tuple.getItem2(), (ClientHolder)tuple.getItem1(), ic, onNack, onAck));
        if (ic.getBroadcast().booleanValue()) {
            multi = multi.broadcast().toAllSubscribers();
        }
        this.stream = multi.onSubscription().invoke(this.subscription::set);
    }

    public Multi<? extends Message<?>> getStream() {
        return this.stream;
    }

    public HealthReport.HealthReportBuilder isAlive(HealthReport.HealthReportBuilder builder) {
        if (!this.config.getHealthEnabled().booleanValue()) {
            return builder;
        }
        return this.computeHealthReport(builder);
    }

    private HealthReport.HealthReportBuilder computeHealthReport(HealthReport.HealthReportBuilder builder) {
        if (this.config.getHealthLazySubscription().booleanValue() && this.subscription.get() == null) {
            return builder.add(new HealthReport.ChannelInfo(this.config.getChannel(), true));
        }
        if (this.client == null) {
            return builder.add(new HealthReport.ChannelInfo(this.config.getChannel(), false));
        }
        boolean alive = this.client.isConnected() && this.client.isOpenChannel();
        return builder.add(new HealthReport.ChannelInfo(this.config.getChannel(), alive));
    }

    public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder builder) {
        if (!this.config.getHealthEnabled().booleanValue() || !this.config.getHealthReadinessEnabled().booleanValue()) {
            return builder;
        }
        return this.computeHealthReport(builder);
    }

    private Uni<Tuple2<ClientHolder, RabbitMQConsumer>> createConsumer(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration ic) {
        RabbitMQClient client = RabbitMQClientHelper.createClient(connector, ic);
        client.getDelegate().addConnectionEstablishedCallback(promise -> {
            Uni uni = ic.getMaxOutstandingMessages().isPresent() ? client.basicQos(ic.getMaxOutstandingMessages().get().intValue(), false) : Uni.createFrom().nullItem();
            Instance<Map<String, ?>> maps = connector.configMaps();
            uni.call(() -> this.declareQueue(client, ic, maps)).call(() -> RabbitMQClientHelper.configureDLQorDLX(client, ic, maps)).subscribe().with(ignored -> promise.complete(), arg_0 -> ((Promise)promise).fail(arg_0));
        });
        io.vertx.mutiny.core.Context root = null;
        if (ConcurrencyConnectorConfig.getConcurrency((Config)ic.config()).filter(i -> i > 1).isPresent()) {
            root = io.vertx.mutiny.core.Context.newInstance((Context)((VertxInternal)connector.vertx().getDelegate()).createEventLoopContext());
        }
        ClientHolder holder = new ClientHolder(client, ic, connector.vertx(), root);
        return holder.getOrEstablishConnection().invoke(() -> RabbitMQLogging.log.connectionEstablished(ic.getChannel())).flatMap(connection -> this.createConsumer(ic, (RabbitMQClient)connection).map(consumer -> Tuple2.of((Object)holder, (Object)consumer)));
    }

    private RabbitMQFailureHandler createFailureHandler(Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories, RabbitMQConnectorIncomingConfiguration config) {
        String strategy = config.getFailureStrategy();
        Instance failureHandlerFactory = CDIUtils.getInstanceById(failureHandlerFactories, (String)strategy);
        if (failureHandlerFactory.isResolvable()) {
            return ((RabbitMQFailureHandler.Factory)failureHandlerFactory.get()).create(config, this.connector);
        }
        throw RabbitMQExceptions.ex.illegalArgumentInvalidFailureStrategy(strategy);
    }

    public RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) {
        return Boolean.TRUE.equals(ic.getAutoAcknowledgement()) ? new RabbitMQAutoAck(ic.getChannel()) : new RabbitMQAck(ic.getChannel());
    }

    private Uni<String> declareQueue(RabbitMQClient client, RabbitMQConnectorIncomingConfiguration ic, Instance<Map<String, ?>> configMaps) {
        String queueName = RabbitMQClientHelper.getQueueName(ic);
        JsonObject queueArgs = new JsonObject();
        Instance queueArguments = CDIUtils.getInstanceById(configMaps, (String)ic.getQueueArguments());
        if (queueArguments.isResolvable()) {
            Map argsMap = (Map)queueArguments.get();
            argsMap.forEach((arg_0, arg_1) -> ((JsonObject)queueArgs).put(arg_0, arg_1));
        }
        if (ic.getAutoBindDlq().booleanValue()) {
            queueArgs.put("x-dead-letter-exchange", (Object)ic.getDeadLetterExchange());
            queueArgs.put("x-dead-letter-routing-key", (Object)ic.getDeadLetterRoutingKey().orElse(queueName));
        }
        ic.getQueueSingleActiveConsumer().ifPresent(sac -> queueArgs.put("x-single-active-consumer", sac));
        ic.getQueueXQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType));
        ic.getQueueXQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode));
        ic.getQueueTtl().ifPresent(queueTtl -> {
            if (queueTtl < 0L) {
                throw RabbitMQExceptions.ex.illegalArgumentInvalidQueueTtl();
            }
            queueArgs.put("x-message-ttl", queueTtl);
        });
        ic.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", maxPriority));
        ic.getQueueXDeliveryLimit().ifPresent(deliveryLimit -> queueArgs.put("x-delivery-limit", deliveryLimit));
        return RabbitMQClientHelper.declareExchangeIfNeeded(client, ic, configMaps).flatMap(v -> {
            if (ic.getQueueDeclare().booleanValue()) {
                String serverQueueName = RabbitMQClientHelper.serverQueueName(queueName);
                Uni declare = serverQueueName.isEmpty() ? client.queueDeclare(serverQueueName, false, true, true) : client.queueDeclare(serverQueueName, ic.getQueueDurable().booleanValue(), ic.getQueueExclusive().booleanValue(), ic.getQueueAutoDelete().booleanValue(), queueArgs);
                return declare.invoke(() -> RabbitMQLogging.log.queueEstablished(queueName)).onFailure().invoke(ex -> RabbitMQLogging.log.unableToEstablishQueue(queueName, (Throwable)ex)).flatMap(x -> RabbitMQClientHelper.establishBindings(client, ic)).replaceWith((Object)queueName);
            }
            return client.messageCount(queueName).onFailure().invoke(RabbitMQLogging.log::unableToConnectToBroker).replaceWith((Object)queueName);
        });
    }

    private Uni<RabbitMQConsumer> createConsumer(RabbitMQConnectorIncomingConfiguration ic, RabbitMQClient client) {
        QueueOptions queueOptions = new QueueOptions();
        queueOptions.setConsumerArguments(RabbitMQClientHelper.parseArguments(ic.getConsumerArguments()));
        return client.basicConsumer(RabbitMQClientHelper.serverQueueName(RabbitMQClientHelper.getQueueName(ic)), queueOptions.setAutoAck(ic.getAutoAcknowledgement().booleanValue()).setMaxInternalQueueSize(ic.getMaxIncomingInternalQueueSize().intValue()).setKeepMostRecent(ic.getKeepMostRecent().booleanValue()));
    }

    private Multi<? extends Message<?>> getStreamOfMessages(RabbitMQConsumer receiver, ClientHolder holder, RabbitMQConnectorIncomingConfiguration ic, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) {
        String queueName = RabbitMQClientHelper.getQueueName(ic);
        boolean isTracingEnabled = ic.getTracingEnabled();
        String contentTypeOverride = ic.getContentTypeOverride().orElse(null);
        RabbitMQLogging.log.receiverListeningAddress(queueName);
        if (isTracingEnabled) {
            return receiver.toMulti().emitOn(c -> VertxContext.runOnContext((Context)holder.getContext().getDelegate(), (Runnable)c)).map(m -> new IncomingRabbitMQMessage((RabbitMQMessage)m, holder, onNack, onAck, contentTypeOverride)).map(msg -> this.instrumenter.traceIncoming((Message<?>)msg, RabbitMQTrace.traceQueue(queueName, msg.message.envelope().getRoutingKey(), msg.getHeaders())));
        }
        return receiver.toMulti().emitOn(c -> VertxContext.runOnContext((Context)holder.getContext().getDelegate(), (Runnable)c)).map(m -> new IncomingRabbitMQMessage((RabbitMQMessage)m, holder, onNack, onAck, contentTypeOverride));
    }

    public void terminate() {
        Flow.Subscription sub = this.subscription.getAndSet(null);
        if (sub != null) {
            sub.cancel();
        }
    }
}

