/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.Delivery;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ResourcesSpecification;

public class ApplicationReplyListener {
    @Generated
    private static final Logger log = Logger.getLogger(ApplicationReplyListener.class.getName());
    private final ReactiveReplyRouter router;
    private final Receiver receiver;
    private final TopologyCreator creator;
    private final String queueName;
    private final String exchangeName;
    private final boolean createTopology;
    private volatile Flux<Delivery> deliveryFlux;

    public ApplicationReplyListener(ReactiveReplyRouter router, ReactiveMessageListener listener, String queueName, String exchangeName, boolean createTopology) {
        this.router = router;
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.receiver = listener.getReceiver();
        this.creator = listener.getTopologyCreator();
        this.createTopology = createTopology;
    }

    public void startListening(String routeKey) {
        Mono flow = Mono.empty();
        if (this.createTopology) {
            flow = this.creator.declare(ResourcesSpecification.exchange((String)this.exchangeName).type("topic").durable(true)).then();
        }
        this.deliveryFlux = flow.then(this.creator.declare(ResourcesSpecification.queue((String)this.queueName).durable(false).autoDelete(true).exclusive(true))).then(this.creator.bind(ResourcesSpecification.binding((String)this.exchangeName, (String)routeKey, (String)this.queueName))).thenMany((Publisher)this.receiver.consumeAutoAck(this.queueName).doOnNext(delivery -> {
            try {
                boolean isEmpty;
                String correlationID = delivery.getProperties().getHeaders().get("x-correlation-id").toString();
                boolean bl = isEmpty = delivery.getProperties().getHeaders().get("x-empty-completion") != null;
                if (isEmpty) {
                    this.router.routeEmpty(correlationID);
                } else {
                    this.router.routeReply(correlationID, (Message)RabbitMessage.fromDelivery(delivery));
                }
            }
            catch (Exception e) {
                log.log(Level.SEVERE, "Error in reply reception", e);
            }
        }));
        this.onTerminate();
    }

    private void onTerminate() {
        this.deliveryFlux.doOnTerminate(this::onTerminate).subscribe((CoreSubscriber)new LoggerSubscriber(this.getClass().getName()));
    }
}

