/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.rabbitmq.internals;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.rabbitmq.ClientHolder;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper;
import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQMessageSender;
import io.vertx.core.Promise;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import jakarta.enterprise.inject.Instance;
import java.time.Duration;
import java.util.concurrent.Flow;
import org.eclipse.microprofile.reactive.messaging.Message;

public class OutgoingRabbitMQChannel {
    private final Flow.Subscriber<Message<?>> subscriber;
    private final RabbitMQConnectorOutgoingConfiguration config;
    private final ClientHolder holder;
    private volatile RabbitMQPublisher publisher;

    public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc, Instance<OpenTelemetry> openTelemetryInstance) {
        this.config = oc;
        RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc);
        client.getDelegate().addConnectionEstablishedCallback(promise -> RabbitMQClientHelper.declareExchangeIfNeeded(client, oc, connector.configMaps()).subscribe().with(ignored -> promise.complete(), arg_0 -> ((Promise)promise).fail(arg_0)));
        this.holder = new ClientHolder(client, oc, connector.vertx(), null);
        Uni getSender = this.holder.getOrEstablishConnection().onItem().transformToUni(connection -> Uni.createFrom().item((Object)RabbitMQPublisher.create((Vertx)connector.vertx(), (RabbitMQClient)connection, (RabbitMQPublisherOptions)new RabbitMQPublisherOptions().setReconnectAttempts(oc.getReconnectAttempts()).setReconnectInterval(Duration.ofSeconds(oc.getReconnectInterval().intValue()).toMillis()).setMaxInternalQueueSize(oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE).intValue())))).onItem().call(RabbitMQPublisher::start).invoke(publisher -> {
            this.publisher = publisher;
        }).onFailure().recoverWithNull().memoize().indefinitely();
        RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, (Uni<RabbitMQPublisher>)getSender, openTelemetryInstance);
        this.subscriber = MultiUtils.via((Flow.Processor)processor, m -> m.onFailure().invoke(t -> RabbitMQLogging.log.error(oc.getChannel(), (Throwable)t)));
    }

    public Flow.Subscriber<Message<?>> getSubscriber() {
        return this.subscriber;
    }

    public HealthReport.HealthReportBuilder isAlive(HealthReport.HealthReportBuilder builder) {
        if (!this.config.getHealthEnabled().booleanValue()) {
            return builder;
        }
        return this.computeHealthReport(builder);
    }

    private HealthReport.HealthReportBuilder computeHealthReport(HealthReport.HealthReportBuilder builder) {
        RabbitMQClient client = this.holder.client();
        if (client == null) {
            return builder.add(new HealthReport.ChannelInfo(this.config.getChannel(), false));
        }
        boolean ok = true;
        if (this.holder.hasBeenConnected()) {
            ok = client.isConnected() && client.isOpenChannel();
        }
        return builder.add(new HealthReport.ChannelInfo(this.config.getChannel(), ok));
    }

    public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder builder) {
        if (!this.config.getHealthEnabled().booleanValue() || !this.config.getHealthReadinessEnabled().booleanValue()) {
            return builder;
        }
        return this.computeHealthReport(builder);
    }

    public void terminate() {
        if (this.publisher != null) {
            try {
                this.publisher.stop().await().atMost(Duration.ofMillis(this.config.getConnectionTimeout().intValue()));
            }
            catch (Exception e) {
                RabbitMQLogging.log.infof(e, "Error terminating outgoing channel %s", this.config.getChannel());
            }
        }
    }
}

