/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.rabbitmq;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQMessageConverter;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class RabbitMQMessageSender
implements Processor<Message<?>, Message<?>>,
Subscription {
    private final Uni<RabbitMQPublisher> retrieveSender;
    private final RabbitMQConnectorOutgoingConfiguration configuration;
    private final AtomicReference<Subscription> upstream = new AtomicReference();
    private final AtomicReference<Subscriber<? super Message<?>>> downstream = new AtomicReference();
    private final String configuredExchange;
    private final boolean isTracingEnabled;
    private final long inflights;
    private final Optional<Long> defaultTtl;

    public RabbitMQMessageSender(RabbitMQConnectorOutgoingConfiguration oc, Uni<RabbitMQPublisher> retrieveSender) {
        this.retrieveSender = retrieveSender;
        this.configuration = oc;
        this.configuredExchange = RabbitMQConnector.getExchangeName(oc);
        this.isTracingEnabled = oc.getTracingEnabled();
        this.inflights = oc.getMaxInflightMessages();
        this.defaultTtl = oc.getDefaultTtl();
        if (this.inflights <= 0L) {
            throw RabbitMQExceptions.ex.illegalArgumentInvalidMaxInflightMessages();
        }
        if (this.defaultTtl.isPresent() && this.defaultTtl.get() < 0L) {
            throw RabbitMQExceptions.ex.illegalArgumentInvalidDefaultTtl();
        }
    }

    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        if (!this.downstream.compareAndSet(null, subscriber)) {
            Subscriptions.fail(subscriber, (Throwable)RabbitMQExceptions.ex.illegalStateOnlyOneSubscriberAllowed());
        } else if (this.upstream.get() != null) {
            subscriber.onSubscribe((Subscription)this);
        }
    }

    public void onSubscribe(Subscription subscription) {
        if (this.upstream.compareAndSet(null, subscription)) {
            Subscriber<? super Message<?>> subscriber = this.downstream.get();
            if (subscriber != null) {
                subscriber.onSubscribe((Subscription)this);
            }
        } else {
            Subscriber<? super Message<?>> subscriber = this.downstream.get();
            if (subscriber != null) {
                subscriber.onSubscribe((Subscription)Subscriptions.CANCELLED);
            }
        }
    }

    public void onNext(Message<?> message) {
        if (this.isCancelled()) {
            return;
        }
        Subscriber<? super Message<?>> subscriber = this.downstream.get();
        this.retrieveSender.onItem().transformToUni(sender -> {
            try {
                return this.send((RabbitMQPublisher)sender, message, this.configuredExchange, this.configuration).onItem().transform(m -> Tuple2.of((Object)sender, (Object)m));
            }
            catch (Exception e) {
                message.nack((Throwable)e);
                RabbitMQLogging.log.serializationFailure(this.configuration.getChannel(), e);
                return Uni.createFrom().nullItem();
            }
        }).subscribe().with(tuple -> {
            if (tuple != null) {
                subscriber.onNext(tuple.getItem2());
                if (this.inflights != Long.MAX_VALUE) {
                    this.upstream.get().request(1L);
                }
            }
        }, arg_0 -> subscriber.onError(arg_0));
    }

    public void onError(Throwable t) {
        Subscription sub = this.upstream.getAndSet((Subscription)Subscriptions.CANCELLED);
        Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (sub != null && sub != Subscriptions.CANCELLED && subscriber != null) {
            subscriber.onError(t);
        }
    }

    public void onComplete() {
        Subscription sub = this.upstream.getAndSet((Subscription)Subscriptions.CANCELLED);
        Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (sub != null && sub != Subscriptions.CANCELLED && subscriber != null) {
            subscriber.onComplete();
        }
    }

    public void request(long l) {
        if (l != Long.MAX_VALUE) {
            throw RabbitMQExceptions.ex.illegalStateConsumeWithoutBackPressure();
        }
        this.upstream.get().request(this.inflights);
    }

    public void cancel() {
        Subscription sub = this.upstream.getAndSet((Subscription)Subscriptions.CANCELLED);
        if (sub != null && sub != Subscriptions.CANCELLED) {
            sub.cancel();
        }
    }

    private Uni<Message<?>> send(RabbitMQPublisher publisher, Message<?> msg, String exchange, RabbitMQConnectorOutgoingConfiguration configuration) {
        int retryAttempts = configuration.getReconnectAttempts();
        int retryInterval = configuration.getReconnectInterval();
        String defaultRoutingKey = configuration.getDefaultRoutingKey();
        RabbitMQMessageConverter.OutgoingRabbitMQMessage outgoingRabbitMQMessage = RabbitMQMessageConverter.convert(msg, exchange, defaultRoutingKey, this.defaultTtl, this.isTracingEnabled, Arrays.stream(configuration.getTracingAttributeHeaders().split(",")).map(String::trim).collect(Collectors.toList()));
        RabbitMQLogging.log.sendingMessageToExchange(exchange, outgoingRabbitMQMessage.getRoutingKey());
        return publisher.publish(exchange, outgoingRabbitMQMessage.getRoutingKey(), outgoingRabbitMQMessage.getProperties(), outgoingRabbitMQMessage.getBody()).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(retryInterval)).atMost((long)retryAttempts).onItemOrFailure().transformToUni((success, failure) -> {
            if (failure != null) {
                return Uni.createFrom().completionStage(msg.nack(failure));
            }
            return Uni.createFrom().completionStage(msg.ack());
        }).onItem().transform(x -> msg);
    }

    private boolean isCancelled() {
        Subscription subscription = this.upstream.get();
        return subscription == Subscriptions.CANCELLED || subscription == null;
    }
}

