/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.rabbitmq.reactive;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.core.annotation.Internal;
import io.micronaut.messaging.exceptions.MessagingClientException;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import io.micronaut.rabbitmq.connect.ChannelPool;
import io.micronaut.rabbitmq.connect.RabbitConnectionFactoryConfig;
import io.micronaut.rabbitmq.exception.RabbitClientException;
import io.micronaut.rabbitmq.intercept.DefaultConsumer;
import io.micronaut.rabbitmq.reactive.RabbitPublishState;
import io.micronaut.rabbitmq.reactive.ReactivePublisher;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableSource;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * RxJava 2 implementation of {@link ReactivePublisher}.
 *
 * <p>Every operation borrows a {@link Channel} from the {@link ChannelPool}, performs the publish
 * and hands the channel back to the pool once the reactive type terminates or is disposed.
 * Confirm listeners and RPC reply consumers that are attached to the borrowed channel are
 * registered as the emitter's disposable, so they are detached again when the subscriber
 * cancels (for example because a timeout fired) and do not linger on the pooled channel.</p>
 */
@Internal
@EachBean(value=ChannelPool.class)
public class RxJavaReactivePublisher
implements ReactivePublisher {
    /**
     * Reply-to value for which {@link #createConsumer} accepts any delivery without comparing
     * the correlation id.
     */
    private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";

    private final ChannelPool channelPool;
    private final RabbitConnectionFactoryConfig config;

    /**
     * @param channelPool the pool channels are borrowed from and returned to
     * @param config      the connection configuration supplying the confirm and RPC timeouts
     */
    public RxJavaReactivePublisher(@Parameter ChannelPool channelPool, @Parameter RabbitConnectionFactoryConfig config) {
        this.channelPool = channelPool;
        this.config = config;
    }

    /**
     * Publishes the message with publisher confirms enabled and completes once the broker
     * acknowledged it.
     *
     * @param publishState the exchange, routing key, properties and body to publish
     * @return a flowable that completes on ack, and fails on nack, on an I/O error or with a
     *         {@link RabbitClientException} when no confirm arrived within the configured
     *         confirm timeout
     */
    @Override
    public Flowable<Void> publishAndConfirm(RabbitPublishState publishState) {
        return getChannel()
                // If confirm mode cannot be enabled the publish step (which is what normally
                // returns the channel) is never reached, so hand the channel back here.
                .flatMap(channel -> initializePublish(channel).doOnError(error -> returnChannel(channel)))
                .flatMapCompletable(channel -> publishInternal(channel, publishState))
                .timeout(
                        config.getConfirmTimeout().toMillis(),
                        TimeUnit.MILLISECONDS,
                        Completable.error(new RabbitClientException(String.format("Timed out waiting for publisher confirm for exchange: [%s] and routing key: [%s]", publishState.getExchange(), publishState.getRoutingKey()))))
                .toFlowable();
    }

    /**
     * Publishes the message without waiting for a broker confirmation.
     *
     * @param publishState the exchange, routing key, properties and body to publish
     * @return a flowable that completes as soon as the publish call returned
     */
    @Override
    public Flowable<Void> publish(RabbitPublishState publishState) {
        return getChannel()
                .flatMapCompletable(channel -> publishInternalNoConfirm(channel, publishState))
                .toFlowable();
    }

    /**
     * Publishes the message and waits for a reply on the queue named by the message's
     * {@code replyTo} property.
     *
     * @param publishState the exchange, routing key, properties and body to publish
     * @return a flowable emitting the reply; when an RPC timeout is configured the flowable
     *         fails if no reply arrives within it
     */
    @Override
    public Flowable<RabbitConsumerState> publishAndReply(RabbitPublishState publishState) {
        Flowable<RabbitConsumerState> reply = getChannel()
                .flatMap(channel -> publishRpcInternal(channel, publishState))
                .toFlowable();
        return config.getRpc().getTimeout()
                .map(timeout -> reply.timeout(timeout.toNanos(), TimeUnit.NANOSECONDS))
                .orElse(reply);
    }

    /**
     * Borrows a channel from the pool on subscription.
     *
     * @return a single emitting the channel, or a {@link RabbitClientException} when the pool
     *         fails with an {@link IOException}
     */
    protected Single<Channel> getChannel() {
        return Single.create(emitter -> {
            try {
                Channel channel = channelPool.getChannel();
                emitter.onSuccess(channel);
            } catch (IOException e) {
                emitter.onError(new RabbitClientException("Failed to retrieve a channel from the pool", e));
            }
        });
    }

    /**
     * Publishes the message and completes when the confirm listener sees an ack or nack.
     * The channel is returned to the pool when the completable terminates or is disposed.
     *
     * @param channel      the channel to publish on; expected to be in confirm mode already
     * @param publishState the message to publish
     * @return the completable; an {@link IOException} from the publish call is emitted as is
     */
    protected Completable publishInternal(Channel channel, RabbitPublishState publishState) {
        return Completable.create(subscriber -> {
            Disposable listener = createListener(channel, subscriber, publishState);
            // Tie the listener to the subscription so that a cancellation (e.g. the confirm
            // timeout) removes it before the channel goes back to the pool.
            subscriber.setDisposable(listener);
            try {
                channel.basicPublish(publishState.getExchange(), publishState.getRoutingKey(), publishState.getProperties(), publishState.getBody());
            } catch (IOException e) {
                listener.dispose();
                subscriber.onError(e);
            }
        }).doFinally(() -> returnChannel(channel));
    }

    /**
     * Publishes the message with a freshly generated correlation id and emits the reply
     * received by the consumer created through {@link #createConsumer}.
     * The channel is returned to the pool when the single terminates or is disposed.
     *
     * @param channel      the channel to publish and consume on
     * @param publishState the message to publish
     * @return a single emitting the reply, or a {@link MessagingClientException} when consuming
     *         or publishing fails with an {@link IOException}
     */
    protected Single<RabbitConsumerState> publishRpcInternal(Channel channel, RabbitPublishState publishState) {
        return Single.<RabbitConsumerState>create(subscriber -> {
            Disposable listener = null;
            try {
                String correlationId = UUID.randomUUID().toString();
                AMQP.BasicProperties properties = publishState.getProperties().builder().correlationId(correlationId).build();
                listener = createConsumer(channel, publishState, correlationId, subscriber);
                // Cancel the reply consumer when the subscriber goes away (e.g. RPC timeout);
                // otherwise it would stay registered on the pooled channel.
                subscriber.setDisposable(listener);
                channel.basicPublish(publishState.getExchange(), publishState.getRoutingKey(), properties, publishState.getBody());
            } catch (IOException e) {
                if (listener != null) {
                    listener.dispose();
                }
                subscriber.onError(new MessagingClientException("Failed to publish the message", e));
            }
        }).doFinally(() -> returnChannel(channel));
    }

    /**
     * Publishes the message and completes immediately after the publish call returned.
     * The channel is returned to the pool when the completable terminates or is disposed.
     *
     * @param channel      the channel to publish on
     * @param publishState the message to publish
     * @return the completable; fails with a {@link MessagingClientException} on an
     *         {@link IOException}
     */
    protected Completable publishInternalNoConfirm(Channel channel, RabbitPublishState publishState) {
        return Completable.create(subscriber -> {
            try {
                channel.basicPublish(publishState.getExchange(), publishState.getRoutingKey(), publishState.getProperties(), publishState.getBody());
                subscriber.onComplete();
            } catch (IOException e) {
                subscriber.onError(new MessagingClientException("Failed to publish the message", e));
            }
        }).doFinally(() -> returnChannel(channel));
    }

    /**
     * Enables publisher confirms on the channel.
     *
     * <p>This method does not return the channel to the pool on failure; that is left to the
     * caller.</p>
     *
     * @param channel the channel to switch to confirm mode
     * @return a single emitting the same channel, or a {@link MessagingClientException} when
     *         {@code confirmSelect} fails with an {@link IOException}
     */
    protected Single<Channel> initializePublish(Channel channel) {
        return Single.create(emitter -> {
            try {
                channel.confirmSelect();
                emitter.onSuccess(channel);
            } catch (IOException e) {
                emitter.onError(new MessagingClientException("Failed to enable publisher confirms on the channel", e));
            }
        });
    }

    /**
     * Hands the channel back to the pool.
     *
     * @param channel the channel to return
     */
    protected void returnChannel(Channel channel) {
        channelPool.returnChannel(channel);
    }

    /**
     * Registers a confirm listener on the channel that terminates the emitter on the first
     * ack (completion) or nack (error). The delivery tag and the {@code multiple} flag are
     * not inspected.
     *
     * @param channel      the channel to listen on
     * @param emitter      the emitter to signal
     * @param publishState the message being confirmed; attached to the error raised on nack
     * @return a disposable that removes the listener from the channel; removal happens at most
     *         once no matter how often or from where it is triggered
     */
    protected Disposable createListener(Channel channel, final CompletableEmitter emitter, final RabbitPublishState publishState) {
        final AtomicBoolean disposed = new AtomicBoolean();
        final Consumer<ConfirmListener> dispose = listener -> {
            if (disposed.compareAndSet(false, true)) {
                channel.removeConfirmListener(listener);
            }
        };
        final ConfirmListener confirmListener = new ConfirmListener() {

            @Override
            public void handleAck(long deliveryTag, boolean multiple) {
                // Detach first: signalling the emitter triggers the return of the channel to
                // the pool, and the listener must not travel with it.
                dispose.accept(this);
                emitter.onComplete();
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) {
                dispose.accept(this);
                emitter.onError(new RabbitClientException("Message could not be delivered to the broker", Collections.singletonList(publishState)));
            }
        };
        channel.addConfirmListener(confirmListener);
        return new Disposable() {

            @Override
            public void dispose() {
                dispose.accept(confirmListener);
            }

            @Override
            public boolean isDisposed() {
                return disposed.get();
            }
        };
    }

    /**
     * Starts an auto-acknowledging consumer on the message's {@code replyTo} queue that emits
     * the reply to the emitter. A delivery is accepted when the reply-to is
     * {@code amq.rabbitmq.reply-to} or when its correlation id equals {@code correlationId};
     * other deliveries are ignored. A consumer cancellation or channel shutdown is reported
     * to the emitter as a {@link RabbitClientException}.
     *
     * @param channel       the channel to consume on
     * @param publishState  the message whose reply is awaited
     * @param correlationId the correlation id the reply is expected to carry
     * @param emitter       the emitter to signal
     * @return a disposable that cancels the consumer; cancellation is attempted at most once
     * @throws IOException if the consumer cannot be registered
     */
    protected Disposable createConsumer(final Channel channel, final RabbitPublishState publishState, final String correlationId, final SingleEmitter<RabbitConsumerState> emitter) throws IOException {
        final String replyTo = publishState.getProperties().getReplyTo();
        final AtomicBoolean disposed = new AtomicBoolean();
        final Consumer<String> dispose = tag -> {
            if (disposed.compareAndSet(false, true)) {
                try {
                    channel.basicCancel(tag);
                } catch (IOException | ShutdownSignalException ignored) {
                    // Best effort: the consumer may already be cancelled or the channel closed.
                    // The unchecked ShutdownSignalException is caught as well so that a failed
                    // cancel can never prevent the emitter from being signalled afterwards.
                }
            }
        };
        DefaultConsumer consumer = new DefaultConsumer() {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Constant-first comparison keeps this safe if no reply-to was set.
                if (DIRECT_REPLY_TO.equals(replyTo) || correlationId.equals(properties.getCorrelationId())) {
                    dispose.accept(consumerTag);
                    emitter.onSuccess(new RabbitConsumerState(envelope, properties, body, channel));
                }
            }

            @Override
            public void handleCancel(String consumerTag) throws IOException {
                this.handleError(consumerTag);
            }

            @Override
            public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                this.handleError(consumerTag);
            }

            private void handleError(String consumerTag) {
                dispose.accept(consumerTag);
                emitter.onError(new RabbitClientException("Message was not able to be received from the reply to queue. The consumer was cancelled", Collections.singletonList(publishState)));
            }
        };
        final String registeredTag = channel.basicConsume(replyTo, true, consumer);
        return new Disposable() {

            @Override
            public void dispose() {
                dispose.accept(registeredTag);
            }

            @Override
            public boolean isDisposed() {
                return disposed.get();
            }
        };
    }
}

