/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.rabbitmq.reactive;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.core.annotation.Internal;
import io.micronaut.messaging.exceptions.MessagingClientException;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import io.micronaut.rabbitmq.connect.ChannelPool;
import io.micronaut.rabbitmq.connect.RabbitConnectionFactoryConfig;
import io.micronaut.rabbitmq.exception.RabbitClientException;
import io.micronaut.rabbitmq.intercept.DefaultConsumer;
import io.micronaut.rabbitmq.reactive.RabbitPublishState;
import io.micronaut.rabbitmq.reactive.ReactivePublisher;
import java.io.IOException;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/**
 * Reactor based {@link ReactivePublisher} that publishes messages on channels
 * borrowed from a {@link ChannelPool}.
 *
 * <p>Every operation borrows a channel when it is subscribed to and hands it
 * back to the pool once the operation terminates or is cancelled.</p>
 */
@Internal
@EachBean(ChannelPool.class)
public class ReactorReactivePublisher
implements ReactivePublisher {
    /** Name of the RabbitMQ direct reply-to pseudo queue. */
    private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";

    private final ChannelPool channelPool;
    private final RabbitConnectionFactoryConfig config;

    /**
     * @param channelPool the pool channels are borrowed from and returned to
     * @param config      the connection configuration (confirm and RPC timeouts)
     */
    public ReactorReactivePublisher(@Parameter ChannelPool channelPool, @Parameter RabbitConnectionFactoryConfig config) {
        this.channelPool = channelPool;
        this.config = config;
    }

    /**
     * Publishes the message and completes once the broker has confirmed it.
     *
     * @param publishState the message to publish
     * @return a {@link Mono} that completes on a broker ack and fails on a nack,
     *         on a publish failure, or with a {@link RabbitClientException} when
     *         no confirmation arrives within the configured confirm timeout
     */
    @Override
    public Mono<Void> publishAndConfirm(RabbitPublishState publishState) {
        return getChannel()
            .flatMap(this::initializePublish)
            .flatMap(channel -> publishInternal(channel, publishState))
            .timeout(
                config.getConfirmTimeout(),
                Mono.error(new RabbitClientException(String.format(
                    "Timed out waiting for publisher confirm for exchange: [%s] and routing key: [%s]",
                    publishState.getExchange(),
                    publishState.getRoutingKey()))))
            .then();
    }

    /**
     * Publishes the message without waiting for a broker confirmation.
     *
     * @param publishState the message to publish
     * @return a {@link Mono} that completes once the message has been handed to the channel
     */
    @Override
    public Mono<Void> publish(RabbitPublishState publishState) {
        return getChannel()
            .flatMap(channel -> publishInternalNoConfirm(channel, publishState))
            .then();
    }

    /**
     * Publishes the message and emits the reply received on its reply-to queue.
     *
     * @param publishState the message to publish
     * @return a {@link Flux} emitting the reply; when an RPC timeout is configured
     *         it is applied to the returned sequence via {@link Flux#timeout}
     */
    @Override
    public Flux<RabbitConsumerState> publishAndReply(RabbitPublishState publishState) {
        Flux<RabbitConsumerState> reply = getChannel()
            .flatMap(channel -> publishRpcInternal(channel, publishState))
            .flux();
        // Flux#timeout returns a NEW sequence; it has to be the one handed back,
        // otherwise the configured timeout is silently ignored.
        return config.getRpc().getTimeout()
            .map(timeout -> reply.timeout(timeout))
            .orElse(reply);
    }

    /**
     * Borrows a channel from the pool when subscribed to.
     *
     * @return a {@link Mono} emitting the channel, or failing with a
     *         {@link RabbitClientException} if the pool could not provide one
     */
    protected Mono<Channel> getChannel() {
        return Mono.create(emitter -> {
            try {
                emitter.success(channelPool.getChannel());
            }
            catch (IOException e) {
                emitter.error(new RabbitClientException("Failed to retrieve a channel from the pool", e));
            }
        });
    }

    /**
     * Publishes the message and resolves with the broker's confirmation.
     * The channel is returned to the pool when the result terminates or is cancelled.
     *
     * @param channel      the channel to publish on (confirms must already be enabled)
     * @param publishState the message to publish
     * @return a {@link Mono} that completes empty on ack and fails on nack or publish failure
     */
    protected Mono<Object> publishInternal(Channel channel, RabbitPublishState publishState) {
        return Mono.<Object>create(subscriber -> {
            Disposable listener = createListener(channel, subscriber, publishState);
            // Detach the listener if the subscriber goes away (timeout/cancel) or
            // the callback fails with an unchecked exception.
            subscriber.onDispose(listener);
            try {
                channel.basicPublish(publishState.getExchange(), publishState.getRoutingKey(), publishState.getProperties(), publishState.getBody());
            }
            catch (IOException e) {
                // Detach before signalling so the channel goes back to the pool clean.
                listener.dispose();
                subscriber.error(e);
            }
        }).doFinally(signalType -> returnChannel(channel));
    }

    /**
     * Publishes the message with a generated correlation id and resolves with
     * the reply consumed from the message's reply-to queue.
     * The channel is returned to the pool when the result terminates or is cancelled.
     *
     * @param channel      the channel to publish and consume on
     * @param publishState the message to publish
     * @return a {@link Mono} emitting the reply, or failing if publishing fails
     *         or the reply consumer is cancelled
     */
    protected Mono<RabbitConsumerState> publishRpcInternal(Channel channel, RabbitPublishState publishState) {
        return Mono.<RabbitConsumerState>create(subscriber -> {
            Disposable listener = null;
            try {
                String correlationId = UUID.randomUUID().toString();
                AMQP.BasicProperties properties = publishState.getProperties().builder().correlationId(correlationId).build();
                listener = createConsumer(channel, publishState, correlationId, subscriber);
                // Cancel the reply consumer if the subscriber goes away (timeout/cancel)
                // so it does not linger on a channel that is handed back to the pool.
                subscriber.onDispose(listener);
                channel.basicPublish(publishState.getExchange(), publishState.getRoutingKey(), properties, publishState.getBody());
            }
            catch (IOException e) {
                if (listener != null) {
                    listener.dispose();
                }
                subscriber.error(new MessagingClientException("Failed to publish the message", e));
            }
        }).doFinally(signalType -> returnChannel(channel));
    }

    /**
     * Publishes the message without registering for confirmations.
     * The channel is returned to the pool when the result terminates or is cancelled.
     *
     * @param channel      the channel to publish on
     * @param publishState the message to publish
     * @return a {@link Mono} that completes empty after the publish call, or fails
     *         with a {@link MessagingClientException} if it throws
     */
    protected Mono<Object> publishInternalNoConfirm(Channel channel, RabbitPublishState publishState) {
        return Mono.<Object>create(subscriber -> {
            try {
                channel.basicPublish(publishState.getExchange(), publishState.getRoutingKey(), publishState.getProperties(), publishState.getBody());
                subscriber.success();
            }
            catch (IOException e) {
                subscriber.error(new MessagingClientException("Failed to publish the message", e));
            }
        }).doFinally(signalType -> returnChannel(channel));
    }

    /**
     * Enables publisher confirms on the channel.
     *
     * <p>If enabling confirms fails the channel is returned to the pool here,
     * because no later stage will get the chance to do so.</p>
     *
     * @param channel the borrowed channel
     * @return a {@link Mono} emitting the same channel, or failing with a
     *         {@link MessagingClientException} if confirms could not be enabled
     */
    protected Mono<Channel> initializePublish(Channel channel) {
        return Mono.create(emitter -> {
            try {
                channel.confirmSelect();
            }
            catch (IOException e) {
                returnChannel(channel);
                emitter.error(new MessagingClientException("Failed to enable publisher confirms on the channel", e));
                return;
            }
            catch (RuntimeException e) {
                // Reactor turns the rethrown exception into an error signal;
                // the channel still has to go back to the pool first.
                returnChannel(channel);
                throw e;
            }
            emitter.success(channel);
        });
    }

    /**
     * Hands the channel back to the pool.
     *
     * @param channel the channel to return
     */
    protected void returnChannel(Channel channel) {
        channelPool.returnChannel(channel);
    }

    /**
     * Registers a confirm listener on the channel that resolves the emitter on
     * the first ack (success) or nack (error) it receives.
     *
     * @param channel      the channel to listen on
     * @param emitter      the sink to resolve
     * @param publishState the message being published, attached to the nack error
     * @return a handle that removes the listener from the channel; disposing it
     *         more than once has no further effect
     */
    protected Disposable createListener(Channel channel, MonoSink<Object> emitter, RabbitPublishState publishState) {
        final AtomicBoolean disposed = new AtomicBoolean();
        final Consumer<ConfirmListener> dispose = listener -> {
            if (disposed.compareAndSet(false, true)) {
                channel.removeConfirmListener(listener);
            }
        };
        final ConfirmListener confirmListener = new ConfirmListener() {

            @Override
            public void handleAck(long deliveryTag, boolean multiple) {
                ackNack(true);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) {
                ackNack(false);
            }

            private void ackNack(boolean ack) {
                // Remove the listener before signalling: signalling returns the
                // channel to the pool, and it should not carry a stale listener.
                dispose.accept(this);
                if (ack) {
                    emitter.success();
                } else {
                    emitter.error(new RabbitClientException("Message could not be delivered to the broker", Collections.singletonList(publishState)));
                }
            }
        };
        channel.addConfirmListener(confirmListener);
        return new Disposable() {

            @Override
            public void dispose() {
                dispose.accept(confirmListener);
            }

            @Override
            public boolean isDisposed() {
                return disposed.get();
            }
        };
    }

    /**
     * Starts an auto-acknowledging consumer on the message's reply-to queue that
     * resolves the emitter with the reply.
     *
     * <p>A delivery is accepted when the reply-to queue is the direct reply-to
     * pseudo queue or when its correlation id matches {@code correlationId}.
     * A consumer cancellation or channel shutdown fails the emitter.</p>
     *
     * @param channel       the channel to consume on
     * @param publishState  the message being published; supplies the reply-to queue
     * @param correlationId the correlation id expected on the reply
     * @param emitter       the sink to resolve
     * @return a handle that cancels the consumer; disposing it more than once has no further effect
     * @throws IOException if the consumer could not be registered
     */
    protected Disposable createConsumer(Channel channel, RabbitPublishState publishState, String correlationId, MonoSink<RabbitConsumerState> emitter) throws IOException {
        final String replyTo = publishState.getProperties().getReplyTo();
        final AtomicBoolean disposed = new AtomicBoolean();
        final Consumer<String> dispose = tag -> {
            if (disposed.compareAndSet(false, true)) {
                try {
                    channel.basicCancel(tag);
                }
                catch (IOException ignored) {
                    // Best effort: the emitter is resolved by the caller regardless,
                    // and a failed cancel must not mask that outcome.
                }
            }
        };
        DefaultConsumer consumer = new DefaultConsumer() {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (DIRECT_REPLY_TO.equals(replyTo) || correlationId.equals(properties.getCorrelationId())) {
                    dispose.accept(consumerTag);
                    emitter.success(new RabbitConsumerState(envelope, properties, body, channel));
                }
            }

            @Override
            public void handleCancel(String consumerTag) throws IOException {
                handleError(consumerTag);
            }

            @Override
            public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                handleError(consumerTag);
            }

            private void handleError(String consumerTag) {
                dispose.accept(consumerTag);
                emitter.error(new RabbitClientException("Message was not able to be received from the reply to queue. The consumer was cancelled", Collections.singletonList(publishState)));
            }
        };
        final String registeredTag = channel.basicConsume(replyTo, true, consumer);
        return new Disposable() {

            @Override
            public void dispose() {
                dispose.accept(registeredTag);
            }

            @Override
            public boolean isDisposed() {
                return disposed.get();
            }
        };
    }
}

