/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.rabbit;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventAttributes;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.api.From;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class RabbitDirectAsyncGateway
implements DirectAsyncGateway {
    private final BrokerConfig config;
    private final ReactiveReplyRouter router;
    private final ReactiveMessageSender sender;
    private final String exchange;
    private final MessageConverter converter;
    private final boolean persistentCommands;
    private final boolean persistentQueries;
    private final Duration replyTimeout;
    private final MeterRegistry meterRegistry;

    public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender sender, String exchange, MessageConverter converter, MeterRegistry meterRegistry) {
        this.config = config;
        this.router = router;
        this.sender = sender;
        this.exchange = exchange;
        this.converter = converter;
        this.persistentCommands = config.isPersistentCommands();
        this.persistentQueries = config.isPersistentQueries();
        this.replyTimeout = config.getReplyTimeout();
        this.meterRegistry = meterRegistry;
    }

    public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
        return this.sendCommand(command, targetName, 0L, "app");
    }

    public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis) {
        return this.sendCommand(command, targetName, delayMillis, "app");
    }

    public <T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain) {
        return this.sendCommand(command, targetName, 0L, domain);
    }

    public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain) {
        Tuple2<String, Map<String, Object>> targetAndHeaders = this.validateDelay(targetName, delayMillis);
        return this.resolveSender(domain).sendWithConfirm(command, this.exchange, (String)targetAndHeaders.getT1(), (Map)targetAndHeaders.getT2(), this.persistentCommands);
    }

    public Mono<Void> sendCommand(CloudEvent command, String targetName) {
        return this.sendCommand(command, targetName, 0L, "app");
    }

    public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis) {
        return this.sendCommand(command, targetName, delayMillis, "app");
    }

    public Mono<Void> sendCommand(CloudEvent command, String targetName, String domain) {
        return this.sendCommand(command, targetName, 0L, domain);
    }

    public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis, String domain) {
        Tuple2<String, Map<String, Object>> targetAndHeaders = this.validateDelay(targetName, delayMillis);
        return this.resolveSender(domain).sendWithConfirm(command, this.exchange, (String)targetAndHeaders.getT1(), (Map)targetAndHeaders.getT2(), this.persistentCommands);
    }

    public <T> Flux<OutboundMessageResult> sendCommands(Flux<Command<T>> commands, String targetName) {
        return this.sender.sendWithConfirmBatch(commands, this.exchange, targetName, Collections.emptyMap(), this.persistentCommands);
    }

    public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type) {
        return this.requestReply(query, targetName, type, "app");
    }

    public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain) {
        return this.requestReplyInternal(query, targetName, type, domain, AsyncQuery::getResource);
    }

    public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
        return this.requestReplyInternal(query, targetName, type, "app", CloudEventAttributes::getType);
    }

    public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain) {
        return this.requestReplyInternal(query, targetName, type, domain, CloudEventAttributes::getType);
    }

    private <T, R> Mono<R> requestReplyInternal(T query, String targetName, Class<R> type, String domain, Function<T, String> queryTypeExtractor) {
        String correlationID = UUID.randomUUID().toString().replaceAll("-", "");
        Mono replyHolder = this.router.register(correlationID).timeout(this.replyTimeout).doOnError(TimeoutException.class, e -> this.router.deregister(correlationID)).map(s -> this.converter.readValue(s, type));
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-reply_id", this.config.getRoutingKey());
        headers.put("x-serveQuery-id", queryTypeExtractor.apply(query));
        headers.put("x-correlation-id", correlationID);
        headers.put("x-reply-timeout-millis", this.replyTimeout.toMillis());
        return this.resolveSender(domain).sendNoConfirm(query, this.exchange, targetName + ".query", headers, this.persistentQueries).then(replyHolder).name("async_query").tag("operation", queryTypeExtractor.apply(query)).tag("target", targetName).tap(Micrometer.metrics((MeterRegistry)this.meterRegistry));
    }

    public <T> Mono<Void> reply(T response, From from) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-correlation-id", from.getCorrelationID());
        if (response == null) {
            headers.put("x-empty-completion", Boolean.TRUE.toString());
        }
        return this.sender.sendNoConfirm(response, "globalReply", from.getReplyID(), headers, false);
    }

    protected ReactiveMessageSender resolveSender(String domain) {
        return this.sender;
    }

    private Tuple2<String, Map<String, Object>> validateDelay(String targetName, long delayMillis) {
        HashMap<String, String> headers = new HashMap<String, String>();
        Object realTarget = targetName;
        if (delayMillis > 0L) {
            headers.put("rc-delay", String.valueOf(delayMillis));
            realTarget = targetName + "-delayed";
        }
        return Tuples.of((Object)realTarget, headers);
    }
}

