/*
 * Decompiled with CFR 0.152.
 */
package reactor.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Method;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ChannelCloseHandlers;
import reactor.rabbitmq.ChannelProxy;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.Helpers;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFluxException;
import reactor.rabbitmq.ResourceManagementOptions;
import reactor.rabbitmq.RpcClient;
import reactor.rabbitmq.SendOptions;
import reactor.rabbitmq.SenderOptions;
import reactor.rabbitmq.SubscriberState;
import reactor.rabbitmq.Utils;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

public class Sender
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new ChannelCreationFunction();
    private static final Function<Connection, Channel> CHANNEL_PROXY_CREATION_FUNCTION = new ChannelProxyCreationFunction();
    private final Mono<? extends Connection> connectionMono;
    private final Mono<? extends Channel> channelMono;
    private final BiConsumer<SignalType, Channel> channelCloseHandler;
    private final AtomicReference<Connection> connection = new AtomicReference();
    private final Mono<? extends Channel> resourceManagementChannelMono;
    private final Scheduler resourceManagementScheduler;
    private final boolean privateResourceManagementScheduler;
    private final Scheduler connectionSubscriptionScheduler;
    private final boolean privateConnectionSubscriptionScheduler;
    private final ExecutorService channelCloseThreadPool = Executors.newCachedThreadPool();
    private final int connectionClosingTimeout;
    private final AtomicBoolean closingOrClosed = new AtomicBoolean(false);
    private static final String REACTOR_RABBITMQ_DELIVERY_TAG_HEADER = "reactor_rabbitmq_delivery_tag";

    public Sender() {
        this(new SenderOptions());
    }

    public Sender(SenderOptions options) {
        Mono cm;
        this.privateConnectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null;
        Scheduler scheduler = this.connectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null ? this.createScheduler("rabbitmq-sender-connection-subscription") : options.getConnectionSubscriptionScheduler();
        if (options.getConnectionMono() == null) {
            cm = Mono.fromCallable(() -> {
                if (options.getConnectionSupplier() == null) {
                    return options.getConnectionFactory().newConnection();
                }
                return options.getConnectionSupplier().apply(null);
            });
            cm = options.getConnectionMonoConfigurator().apply((Mono<? extends Connection>)cm);
            cm = cm.doOnNext(conn -> this.connection.set((Connection)conn)).subscribeOn(this.connectionSubscriptionScheduler).transform(this::cache);
        } else {
            cm = options.getConnectionMono();
        }
        this.connectionMono = cm;
        this.channelMono = options.getChannelMono();
        this.channelCloseHandler = options.getChannelCloseHandler() == null ? ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE : options.getChannelCloseHandler();
        this.privateResourceManagementScheduler = options.getResourceManagementScheduler() == null;
        this.resourceManagementScheduler = options.getResourceManagementScheduler() == null ? this.createScheduler("rabbitmq-sender-resource-creation") : options.getResourceManagementScheduler();
        this.resourceManagementChannelMono = options.getResourceManagementChannelMono() == null ? this.connectionMono.map(CHANNEL_PROXY_CREATION_FUNCTION).transform(this::cache) : options.getResourceManagementChannelMono();
        this.connectionClosingTimeout = options.getConnectionClosingTimeout() != null && !Duration.ZERO.equals(options.getConnectionClosingTimeout()) ? (int)options.getConnectionClosingTimeout().toMillis() : -1;
    }

    protected Scheduler createScheduler(String name) {
        return Schedulers.newElastic((String)name);
    }

    protected <T> Mono<T> cache(Mono<T> mono) {
        return Utils.cache(mono);
    }

    public Mono<Void> send(Publisher<OutboundMessage> messages) {
        return this.send(messages, new SendOptions());
    }

    public Mono<Void> send(Publisher<OutboundMessage> messages, @Nullable SendOptions options) {
        options = options == null ? new SendOptions() : options;
        Mono<? extends Channel> currentChannelMono = this.getChannelMono(options);
        BiConsumer<SendContext, Exception> exceptionHandler = options.getExceptionHandler();
        BiConsumer<SignalType, Channel> channelCloseHandler = this.getChannelCloseHandler(options);
        return currentChannelMono.flatMapMany(channel -> Flux.from((Publisher)messages).doOnNext(message -> {
            try {
                channel.basicPublish(message.getExchange(), message.getRoutingKey(), message.getProperties(), message.getBody());
            }
            catch (Exception e) {
                exceptionHandler.accept(new SendContext<OutboundMessage>((Channel)channel, (OutboundMessage)message), e);
            }
        }).doOnError(e -> LOGGER.warn("Send failed with exception {}", e)).doFinally(st -> channelCloseHandler.accept((SignalType)st, (Channel)channel))).then();
    }

    public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMessage> messages) {
        return this.sendWithPublishConfirms(messages, new SendOptions());
    }

    public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMessage> messages, SendOptions options) {
        return Flux.from(this.sendWithTypedPublishConfirms(messages, options));
    }

    public <OMSG extends OutboundMessage> Flux<OutboundMessageResult<OMSG>> sendWithTypedPublishConfirms(Publisher<OMSG> messages) {
        return this.sendWithTypedPublishConfirms(messages, new SendOptions());
    }

    public <OMSG extends OutboundMessage> Flux<OutboundMessageResult<OMSG>> sendWithTypedPublishConfirms(Publisher<OMSG> messages, @Nullable SendOptions options) {
        SendOptions sendOptions = options == null ? new SendOptions() : options;
        Mono<? extends Channel> currentChannelMono = this.getChannelMono(options);
        BiConsumer<SignalType, Channel> channelCloseHandler = this.getChannelCloseHandler(options);
        Flux result = currentChannelMono.map(channel -> {
            try {
                channel.confirmSelect();
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error while setting publisher confirms on channel", e);
            }
            return channel;
        }).flatMapMany(channel -> new PublishConfirmOperator(messages, (Channel)channel, sendOptions).doFinally(signalType -> {
            if (signalType == SignalType.ON_ERROR) {
                channelCloseHandler.accept((SignalType)signalType, (Channel)channel);
            } else {
                this.channelCloseThreadPool.execute(() -> channelCloseHandler.accept((SignalType)signalType, (Channel)channel));
            }
        }));
        if (sendOptions.getMaxInFlight() != null) {
            result = result.publishOn(sendOptions.getScheduler(), sendOptions.getMaxInFlight().intValue());
        }
        return result;
    }

    Mono<? extends Channel> getChannelMono(SendOptions options) {
        return Stream.of(options.getChannelMono(), this.channelMono).filter(Objects::nonNull).findFirst().orElse(this.connectionMono.map(CHANNEL_CREATION_FUNCTION));
    }

    private BiConsumer<SignalType, Channel> getChannelCloseHandler(SendOptions options) {
        return options.getChannelCloseHandler() != null ? options.getChannelCloseHandler() : this.channelCloseHandler;
    }

    public RpcClient rpcClient(String exchange, String routingKey) {
        return new RpcClient((Mono<Channel>)this.connectionMono.map(CHANNEL_CREATION_FUNCTION).transform(this::cache), exchange, routingKey);
    }

    public RpcClient rpcClient(String exchange, String routingKey, Supplier<String> correlationIdProvider) {
        return new RpcClient((Mono<Channel>)this.connectionMono.map(CHANNEL_CREATION_FUNCTION).transform(this::cache), exchange, routingKey, correlationIdProvider);
    }

    public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification specification) {
        return this.declareQueue(specification, null);
    }

    public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification specification, @Nullable ResourceManagementOptions options) {
        return this.declareQueue(specification, options);
    }

    public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification specification) {
        return this.declareQueue(specification, null);
    }

    public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification specification, @Nullable ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Queue.Declare declare = specification.getName() == null ? new AMQP.Queue.Declare.Builder().queue("").durable(false).exclusive(true).autoDelete(true).arguments(specification.getArguments()).build() : new AMQP.Queue.Declare.Builder().queue(specification.getName()).durable(specification.isDurable()).exclusive(specification.isExclusive()).autoDelete(specification.isAutoDelete()).passive(specification.isPassive()).arguments(specification.getArguments()).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)declare);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Queue.DeclareOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    private Mono<? extends Channel> getChannelMonoForResourceManagement(ResourceManagementOptions options) {
        return options != null && options.getChannelMono() != null ? options.getChannelMono() : this.resourceManagementChannelMono;
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification) {
        return this.delete(specification, false, false);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, @Nullable ResourceManagementOptions options) {
        return this.delete(specification, false, false, options);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, boolean ifUnused, boolean ifEmpty) {
        return this.deleteQueue(specification, ifUnused, ifEmpty);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, boolean ifUnused, boolean ifEmpty, @Nullable ResourceManagementOptions options) {
        return this.deleteQueue(specification, ifUnused, ifEmpty, options);
    }

    public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification specification, boolean ifUnused, boolean ifEmpty) {
        return this.deleteQueue(specification, ifUnused, ifEmpty, null);
    }

    public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification specification, boolean ifUnused, boolean ifEmpty, @Nullable ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Queue.Delete delete = new AMQP.Queue.Delete.Builder().queue(specification.getName()).ifUnused(ifUnused).ifEmpty(ifEmpty).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)delete);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Queue.DeleteOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification specification) {
        return this.declareExchange(specification, null);
    }

    public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification specification, @Nullable ResourceManagementOptions options) {
        return this.declareExchange(specification, options);
    }

    public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification specification) {
        return this.declareExchange(specification, null);
    }

    public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification specification, @Nullable ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Exchange.Declare declare = new AMQP.Exchange.Declare.Builder().exchange(specification.getName()).type(specification.getType()).durable(specification.isDurable()).autoDelete(specification.isAutoDelete()).internal(specification.isInternal()).passive(specification.isPassive()).arguments(specification.getArguments()).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)declare);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Exchange.DeclareOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification) {
        return this.delete(specification, false);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, @Nullable ResourceManagementOptions options) {
        return this.delete(specification, false, options);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, boolean ifUnused) {
        return this.deleteExchange(specification, ifUnused);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, boolean ifUnused, @Nullable ResourceManagementOptions options) {
        return this.deleteExchange(specification, ifUnused, options);
    }

    public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification specification, boolean ifUnused) {
        return this.deleteExchange(specification, ifUnused, null);
    }

    public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification specification, boolean ifUnused, @Nullable ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Exchange.Delete delete = new AMQP.Exchange.Delete.Builder().exchange(specification.getName()).ifUnused(ifUnused).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)delete);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Exchange.DeleteOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification specification) {
        return this.unbind(specification, null);
    }

    public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification specification, @Nullable ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Queue.Unbind unbinding = new AMQP.Queue.Unbind.Builder().exchange(specification.getExchange()).queue(specification.getQueue()).routingKey(specification.getRoutingKey()).arguments(specification.getArguments()).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)unbinding);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Queue.UnbindOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Queue.BindOk> bind(BindingSpecification specification) {
        return this.bind(specification, null);
    }

    public Mono<AMQP.Queue.BindOk> bind(BindingSpecification specification, @Nullable ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Queue.Bind binding = new AMQP.Queue.Bind.Builder().exchange(specification.getExchange()).queue(specification.getQueue()).routingKey(specification.getRoutingKey()).arguments(specification.getArguments()).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)binding);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Queue.BindOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    @Override
    public void close() {
        if (this.closingOrClosed.compareAndSet(false, true)) {
            if (this.connection.get() != null) {
                Helpers.safelyExecute(LOGGER, () -> this.connection.get().close(this.connectionClosingTimeout), "Error while closing sender connection");
            }
            if (this.privateConnectionSubscriptionScheduler) {
                Helpers.safelyExecute(LOGGER, () -> this.connectionSubscriptionScheduler.dispose(), "Error while disposing connection subscription scheduler");
            }
            if (this.privateResourceManagementScheduler) {
                Helpers.safelyExecute(LOGGER, () -> this.resourceManagementScheduler.dispose(), "Error while disposing resource management scheduler");
            }
            Helpers.safelyExecute(LOGGER, () -> this.channelCloseThreadPool.shutdown(), "Error while closing channel closing thread pool");
        }
    }

    private static class ChannelProxyCreationFunction
    implements Function<Connection, Channel> {
        private ChannelProxyCreationFunction() {
        }

        @Override
        public Channel apply(Connection connection) {
            try {
                return new ChannelProxy(connection);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    private static class ChannelCreationFunction
    implements Function<Connection, Channel> {
        private ChannelCreationFunction() {
        }

        @Override
        public Channel apply(Connection connection) {
            try {
                return connection.createChannel();
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    private static class PublishConfirmSubscriber<OMSG extends OutboundMessage>
    implements CoreSubscriber<OMSG>,
    Subscription {
        private final AtomicReference<SubscriberState> state = new AtomicReference<SubscriberState>(SubscriberState.INIT);
        private final AtomicReference<Throwable> firstException = new AtomicReference();
        private final ConcurrentNavigableMap<Long, OMSG> unconfirmed = new ConcurrentSkipListMap<Long, OMSG>();
        private final Channel channel;
        private final Subscriber<? super OutboundMessageResult<OMSG>> subscriber;
        private final BiConsumer<SendContext, Exception> exceptionHandler;
        private Subscription subscription;
        private ConfirmListener confirmListener;
        private ReturnListener returnListener;
        private final boolean trackReturned;
        private final BiFunction<AMQP.BasicProperties, Long, AMQP.BasicProperties> propertiesProcessor;

        private PublishConfirmSubscriber(Channel channel, Subscriber<? super OutboundMessageResult<OMSG>> subscriber, SendOptions options) {
            this.channel = channel;
            this.subscriber = subscriber;
            this.exceptionHandler = options.getExceptionHandler();
            this.trackReturned = options.isTrackReturned();
            this.propertiesProcessor = this.trackReturned ? PublishConfirmSubscriber::addReactorRabbitMQDeliveryTag : (properties, deliveryTag) -> properties;
        }

        public void request(long n) {
            this.subscription.request(n);
        }

        public void cancel() {
            this.subscription.cancel();
        }

        public void onSubscribe(Subscription subscription) {
            if (Operators.validate((Subscription)this.subscription, (Subscription)subscription)) {
                if (this.trackReturned) {
                    this.returnListener = (replyCode, replyText, exchange, routingKey, properties, body) -> {
                        try {
                            Object deliveryTagObj = properties.getHeaders().get(Sender.REACTOR_RABBITMQ_DELIVERY_TAG_HEADER);
                            if (deliveryTagObj instanceof Long) {
                                Long deliveryTag = (Long)deliveryTagObj;
                                OutboundMessage outboundMessage = (OutboundMessage)this.unconfirmed.get(deliveryTag);
                                this.subscriber.onNext(new OutboundMessageResult<OutboundMessage>(outboundMessage, true, true));
                                this.unconfirmed.remove(deliveryTag);
                            } else {
                                this.handleError(new IllegalArgumentException("Missing header reactor_rabbitmq_delivery_tag"), null);
                            }
                        }
                        catch (Exception e) {
                            this.handleError(e, null);
                        }
                    };
                    this.channel.addReturnListener(this.returnListener);
                }
                this.confirmListener = new ConfirmListener(){

                    public void handleAck(long deliveryTag, boolean multiple) {
                        this.handleAckNack(deliveryTag, multiple, true);
                    }

                    public void handleNack(long deliveryTag, boolean multiple) {
                        this.handleAckNack(deliveryTag, multiple, false);
                    }

                    private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
                        if (multiple) {
                            try {
                                NavigableMap unconfirmedToSend = unconfirmed.headMap((Object)deliveryTag, true);
                                Iterator iterator = unconfirmedToSend.entrySet().iterator();
                                while (iterator.hasNext()) {
                                    subscriber.onNext(new OutboundMessageResult<OutboundMessage>((OutboundMessage)iterator.next().getValue(), ack, false));
                                    iterator.remove();
                                }
                            }
                            catch (Exception e) {
                                this.handleError(e, null);
                            }
                        } else {
                            OutboundMessage outboundMessage = (OutboundMessage)unconfirmed.get(deliveryTag);
                            if (outboundMessage != null) {
                                try {
                                    unconfirmed.remove(deliveryTag);
                                    subscriber.onNext(new OutboundMessageResult<OutboundMessage>(outboundMessage, ack, false));
                                }
                                catch (Exception e) {
                                    this.handleError(e, new OutboundMessageResult<OutboundMessage>(outboundMessage, ack, false));
                                }
                            }
                        }
                        if (unconfirmed.isEmpty()) {
                            this.maybeComplete();
                        }
                    }
                };
                this.channel.addConfirmListener(this.confirmListener);
                this.channel.addShutdownListener(sse -> {
                    if (!sse.isHardError() && !sse.isInitiatedByApplication()) {
                        this.subscriber.onError((Throwable)sse);
                    }
                });
                this.state.set(SubscriberState.ACTIVE);
                this.subscription = subscription;
                this.subscriber.onSubscribe((Subscription)this);
            }
        }

        public void onNext(OMSG message) {
            if (this.checkComplete(message)) {
                return;
            }
            long nextPublishSeqNo = this.channel.getNextPublishSeqNo();
            try {
                this.unconfirmed.putIfAbsent(nextPublishSeqNo, message);
                this.channel.basicPublish(((OutboundMessage)message).getExchange(), ((OutboundMessage)message).getRoutingKey(), this.trackReturned, this.propertiesProcessor.apply(((OutboundMessage)message).getProperties(), nextPublishSeqNo), ((OutboundMessage)message).getBody());
            }
            catch (Exception e) {
                this.unconfirmed.remove(nextPublishSeqNo);
                try {
                    this.exceptionHandler.accept(new ConfirmSendContext<OMSG>(this.channel, message, this), e);
                }
                catch (Exception innerException) {
                    this.handleError(innerException, new OutboundMessageResult<OMSG>(message, false, false));
                }
            }
        }

        private static AMQP.BasicProperties addReactorRabbitMQDeliveryTag(AMQP.BasicProperties properties, long deliveryTag) {
            AMQP.BasicProperties baseProperties = properties != null ? properties : new AMQP.BasicProperties();
            HashMap<String, Long> headers = baseProperties.getHeaders() != null ? new HashMap<String, Long>(baseProperties.getHeaders()) : new HashMap();
            headers.putIfAbsent(Sender.REACTOR_RABBITMQ_DELIVERY_TAG_HEADER, deliveryTag);
            return baseProperties.builder().headers(headers).build();
        }

        public void onError(Throwable throwable) {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.COMPLETE) || this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                this.channel.removeConfirmListener(this.confirmListener);
                if (this.returnListener != null) {
                    this.channel.removeReturnListener(this.returnListener);
                }
                this.subscriber.onError(throwable);
            } else if (this.firstException.compareAndSet(null, throwable) && this.state.get() == SubscriberState.COMPLETE) {
                Operators.onErrorDropped((Throwable)throwable, (Context)this.currentContext());
            }
        }

        public void onComplete() {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.OUTBOUND_DONE) && this.unconfirmed.size() == 0) {
                this.maybeComplete();
            }
        }

        private void handleError(Exception e, @Nullable OutboundMessageResult<OMSG> result) {
            LOGGER.error("error in publish confirm sending", (Throwable)e);
            boolean complete = this.checkComplete(e);
            this.firstException.compareAndSet(null, e);
            if (!complete) {
                if (result != null) {
                    this.subscriber.onNext(result);
                }
                this.onError(e);
            }
        }

        private void maybeComplete() {
            boolean done = this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE);
            if (done) {
                this.channel.removeConfirmListener(this.confirmListener);
                if (this.returnListener != null) {
                    this.channel.removeReturnListener(this.returnListener);
                }
                this.subscriber.onComplete();
            }
        }

        public <T> boolean checkComplete(T t) {
            boolean complete;
            boolean bl = complete = this.state.get() == SubscriberState.COMPLETE;
            if (complete && this.firstException.get() == null) {
                Operators.onNextDropped(t, (Context)this.currentContext());
            }
            return complete;
        }
    }

    private static class PublishConfirmOperator<OMSG extends OutboundMessage>
    extends FluxOperator<OMSG, OutboundMessageResult<OMSG>> {
        private final Channel channel;
        private final SendOptions options;

        public PublishConfirmOperator(Publisher<OMSG> source, Channel channel, SendOptions options) {
            super(Flux.from(source));
            this.channel = channel;
            this.options = options;
        }

        public void subscribe(CoreSubscriber<? super OutboundMessageResult<OMSG>> actual) {
            this.source.subscribe(new PublishConfirmSubscriber(this.channel, (Subscriber)actual, this.options));
        }
    }

    public static class ConfirmSendContext<OMSG extends OutboundMessage>
    extends SendContext<OMSG> {
        private final PublishConfirmSubscriber<OMSG> subscriber;

        protected ConfirmSendContext(Channel channel, OMSG message, PublishConfirmSubscriber<OMSG> subscriber) {
            super(channel, message);
            this.subscriber = subscriber;
        }

        @Override
        public void publish(OutboundMessage outboundMessage) throws Exception {
            long nextPublishSeqNo = this.channel.getNextPublishSeqNo();
            try {
                ((PublishConfirmSubscriber)this.subscriber).unconfirmed.putIfAbsent(nextPublishSeqNo, this.message);
                this.channel.basicPublish(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), ((PublishConfirmSubscriber)this.subscriber).trackReturned, (AMQP.BasicProperties)((PublishConfirmSubscriber)this.subscriber).propertiesProcessor.apply(this.message.getProperties(), nextPublishSeqNo), outboundMessage.getBody());
            }
            catch (Exception e) {
                ((PublishConfirmSubscriber)this.subscriber).unconfirmed.remove(nextPublishSeqNo);
                throw e;
            }
        }

        @Override
        public void publish() throws Exception {
            this.publish((OutboundMessage)this.getMessage());
        }
    }

    public static class SendContext<OMSG extends OutboundMessage> {
        protected final Channel channel;
        protected final OMSG message;

        protected SendContext(Channel channel, OMSG message) {
            this.channel = channel;
            this.message = message;
        }

        public OMSG getMessage() {
            return this.message;
        }

        public Channel getChannel() {
            return this.channel;
        }

        public void publish(OutboundMessage outboundMessage) throws Exception {
            this.channel.basicPublish(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), outboundMessage.getProperties(), outboundMessage.getBody());
        }

        public void publish() throws Exception {
            this.publish((OutboundMessage)this.getMessage());
        }
    }
}

