/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.amqp;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.tuples.Tuple3;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.amqp.AmqpMessage;
import io.smallrye.reactive.messaging.amqp.AmqpMessageConverter;
import io.smallrye.reactive.messaging.amqp.ConnectionHolder;
import io.smallrye.reactive.messaging.amqp.OutgoingAmqpMetadata;
import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPExceptions;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.vertx.amqp.impl.AmqpMessageImpl;
import io.vertx.mutiny.amqp.AmqpSender;
import jakarta.enterprise.inject.Instance;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;

/**
 * Processor sitting between an upstream publisher of {@link Message}s and a single downstream
 * subscriber: every message received from upstream is sent through an {@link AmqpSender} and,
 * once the send has been acknowledged, re-emitted downstream.
 *
 * <p>Upstream demand is driven by the credits reported by the sender
 * ({@link AmqpSender#remainingCredits()}), optionally capped by the configured maximum number of
 * in-flight messages. When no credit is left, the sender is polled until credits come back.
 *
 * <p>The instance is also the {@link Flow.Subscription} handed to the downstream subscriber. The
 * first call to {@link #request(long)} triggers the retrieval of the sender; later calls are
 * no-ops because demand is computed from the credits rather than from the requested amount.
 */
public class AmqpCreditBasedSender
        implements Flow.Processor<Message<?>, Message<?>>, Flow.Subscription {
    private final ConnectionHolder holder;
    private final Uni<AmqpSender> retrieveSender;
    private final AmqpConnectorOutgoingConfiguration configuration;
    /** Upstream subscription; {@code null} until subscribed, {@code Subscriptions.CANCELLED} once terminated. */
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();
    /** The single downstream subscriber; {@code null} until {@link #subscribe} is called. */
    private final AtomicReference<Flow.Subscriber<? super Message<?>>> downstream = new AtomicReference<>();
    /** Flipped by the first {@link #request(long)}; also gates sending in {@code getMessage}. */
    private final AtomicBoolean once = new AtomicBoolean();
    private final boolean durable;
    private final long ttl;
    private final String configuredAddress;
    private final boolean tracingEnabled;
    private final boolean mandatoryCloudEventAttributeSet;
    private final boolean writeCloudEvents;
    private final boolean writeAsBinaryCloudEvent;
    private final int retryAttempts;
    private final int retryInterval;
    private final boolean lazyClient;
    private final AmqpOpenTelemetryInstrumenter amqpInstrumenter;
    private volatile boolean isAnonymous;
    /** Set while a credit-polling cycle started by {@code onNoMoreCredit} is pending. */
    private volatile boolean creditRetrievalInProgress = false;
    private final long maxInflights;

    /**
     * Creates the sender and caches the values read from the outgoing channel configuration.
     *
     * @param holder the connection holder providing the Vert.x instance, context and connection state
     * @param configuration the outgoing channel configuration
     * @param retrieveSender the {@link Uni} producing the AMQP sender to use
     * @param openTelemetryInstance passed to the instrumenter factory; only used when tracing is enabled
     */
    public AmqpCreditBasedSender(ConnectionHolder holder, AmqpConnectorOutgoingConfiguration configuration,
            Uni<AmqpSender> retrieveSender, Instance<OpenTelemetry> openTelemetryInstance) {
        this.holder = holder;
        this.retrieveSender = retrieveSender;
        this.configuration = configuration;
        this.durable = configuration.getDurable();
        this.ttl = configuration.getTtl();
        // The channel name is used as the address when none is configured explicitly.
        this.configuredAddress = configuration.getAddress().orElseGet(configuration::getChannel);
        this.tracingEnabled = configuration.getTracingEnabled();
        this.mandatoryCloudEventAttributeSet = configuration.getCloudEventsType().isPresent()
                && configuration.getCloudEventsSource().isPresent();
        this.writeCloudEvents = configuration.getCloudEvents();
        this.writeAsBinaryCloudEvent = configuration.getCloudEventsMode().equalsIgnoreCase("binary");
        this.retryAttempts = configuration.getRetryOnFailAttempts();
        this.retryInterval = configuration.getRetryOnFailInterval();
        this.maxInflights = configuration.getMaxInflightMessages();
        this.lazyClient = configuration.getLazyClient();
        this.amqpInstrumenter = this.tracingEnabled
                ? AmqpOpenTelemetryInstrumenter.createForSender(openTelemetryInstance)
                : null;
    }

    /**
     * Registers the downstream subscriber. Only one subscriber is accepted; any further subscriber
     * is failed immediately. If the upstream subscription is already known, the subscriber is handed
     * this instance as its subscription and, unless the client is lazy, the sender retrieval is
     * triggered right away.
     *
     * @param subscriber the downstream subscriber
     */
    @Override
    public void subscribe(Flow.Subscriber<? super Message<?>> subscriber) {
        if (!this.downstream.compareAndSet(null, subscriber)) {
            Subscriptions.fail(subscriber, AMQPExceptions.ex.illegalStateOnlyOneSubscriberAllowed());
        } else if (this.upstream.get() != null) {
            subscriber.onSubscribe(this);
            if (!this.lazyClient) {
                this.request(1L);
            }
        }
    }

    /**
     * Retrieves the sender, records whether it is used as an anonymous sender, then, on the
     * holder's context, reads the credits and issues the first upstream request.
     *
     * @return a {@link Uni} emitting the sender once the credit check has run on the context
     */
    private Uni<AmqpSender> getSenderAndCredits() {
        return this.retrieveSender.onItem().call(sender -> {
            this.isAnonymous = this.configuration.getUseAnonymousSender()
                    .orElseGet(() -> ConnectionHolder.supportAnonymousRelay(sender.connection()));
            CompletableFuture<Void> future = new CompletableFuture<>();
            this.holder.getContext().runOnContext(() -> {
                this.setCreditsAndRequest(sender);
                future.complete(null);
            });
            return Uni.createFrom().completionStage(future);
        });
    }

    /**
     * Checks whether the underlying connection is established, attempting to establish it once if
     * it is not.
     *
     * @return a {@link Uni} emitting the connection state
     */
    @CheckReturnValue
    public Uni<Boolean> isConnected() {
        return this.isConnected(true);
    }

    /**
     * @return the health timeout read from the channel configuration
     */
    public int getHealthTimeout() {
        return this.configuration.getHealthTimeout();
    }

    /**
     * @param attemptConnection when {@code true} and the holder reports "not connected", a connection
     *        attempt is made and the state is checked one more time (without a further attempt)
     * @return a {@link Uni} emitting the connection state
     */
    private Uni<Boolean> isConnected(boolean attemptConnection) {
        return this.holder.isConnected().chain(ok -> {
            if (!ok && attemptConnection) {
                return this.holder.getOrEstablishConnection().chain(x -> this.isConnected(false));
            }
            return Uni.createFrom().item(ok);
        });
    }

    /**
     * Stores the upstream subscription. The first subscription is kept and, if a downstream
     * subscriber is already registered, that subscriber receives this instance as its subscription.
     * For any later subscription the downstream subscriber (if any) is handed
     * {@code Subscriptions.CANCELLED}.
     *
     * @param subscription the upstream subscription
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // The upstream reference must be updated BEFORE reading the downstream one: subscribe() does
        // the opposite (set downstream, then read upstream), so at least one side sees the other.
        boolean accepted = this.upstream.compareAndSet(null, subscription);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (subscriber == null) {
            return;
        }
        if (accepted) {
            subscriber.onSubscribe(this);
        } else {
            subscriber.onSubscribe(Subscriptions.CANCELLED);
        }
    }

    /**
     * Reads the sender's remaining credits and, when there are some, requests that many items
     * upstream (capped by the max in-flight setting when it is positive). When there is no credit,
     * the credit-polling cycle is started.
     *
     * @param sender the AMQP sender whose credits are read
     * @return the number of credits read when an upstream request was issued, {@code 0} otherwise
     */
    private long setCreditsAndRequest(AmqpSender sender) {
        Flow.Subscription subscription = this.upstream.get();
        // A missing upstream is treated like a cancelled one, consistently with isCancelled():
        // there is nothing to request from, and starting a polling cycle would never terminate it.
        if (subscription == null || subscription == Subscriptions.CANCELLED) {
            return 0L;
        }
        long credits = sender.remainingCredits();
        if (credits > 0L) {
            long request = this.maxInflights > 0L ? Math.min(credits, this.maxInflights) : credits;
            AMQPLogging.log.retrievedCreditsForChannel(this.configuration.getChannel(), credits);
            subscription.request(request);
            return credits;
        }
        this.onNoMoreCredit(sender);
        return 0L;
    }

    /**
     * Requests a single item from upstream, unless there is no upstream or it has been cancelled.
     */
    void requestUpstream() {
        Flow.Subscription subscription = this.upstream.get();
        if (subscription != null && subscription != Subscriptions.CANCELLED) {
            subscription.request(1L);
        }
    }

    /**
     * Sends the message. On success the message is forwarded downstream and either one more item is
     * requested upstream or, when the sender has no credit left, the credit-polling cycle is started.
     * A send failure is propagated to the downstream subscriber. An exception thrown synchronously
     * while preparing the send nacks the message, is logged, and one more item is requested.
     *
     * @param message the message received from upstream
     */
    @Override
    public void onNext(Message<?> message) {
        if (this.isCancelled()) {
            return;
        }
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        try {
            this.send(message, this.durable, this.ttl, this.configuredAddress, this.isAnonymous)
                    .subscribe().with(tuple -> {
                        if (tuple == null) {
                            // send() emitted a null item: the message was not sent (see getMessage).
                            this.requestUpstream();
                            return;
                        }
                        subscriber.onNext(tuple.getItem1());
                        long remainingCredits = tuple.getItem3();
                        if (remainingCredits <= 0L) {
                            this.onNoMoreCredit(tuple.getItem2());
                        } else {
                            this.requestUpstream();
                        }
                    }, subscriber::onError);
        } catch (Exception e) {
            message.nack(e);
            AMQPLogging.log.serializationFailure(this.configuration.getChannel(), e);
            this.requestUpstream();
        }
    }

    /**
     * Starts a credit-polling cycle unless one is already in progress: the credits are re-read on
     * the holder's context and, if there are still none, re-read periodically (period taken from the
     * configuration) until credits are available or the stream is cancelled.
     *
     * @param sender the AMQP sender whose credits are polled
     */
    private void onNoMoreCredit(AmqpSender sender) {
        if (this.creditRetrievalInProgress) {
            return;
        }
        this.creditRetrievalInProgress = true;
        AMQPLogging.log.noMoreCreditsForChannel(this.configuration.getChannel());
        this.holder.getContext().runOnContext(() -> {
            if (this.isCancelled()) {
                return;
            }
            // While the flag is set, the nested onNoMoreCredit call made by setCreditsAndRequest
            // is a no-op, so this does not recurse.
            long c = this.setCreditsAndRequest(sender);
            if (c != 0L) {
                this.creditRetrievalInProgress = false;
                return;
            }
            this.holder.getVertx().setPeriodic(this.configuration.getCreditRetrievalPeriod(), id -> {
                if (this.setCreditsAndRequest(sender) != 0L || this.isCancelled()) {
                    // Credits are back, or the stream is over: stop polling.
                    this.holder.getVertx().cancelTimer(id);
                    this.creditRetrievalInProgress = false;
                } else {
                    AMQPLogging.log.stillNoMoreCreditsForChannel(this.configuration.getChannel());
                }
            });
        });
    }

    /**
     * @return {@code true} when there is no upstream subscription or it has been cancelled/terminated
     */
    private boolean isCancelled() {
        Flow.Subscription subscription = this.upstream.get();
        return subscription == Subscriptions.CANCELLED || subscription == null;
    }

    /**
     * Marks the upstream as terminated and forwards the failure downstream, provided an active
     * upstream subscription and a downstream subscriber exist.
     *
     * @param throwable the upstream failure
     */
    @Override
    public void onError(Throwable throwable) {
        Flow.Subscription sub = this.upstream.getAndSet(Subscriptions.CANCELLED);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (sub != null && sub != Subscriptions.CANCELLED && subscriber != null) {
            subscriber.onError(throwable);
        }
    }

    /**
     * Marks the upstream as terminated and forwards the completion downstream, provided an active
     * upstream subscription and a downstream subscriber exist.
     */
    @Override
    public void onComplete() {
        Flow.Subscription sub = this.upstream.getAndSet(Subscriptions.CANCELLED);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (sub != null && sub != Subscriptions.CANCELLED && subscriber != null) {
            subscriber.onComplete();
        }
    }

    /**
     * On the first call, triggers the retrieval of the sender and of its credits; a failure of that
     * retrieval is reported to the downstream subscriber. Subsequent calls do nothing.
     *
     * @param l the requested amount; not used, upstream demand is derived from the sender's credits
     */
    @Override
    public void request(long l) {
        if (!this.once.getAndSet(true)) {
            this.getSenderAndCredits().subscribe().with(s -> {
            }, f -> this.downstream.get().onError(f));
        }
    }

    /**
     * Cancels the upstream subscription, if one is active, and marks this processor as cancelled.
     */
    @Override
    public void cancel() {
        Flow.Subscription sub = this.upstream.getAndSet(Subscriptions.CANCELLED);
        if (sub != null && sub != Subscriptions.CANCELLED) {
            sub.cancel();
        }
    }

    /**
     * Converts the message and sends it with acknowledgement, retrying failures with back-off up to
     * the configured number of attempts. The incoming message is acked on success and nacked with
     * the failure otherwise.
     *
     * @param msg the message to send
     * @param durable durability flag used when the payload has to be converted
     * @param ttl time-to-live used when the payload has to be converted
     * @param configuredAddress the address configured for the channel
     * @param isAnonymousSender whether the sender is used as an anonymous sender
     * @return a {@link Uni} emitting the message, the sender and the sender's remaining credits, or
     *         a {@code null} item when the message was not sent
     */
    private Uni<Tuple3<Message<?>, AmqpSender, Long>> send(Message<?> msg, boolean durable, long ttl,
            String configuredAddress, boolean isAnonymousSender) {
        io.vertx.mutiny.amqp.AmqpMessage amqp = this.getMessage(msg, durable, ttl, configuredAddress,
                isAnonymousSender);
        if (amqp == null) {
            // NOTE(review): in this path the message is neither acked nor nacked by this class.
            return Uni.createFrom().nullItem();
        }
        if (this.tracingEnabled) {
            this.amqpInstrumenter.traceOutgoing(msg, new AmqpMessage<Object>(amqp, null, null, false, Boolean.TRUE));
        }
        return this.retrieveSender
                .onItem().transformToUni(s -> s.sendWithAck(amqp)
                        .replaceWith(() -> Tuple3.<Message<?>, AmqpSender, Long> of(msg, s, s.remainingCredits())))
                .onFailure().retry()
                .withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(this.retryInterval))
                .atMost(this.retryAttempts)
                .onItemOrFailure().call((s, failure) -> {
                    if (failure != null) {
                        return Uni.createFrom().completionStage(msg.nack(failure));
                    }
                    return Uni.createFrom().completionStage(msg.ack());
                });
    }

    /**
     * Builds the AMQP message to send: reuses the AMQP message carried by {@code msg} (as the message
     * itself or as its payload) when there is one, converts the payload otherwise, applies the cloud
     * event encoding when enabled, and sets the resolved address.
     *
     * @return the message to send, or {@code null} when {@link #request(long)} has never been called
     *         (in which case the message is only logged as not sent)
     */
    private io.vertx.mutiny.amqp.AmqpMessage getMessage(Message<?> msg, boolean durable, long ttl,
            String configuredAddress, boolean isAnonymousSender) {
        io.vertx.mutiny.amqp.AmqpMessage amqp;
        OutgoingCloudEventMetadata<?> ceMetadata = msg.getMetadata(OutgoingCloudEventMetadata.class).orElse(null);
        Object payload = msg.getPayload();
        if (msg instanceof AmqpMessage) {
            amqp = ((AmqpMessage<?>) msg).getAmqpMessage();
        } else if (payload instanceof io.vertx.mutiny.amqp.AmqpMessage) {
            amqp = (io.vertx.mutiny.amqp.AmqpMessage) payload;
        } else if (payload instanceof io.vertx.amqp.AmqpMessage) {
            amqp = new io.vertx.mutiny.amqp.AmqpMessage((io.vertx.amqp.AmqpMessage) payload);
        } else if (payload instanceof org.apache.qpid.proton.message.Message) {
            org.apache.qpid.proton.message.Message message = (org.apache.qpid.proton.message.Message) payload;
            io.vertx.amqp.AmqpMessage vertxMessage = new AmqpMessageImpl(message);
            amqp = new io.vertx.mutiny.amqp.AmqpMessage(vertxMessage);
        } else {
            amqp = AmqpMessageConverter.convertToAmqpMessage(msg, durable, ttl);
        }
        if (this.writeCloudEvents && (ceMetadata != null || this.mandatoryCloudEventAttributeSet)) {
            amqp = this.writeAsBinaryCloudEvent
                    ? AmqpCloudEventHelper.createBinaryCloudEventMessage(amqp, ceMetadata, this.configuration)
                    : AmqpCloudEventHelper.createStructuredEventMessage(amqp, ceMetadata, this.configuration);
        }
        String actualAddress = this.getActualAddress(msg, amqp, configuredAddress, isAnonymousSender);
        if (!this.once.get()) {
            AMQPLogging.log.messageNoSend(actualAddress);
            return null;
        }
        if (!actualAddress.equals(amqp.address())) {
            amqp.getDelegate().unwrap().setAddress(actualAddress);
        }
        AMQPLogging.log.sendingMessageToAddress(actualAddress);
        return amqp;
    }

    /**
     * Resolves the address the message is sent to. An address carried by the AMQP message, or by the
     * {@link OutgoingAmqpMetadata}, is only honoured for an anonymous sender; otherwise it is logged
     * as unusable and the configured address is used. Without any per-message address the configured
     * address is used.
     *
     * @return the address to use
     */
    private String getActualAddress(Message<?> message, io.vertx.mutiny.amqp.AmqpMessage amqp,
            String configuredAddress, boolean isAnonymousSender) {
        String address = amqp.address();
        if (address != null) {
            if (isAnonymousSender) {
                return address;
            }
            AMQPLogging.log.unableToUseAddress(address, configuredAddress);
            return configuredAddress;
        }
        return message.getMetadata(OutgoingAmqpMetadata.class).flatMap(o -> {
            String addressFromMessage = o.getAddress();
            if (addressFromMessage != null && !isAnonymousSender) {
                AMQPLogging.log.unableToUseAddress(addressFromMessage, configuredAddress);
                return Optional.<String> empty();
            }
            return Optional.ofNullable(addressFromMessage);
        }).orElse(configuredAddress);
    }
}

