/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.AMQP;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.api.handlers.CloudEventHandler;
import org.reactivecommons.async.api.handlers.DomainEventHandler;
import org.reactivecommons.async.api.handlers.RawEventHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.EventExecutor;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivecommons.async.rabbit.listeners.GenericMessageListener;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;

public class ApplicationEventListener
extends GenericMessageListener {
    @Generated
    private static final Logger log = Logger.getLogger(ApplicationEventListener.class.getName());
    private final MessageConverter messageConverter;
    private final HandlerResolver resolver;
    private final String eventsExchange;
    private final boolean withDLQRetry;
    private final int retryDelay;
    private final Optional<Integer> maxLengthBytes;
    private final String appName;

    public ApplicationEventListener(ReactiveMessageListener receiver, String queueName, String eventsExchange, HandlerResolver resolver, MessageConverter messageConverter, boolean withDLQRetry, boolean createTopology, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter, String appName) {
        super(queueName, receiver, withDLQRetry, createTopology, maxRetries, retryDelay, discardNotifier, "event", errorReporter);
        this.retryDelay = retryDelay;
        this.withDLQRetry = withDLQRetry;
        this.resolver = resolver;
        this.eventsExchange = eventsExchange;
        this.messageConverter = messageConverter;
        this.maxLengthBytes = maxLengthBytes;
        this.appName = appName;
    }

    @Override
    protected Mono<Void> setUpBindings(TopologyCreator creator) {
        Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange((String)this.eventsExchange).durable(true).type("topic"));
        Flux bindings = Flux.fromIterable((Iterable)this.resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding((String)this.eventsExchange, (String)listener.getPath(), (String)this.queueName)));
        if (this.withDLQRetry) {
            String eventsDLQExchangeName = String.format("%s.%s.DLQ", this.appName, this.eventsExchange);
            String retryExchangeName = String.format("%s.%s", this.appName, this.eventsExchange);
            Mono<AMQP.Exchange.DeclareOk> retryExchange = creator.declare(ExchangeSpecification.exchange((String)retryExchangeName).durable(true).type("topic"));
            Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange((String)eventsDLQExchangeName).durable(true).type("topic"));
            Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(this.queueName, retryExchangeName, this.retryDelay, this.maxLengthBytes);
            Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(this.queueName, eventsDLQExchangeName, this.maxLengthBytes);
            Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding((String)eventsDLQExchangeName, (String)"#", (String)(this.queueName + ".DLQ")));
            Mono<AMQP.Queue.BindOk> retryBinding = creator.bind(BindingSpecification.binding((String)retryExchangeName, (String)"#", (String)this.queueName));
            return declareExchange.then(retryExchange).then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).thenMany((Publisher)bindings).then(bindingDLQ).then(retryBinding).then();
        }
        Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(this.queueName, this.maxLengthBytes);
        return declareExchange.then(declareQueue).thenMany((Publisher)bindings).then();
    }

    @Override
    protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
        RegisteredEventListener handler = this.resolver.getEventListener(executorPath);
        Function<Message, Object> converter = this.resolveConverter(handler);
        EventExecutor executor = new EventExecutor(handler.getHandler(), converter);
        return msj -> executor.execute(msj).cast(Object.class);
    }

    @Override
    protected String getExecutorPath(AcknowledgableDelivery msj) {
        return msj.getEnvelope().getRoutingKey();
    }

    @Override
    protected Object parseMessageForReporter(Message msj) {
        return this.messageConverter.readDomainEventStructure(msj);
    }

    private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListener<T, D> registeredEventListener) {
        if (registeredEventListener.getHandler() instanceof DomainEventHandler) {
            Class eventClass = registeredEventListener.getInputClass();
            return msj -> this.messageConverter.readDomainEvent(msj, eventClass);
        }
        if (registeredEventListener.getHandler() instanceof CloudEventHandler) {
            return arg_0 -> ((MessageConverter)this.messageConverter).readCloudEvent(arg_0);
        }
        if (registeredEventListener.getHandler() instanceof RawEventHandler) {
            return message -> message;
        }
        throw new RuntimeException("Unknown handler type");
    }
}

