/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.ObservationCollector;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.Resource;
import com.rabbitmq.client.amqp.impl.AmqpConnection;
import com.rabbitmq.client.amqp.impl.AmqpMessage;
import com.rabbitmq.client.amqp.impl.AmqpPublisherBuilder;
import com.rabbitmq.client.amqp.impl.DefaultAddressBuilder;
import com.rabbitmq.client.amqp.impl.ExceptionUtils;
import com.rabbitmq.client.amqp.impl.ResourceBase;
import com.rabbitmq.client.amqp.impl.SessionHandler;
import com.rabbitmq.client.amqp.impl.Utils;
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
import com.rabbitmq.qpid.protonj2.client.DeliveryMode;
import com.rabbitmq.qpid.protonj2.client.DeliveryState;
import com.rabbitmq.qpid.protonj2.client.Sender;
import com.rabbitmq.qpid.protonj2.client.SenderOptions;
import com.rabbitmq.qpid.protonj2.client.Session;
import com.rabbitmq.qpid.protonj2.client.Tracker;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Publisher} implementation that sends messages through a ProtonJ2 {@link Sender}.
 *
 * <p>The sender is opened in the constructor on a session obtained from a
 * {@link SessionHandler} created from the connection, and is re-created by
 * {@link #recoverAfterConnectionFailure()}. Settlement outcomes are handed to the
 * caller-supplied callback on the environment's publisher executor service.
 */
final class AmqpPublisher
extends ResourceBase
implements Publisher {
    /** Source of the per-instance {@link #id}. */
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0L);
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpPublisher.class);
    private final Long id;
    /** Current native sender; replaced on recovery, hence volatile. */
    private volatile Sender sender;
    /** Executor on which settlement callbacks are run. */
    private final ExecutorService executorService;
    /** Target address; {@code null} means an anonymous sender is opened. */
    private final String address;
    private final AmqpConnection connection;
    /** Guards {@link #close(Throwable)} so the closing sequence runs at most once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MetricsCollector metricsCollector;
    private final ObservationCollector observationCollector;
    /** Performs the actual send; passed to the observation collector on each publish. */
    private final Function<Message, Tracker> publishCall;
    private final DefaultAddressBuilder.DestinationSpec destinationSpec;
    private final Duration publishTimeout;
    private final SessionHandler sessionHandler;
    /** Connection details given to the observation collector; refreshed on recovery. */
    private volatile ObservationCollector.ConnectionInfo connectionInfo;
    private final ExecutorService dispatchingExecutorService;
    /** Handler registered on the native sender; reacts to a close reported by the native layer. */
    private final Consumer<ClientException> nativeCloseHandler;

    /**
     * Creates the publisher, opens its native sender and moves the resource to
     * {@link Resource.State#OPEN}.
     *
     * @param builder source of the connection, address, destination spec, publish timeout
     *     and state listeners
     */
    AmqpPublisher(AmqpPublisherBuilder builder) {
        super(builder.listeners());
        this.id = ID_SEQUENCE.getAndIncrement();
        this.executorService = builder.connection().environment().publisherExecutorService();
        this.address = builder.address();
        this.destinationSpec = builder.destination();
        this.connection = builder.connection();
        this.publishTimeout = builder.publishTimeout();
        this.sessionHandler = this.connection.createSessionHandler();
        this.dispatchingExecutorService = this.connection.dispatchingExecutorService();
        // The native close notification is handed over to the dispatching executor
        // instead of being processed on the thread that reports it.
        this.nativeCloseHandler = e -> this.dispatchingExecutorService.submit(() -> {
            // The result is deliberately unused.
            boolean ignored = AmqpPublisher.maybeCloseConsumerOnException(this, e);
        });
        this.sender = this.createSender(this.sessionHandler.session(), this.address, this.publishTimeout, this.nativeCloseHandler);
        this.metricsCollector = this.connection.metricsCollector();
        this.observationCollector = this.connection.observationCollector();
        this.state(Resource.State.OPEN);
        this.metricsCollector.openPublisher();
        this.publishCall = msg -> {
            try {
                com.rabbitmq.qpid.protonj2.client.Message<?> nativeMessage = ((AmqpMessage)msg).nativeMessage();
                // Every outgoing message is flagged as durable before it is sent.
                return this.sender.send(nativeMessage.durable(true));
            }
            catch (ClientIllegalStateException e) {
                // An illegal-state error closes the publisher before the error is rethrown.
                LOGGER.debug("Error while publishing: '{}'. Closing publisher.", (Object)e.getMessage());
                this.close(ExceptionUtils.convert(e));
                throw ExceptionUtils.convert(e);
            }
            catch (ClientException e) {
                // Any other client error is rethrown without closing the publisher.
                LOGGER.debug("Error while publishing: '{}'.", (Object)e.getMessage());
                throw ExceptionUtils.convert(e);
            }
        };
        this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
    }

    /**
     * Creates an empty message.
     *
     * @return a new {@link AmqpMessage}
     */
    @Override
    public Message message() {
        return new AmqpMessage();
    }

    /**
     * Creates a message with the given body.
     *
     * @param body the message body
     * @return a new {@link AmqpMessage} built from {@code body}
     */
    @Override
    public Message message(byte[] body) {
        return new AmqpMessage(body);
    }

    /**
     * Publishes a message and reports its settlement outcome asynchronously.
     *
     * <p>The inherited {@code checkOpen()} is called first (presumably it rejects the call
     * when the publisher is not open -- confirm against {@code ResourceBase}). The send goes
     * through the observation collector, and the settlement future of the returned tracker
     * is handled on the publisher executor service.
     *
     * @param message the message to publish; must be an {@link AmqpMessage}, as the publish
     *     call casts it
     * @param callback receives a context carrying the message and its {@link Publisher.Status}
     */
    @Override
    public void publish(Message message, Publisher.Callback callback) {
        this.checkOpen();
        Tracker tracker = this.observationCollector.publish(this.destinationSpec.exchange(), this.destinationSpec.routingKey(), message, this.connectionInfo, this.publishCall);
        tracker.settlementFuture().handleAsync((t, ex) -> {
            // A failed settlement future is reported as REJECTED.
            // NOTE(review): if mapDeliveryState throws, the statements below do not run,
            // so the callback is not invoked for that message.
            Publisher.Status status = ex == null ? this.mapDeliveryState(t.remoteState()) : Publisher.Status.REJECTED;
            DefaultContext defaultContext = new DefaultContext(message, status);
            this.metricsCollector.publishDisposition(AmqpPublisher.mapToPublishDisposition(status));
            callback.handle(defaultContext);
            return null;
        }, (Executor)this.executorService);
        this.metricsCollector.publish();
    }

    /**
     * Translates the remote delivery state of a settled message into a publisher status.
     *
     * @param in the remote delivery state
     * @return ACCEPTED, REJECTED or RELEASED according to {@code in}
     * @throws IllegalStateException if {@code in} is none of accepted, rejected or released
     */
    private Publisher.Status mapDeliveryState(DeliveryState in) {
        if (in.isAccepted()) {
            return Publisher.Status.ACCEPTED;
        }
        if (in.getType() == DeliveryState.Type.REJECTED) {
            return Publisher.Status.REJECTED;
        }
        if (in.getType() == DeliveryState.Type.RELEASED) {
            return Publisher.Status.RELEASED;
        }
        LOGGER.warn("Delivery state not supported: {}", (Object)in.getType());
        throw new IllegalStateException("This delivery state is not supported: " + in.getType());
    }

    /**
     * Translates a publisher status into the matching metrics disposition.
     *
     * @param status the publisher status
     * @return the matching disposition, or {@code null} when {@code status} is not
     *     ACCEPTED, REJECTED or RELEASED
     */
    private static MetricsCollector.PublishDisposition mapToPublishDisposition(Publisher.Status status) {
        if (status == Publisher.Status.ACCEPTED) {
            return MetricsCollector.PublishDisposition.ACCEPTED;
        }
        if (status == Publisher.Status.REJECTED) {
            return MetricsCollector.PublishDisposition.REJECTED;
        }
        if (status == Publisher.Status.RELEASED) {
            return MetricsCollector.PublishDisposition.RELEASED;
        }
        return null;
    }

    /**
     * Refreshes the observation connection info and opens a new native sender with the
     * same address, timeout and close handler as the original one.
     */
    void recoverAfterConnectionFailure() {
        this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
        this.sender = this.createSender(this.sessionHandler.sessionNoCheck(), this.address, this.publishTimeout, this.nativeCloseHandler);
    }

    /** Closes the publisher without a cause. */
    @Override
    public void close() {
        this.close(null);
    }

    /**
     * Opens a native sender and waits for the open to complete.
     *
     * <p>The same options (AT_LEAST_ONCE delivery mode, send timeout, close handler) are
     * applied whether the sender targets {@code address} or is anonymous.
     *
     * @param session session on which the sender is opened
     * @param address target address, or {@code null} to open an anonymous sender
     * @param publishTimeout send timeout; a negative duration is forwarded as {@code -1}
     * @param nativeCloseHandler handler registered on the sender options
     * @return the opened sender
     */
    private Sender createSender(Session session, String address, Duration publishTimeout, Consumer<ClientException> nativeCloseHandler) {
        // -1 is presumably the native client's "no timeout" value -- TODO confirm.
        long sendTimeoutMs = publishTimeout.isNegative() ? -1L : publishTimeout.toMillis();
        SenderOptions senderOptions = ((SenderOptions)new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)).sendTimeout(sendTimeoutMs).closeHandler(nativeCloseHandler);
        try {
            // The anonymous sender must receive the options too; otherwise delivery mode,
            // send timeout and close handler would not be applied to it.
            Sender s = address == null ? session.openAnonymousSender(senderOptions) : session.openSender(address, senderOptions);
            return (Sender)ExceptionUtils.wrapGet(s.openFuture());
        }
        catch (ClientException e) {
            throw ExceptionUtils.convert(e, "Error while creating publisher to %s", address);
        }
    }

    /**
     * Closes the publisher; only the first call has an effect.
     *
     * <p>The resource goes through CLOSING, is removed from the connection, has its sender
     * and session handler closed (failures are logged, not propagated), then goes to CLOSED
     * and the metrics collector is notified.
     *
     * @param cause reason for closing, passed to the state transitions; may be {@code null}
     */
    void close(Throwable cause) {
        if (this.closed.compareAndSet(false, true)) {
            this.state(Resource.State.CLOSING, cause);
            this.connection.removePublisher(this);
            // Sender and session handler are closed independently so that a failure on
            // the sender does not prevent the session handler from being closed.
            try {
                this.sender.close();
            }
            catch (Exception e) {
                LOGGER.warn("Error while closing sender", (Throwable)e);
            }
            try {
                this.sessionHandler.close();
            }
            catch (Exception e) {
                LOGGER.warn("Error while closing session handler", (Throwable)e);
            }
            this.state(Resource.State.CLOSED, cause);
            this.metricsCollector.closePublisher();
        }
    }

    /**
     * Delegates to {@code ExceptionUtils.maybeCloseConsumerOnException} with the publisher's
     * {@link #close(Throwable)} as the closing action.
     *
     * @param publisher the publisher that may be closed
     * @param ex the exception to evaluate
     * @return the value returned by the delegate (presumably whether the publisher was
     *     closed -- confirm against {@code ExceptionUtils})
     */
    private static boolean maybeCloseConsumerOnException(AmqpPublisher publisher, Exception ex) {
        return ExceptionUtils.maybeCloseConsumerOnException(publisher::close, ex);
    }

    /**
     * @return the identifier taken from the static sequence at construction
     */
    Long id() {
        return this.id;
    }

    /**
     * @return the address given by the builder; may be {@code null}
     */
    String address() {
        return this.address;
    }

    /** Immutable {@link Publisher.Context} pairing a message with its settlement status. */
    private static class DefaultContext
    implements Publisher.Context {
        private final Message message;
        private final Publisher.Status status;

        private DefaultContext(Message message, Publisher.Status status) {
            this.message = message;
            this.status = status;
        }

        @Override
        public Message message() {
            return this.message;
        }

        @Override
        public Publisher.Status status() {
            return this.status;
        }
    }
}

