/*
 * Decompiled with CFR 0.152.
 */
package org.iris_events.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ReturnCallback;
import com.rabbitmq.client.ReturnListener;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.transaction.RollbackException;
import jakarta.transaction.Synchronization;
import jakarta.transaction.SystemException;
import jakarta.transaction.Transaction;
import jakarta.transaction.TransactionManager;
import jakarta.validation.constraints.NotNull;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.iris_events.annotations.CachedMessage;
import org.iris_events.annotations.ExchangeType;
import org.iris_events.annotations.Scope;
import org.iris_events.asyncapi.parsers.CacheableTtlParser;
import org.iris_events.asyncapi.parsers.ExchangeParser;
import org.iris_events.asyncapi.parsers.MessageScopeParser;
import org.iris_events.asyncapi.parsers.PersistentParser;
import org.iris_events.asyncapi.parsers.RoutingKeyParser;
import org.iris_events.asyncapi.parsers.RpcResponseClassParser;
import org.iris_events.common.Exchanges;
import org.iris_events.common.message.ResourceMessage;
import org.iris_events.context.EventContext;
import org.iris_events.exception.IrisSendException;
import org.iris_events.exception.IrisTransactionException;
import org.iris_events.producer.Message;
import org.iris_events.producer.RoutingDetails;
import org.iris_events.producer.SendMessageValidator;
import org.iris_events.routing.RoutingDetailsProvider;
import org.iris_events.runtime.AnnotationValueExtractor;
import org.iris_events.runtime.BasicPropertiesProvider;
import org.iris_events.runtime.ExchangeNameProvider;
import org.iris_events.runtime.QueueNameProvider;
import org.iris_events.runtime.RpcMappingProvider;
import org.iris_events.runtime.channel.ChannelKey;
import org.iris_events.runtime.channel.ChannelService;
import org.iris_events.runtime.configuration.IrisConfig;
import org.iris_events.tx.TransactionCallback;
import org.jboss.jandex.DotName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Application-scoped producer that serializes messages with the injected {@link ObjectMapper}
 * and publishes them to RabbitMQ through the injected {@link ChannelService}.
 *
 * <p>When {@link #publish(Object, RoutingDetails)} is called while the {@link TransactionManager}
 * reports a current transaction, the message is queued and only sent after that transaction
 * completes with a committed status. Without a transaction the message is sent immediately.
 */
@ApplicationScoped
public class EventProducer {
    private static final Logger log = LoggerFactory.getLogger(EventProducer.class);
    public static final String SERVICE_ID_UNAVAILABLE_FALLBACK = "N/A";
    /** Upper bound, in milliseconds, for one blocking wait on publisher confirms. */
    private static final long WAIT_TIMEOUT_MILLIS = 2000L;
    /** Routing-key suffix used for subscription (resource) messages. */
    private static final String RESOURCE = "resource";
    private final ChannelService channelService;
    private final ObjectMapper objectMapper;
    private final EventContext eventContext;
    private final IrisConfig config;
    private final TransactionManager transactionManager;
    private final BasicPropertiesProvider basicPropertiesProvider;
    private final QueueNameProvider queueNameProvider;
    private final ExchangeNameProvider exchangeNameProvider;
    private final RpcMappingProvider rpcMappingProvider;
    private final RoutingDetailsProvider routingDetailsProvider;
    /** Number of messages published since the last wait for confirmations. */
    private final AtomicInteger count = new AtomicInteger(0);
    /** Guards the publish + confirmation-counting section of {@link #executePublish}. */
    private final Object lock = new Object();
    /** Messages queued per transaction, waiting for that transaction to complete. */
    private final Map<Transaction, List<Message>> transactionDelayedMessages = new ConcurrentHashMap<>();
    /** Optional user callback notified around transaction completion; may be {@code null}. */
    private TransactionCallback transactionCallback;

    @Inject
    public EventProducer(@Named(value="producerChannelService") ChannelService channelService, ObjectMapper objectMapper, EventContext eventContext, IrisConfig config, TransactionManager transactionManager, BasicPropertiesProvider basicPropertiesProvider, ExchangeNameProvider exchangeNameProvider, QueueNameProvider queueNameProvider, RpcMappingProvider rpcMappingProvider, RoutingDetailsProvider routingDetailsProvider) {
        this.channelService = channelService;
        this.objectMapper = objectMapper;
        this.eventContext = eventContext;
        this.config = config;
        this.transactionManager = transactionManager;
        this.basicPropertiesProvider = basicPropertiesProvider;
        this.exchangeNameProvider = exchangeNameProvider;
        this.queueNameProvider = queueNameProvider;
        this.rpcMappingProvider = rpcMappingProvider;
        this.routingDetailsProvider = routingDetailsProvider;
    }

    /**
     * Sends the message without a user id and with {@code propagate} set to {@code true}.
     *
     * @param message message object carrying the Iris message annotation
     */
    public void send(Object message) throws IrisSendException, IrisTransactionException {
        this.doSend(message, null, true);
    }

    /**
     * Sends the message for the given user id with {@code propagate} set to {@code true}.
     *
     * @param message message object carrying the Iris message annotation
     * @param userId  user id handed to the routing details provider; may be {@code null}
     */
    public void send(Object message, String userId) throws IrisSendException, IrisTransactionException {
        this.doSend(message, userId, true);
    }

    /**
     * Sends the message without a user id.
     *
     * @param message   message object carrying the Iris message annotation
     * @param propagate forwarded to the routing details provider for INTERNAL-scoped messages;
     *                  not used for the other scopes
     */
    public void send(Object message, boolean propagate) {
        this.doSend(message, null, propagate);
    }

    /**
     * Sends the message for the given user id.
     *
     * @param message   message object carrying the Iris message annotation
     * @param userId    user id handed to the routing details provider; may be {@code null}
     * @param propagate forwarded to the routing details provider for INTERNAL-scoped messages;
     *                  not used for the other scopes
     */
    public void send(Object message, String userId, boolean propagate) {
        this.doSend(message, userId, propagate);
    }

    /**
     * Performs an RPC call using the timeout returned by {@code IrisConfig#getRpcTimeout()}.
     *
     * @see #sendAndReceive(Object, Class, int)
     */
    public <T> T sendAndReceive(Object message, Class<T> responseType) {
        return this.sendAndReceive(message, responseType, this.config.getRpcTimeout());
    }

    /**
     * Publishes an RPC request on a freshly created channel and blocks until the reply with the
     * same message id arrives on the response queue, or the timeout elapses.
     *
     * <p>Deliveries whose message id does not match the request are rejected without requeue.
     *
     * @param message      request payload; serialized with the object mapper
     * @param responseType class the reply body is deserialized into
     * @param timeout      maximum time to wait for the reply, in milliseconds
     * @param <T>          reply type
     * @return the deserialized reply
     * @throws IrisSendException if no reply-to mapping exists for the response type, or if
     *                           publishing, waiting, or deserializing the reply fails
     */
    public <T> T sendAndReceive(Object message, Class<T> responseType, int timeout) {
        String requestId = UUID.randomUUID().toString();
        org.iris_events.annotations.Message messageAnnotation = AnnotationValueExtractor.getMessageAnnotation(message);
        String eventName = ExchangeParser.getFromAnnotationClass(messageAnnotation);
        DotName replyToType = RpcResponseClassParser.getFromAnnotationClass(messageAnnotation).name();
        String replyTo = this.rpcMappingProvider.getReplyTo(replyToType);
        if (replyTo == null) {
            throw new IrisSendException("Can not send RPC request message with missing replyTo parameter.");
        }
        try (Channel channel = this.channelService.createChannel()) {
            String requestExchange = this.exchangeNameProvider.getRpcRequestExchangeName(eventName);
            String responseExchange = this.exchangeNameProvider.getRpcResponseExchangeName(replyTo);
            String requestQueueName = this.queueNameProvider.getRpcRequestQueueName(eventName);
            String responseQueue = this.queueNameProvider.getRpcResponseQueueName(replyTo);
            channel.queueDeclare(responseQueue, false, false, true, null);
            channel.queueBind(responseQueue, responseExchange, responseQueue);
            CompletableFuture<T> response = new CompletableFuture<>();
            String ctag = channel.basicConsume(responseQueue, false, (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                // Check the id before touching the body: a foreign reply may not even be
                // deserializable into responseType.
                if (!requestId.equals(delivery.getProperties().getMessageId())) {
                    channel.basicNack(deliveryTag, false, false);
                    return;
                }
                try {
                    response.complete(this.objectMapper.readValue(delivery.getBody(), responseType));
                    channel.basicAck(deliveryTag, false);
                } catch (IOException e) {
                    // Fail the waiting caller right away instead of letting it run into the timeout.
                    // No effect if the future was already completed and only the ack failed.
                    response.completeExceptionally(e);
                    throw e;
                }
            }, consumerTag -> {});
            AMQP.BasicProperties requestProperties = new AMQP.BasicProperties.Builder()
                    .messageId(requestId)
                    .replyTo(responseQueue)
                    .build();
            channel.basicPublish(requestExchange, requestQueueName, requestProperties, this.objectMapper.writeValueAsBytes(message));
            T result = response.get(timeout, TimeUnit.MILLISECONDS);
            channel.basicCancel(ctag);
            return result;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IrisSendException("Could not process rpc send", e);
        } catch (IOException | ExecutionException | TimeoutException e) {
            throw new IrisSendException("Could not process rpc send", e);
        }
    }

    /**
     * Wraps the message in a {@link ResourceMessage} and publishes it to the subscription
     * exchange (topic type) with routing key {@code <eventName>.resource}.
     *
     * @param message      payload carrying the Iris message annotation
     * @param resourceType resource type; must not be {@code null} or blank
     * @param resourceId   resource id placed into the {@link ResourceMessage}
     * @throws IrisSendException if {@code resourceType} is missing or publishing fails
     */
    public void sendToSubscription(Object message, String resourceType, String resourceId) throws IrisSendException, IrisTransactionException {
        org.iris_events.annotations.Message messageAnnotation = AnnotationValueExtractor.getMessageAnnotation(message);
        if (resourceType == null || resourceType.isBlank()) {
            throw new IrisSendException("Resource type is required for subscription event!");
        }
        String eventName = ExchangeParser.getFromAnnotationClass(messageAnnotation);
        String routingKey = String.format("%s.%s", eventName, RESOURCE);
        ResourceMessage resourceUpdate = new ResourceMessage(resourceType, resourceId, message);
        boolean persistent = PersistentParser.getFromAnnotationClass(messageAnnotation);
        Integer cacheTtl = this.getCachedAnnotation(message).map(CacheableTtlParser::getFromAnnotationClass).orElse(null);
        RoutingDetails routingDetails = new RoutingDetails.Builder()
                .eventName(eventName)
                .exchange(Exchanges.SUBSCRIPTION.getValue())
                .exchangeType(ExchangeType.TOPIC)
                .routingKey(routingKey)
                .scope(null)
                .persistent(persistent)
                .cacheTtl(cacheTtl)
                .build();
        this.publish(resourceUpdate, routingDetails);
    }

    /**
     * Resolves the routing details for the message's scope and publishes it.
     *
     * @throws IrisSendException if the scope is not one of INTERNAL, USER, SESSION or BROADCAST
     */
    private void doSend(Object message, String userId, boolean propagate) throws IrisSendException {
        org.iris_events.annotations.Message messageAnnotation = AnnotationValueExtractor.getMessageAnnotation(message);
        Scope scope = MessageScopeParser.getFromAnnotationClass(messageAnnotation);
        switch (scope) {
            case INTERNAL: {
                this.publish(message, this.routingDetailsProvider.getRoutingDetailsFromAnnotation(messageAnnotation, scope, userId, propagate));
                break;
            }
            case USER:
            case SESSION:
            case BROADCAST: {
                this.publish(message, this.routingDetailsProvider.getRoutingDetailsForClientScope(messageAnnotation, scope, userId));
                break;
            }
            default: {
                throw new IrisSendException("Message scope " + scope + " not supported!");
            }
        }
    }

    /** Returns the {@link CachedMessage} annotation of the message's class, if there is one. */
    private Optional<CachedMessage> getCachedAnnotation(Object message) {
        if (message == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(message.getClass().getAnnotation(CachedMessage.class));
    }

    /**
     * Replaces all return listeners on the channel identified by {@code channelKey} with the
     * given listener.
     */
    public void addReturnListener(@NotNull String channelKey, @NotNull ReturnListener returnListener) throws IOException {
        Objects.requireNonNull(returnListener, "Return listener can not be null");
        Channel channel = this.channelService.getOrCreateChannelById(channelKey);
        channel.clearReturnListeners();
        channel.addReturnListener(returnListener);
    }

    /**
     * Replaces all return listeners on the channel identified by {@code channelKey} with the
     * given callback.
     */
    public void addReturnCallback(@NotNull String channelKey, @NotNull ReturnCallback returnCallback) throws IOException {
        Objects.requireNonNull(returnCallback, "Return callback can not be null");
        Channel channel = this.channelService.getOrCreateChannelById(channelKey);
        channel.clearReturnListeners();
        channel.addReturnListener(returnCallback);
    }

    /**
     * Replaces all confirm listeners on the channel identified by {@code channelKey} with the
     * given listener.
     */
    public void addConfirmListener(@NotNull String channelKey, @NotNull ConfirmListener confirmListener) throws IOException {
        Objects.requireNonNull(confirmListener, "Confirm listener can not be null");
        Channel channel = this.channelService.getOrCreateChannelById(channelKey);
        channel.clearConfirmListeners();
        channel.addConfirmListener(confirmListener);
    }

    /**
     * Sets the callback that is notified before and after completion of transactions that have
     * queued messages. A later call replaces the previous callback.
     */
    public void registerTransactionCallback(TransactionCallback callback) {
        this.transactionCallback = callback;
    }

    /**
     * Validates the routing details and then either publishes the message immediately (no
     * current transaction) or queues it until the current transaction completes.
     *
     * <p>The completion hook is registered once per transaction, on its first queued message.
     * Registering it on every publish would make the completion logic and the user
     * {@link TransactionCallback} run once per message.
     *
     * @param message        payload to publish
     * @param routingDetails exchange, routing key and delivery options for the message
     * @throws IrisSendException if validation, hook registration or publishing fails
     */
    public void publish(@NotNull Object message, RoutingDetails routingDetails) throws IrisSendException {
        SendMessageValidator.validate(routingDetails);
        Optional<Transaction> txOptional = this.getOptionalTransaction();
        if (txOptional.isEmpty()) {
            this.executePublish(message, routingDetails);
            return;
        }
        Transaction tx = txOptional.get();
        boolean firstMessageInTx = !this.transactionDelayedMessages.containsKey(tx);
        this.enqueueDelayedMessage(message, routingDetails, tx);
        if (firstMessageInTx) {
            try {
                this.registerDefaultTransactionCallback(tx);
            } catch (IrisSendException e) {
                // Without a registered hook nothing would ever drain or remove this entry.
                this.transactionDelayedMessages.remove(tx);
                throw e;
            }
        }
    }

    /**
     * Captures the AMQP properties and the current envelope of the event context and appends
     * the message to the queue of the given transaction.
     */
    private void enqueueDelayedMessage(Object message, RoutingDetails routingDetails, Transaction tx) {
        AMQP.BasicProperties properties = this.basicPropertiesProvider.getOrCreateAmqpBasicProperties(routingDetails);
        Message delayed = new Message(message, routingDetails, properties, this.eventContext.getEnvelope());
        this.transactionDelayedMessages.computeIfAbsent(tx, key -> new LinkedList<>()).add(delayed);
    }

    /**
     * Returns the current transaction, or an empty optional when the transaction manager
     * reports none.
     *
     * @throws IrisTransactionException if the transaction manager fails to look it up
     */
    private Optional<Transaction> getOptionalTransaction() throws IrisTransactionException {
        try {
            return Optional.ofNullable(this.transactionManager.getTransaction());
        } catch (SystemException e) {
            throw new IrisTransactionException("Exception retrieving transaction from transaction manager", e);
        }
    }

    /** Registers a {@link ProducerSynchronization} on the given transaction. */
    private void registerDefaultTransactionCallback(Transaction tx) throws IrisSendException {
        try {
            tx.registerSynchronization(new ProducerSynchronization(tx));
        } catch (RollbackException | SystemException e) {
            throw new IrisSendException("Exception registering transaction callback", e);
        }
    }

    /**
     * Publishes the queued messages of the transaction in insertion order.
     *
     * <p>Each message is removed from the queue before it is sent, so after a failure the queue
     * holds only the messages that were not attempted. Before each send, the event context is
     * set to the envelope and properties captured when the message was queued.
     */
    private void executeTxPublish(Transaction transaction) throws IrisSendException {
        List<Message> pending = this.transactionDelayedMessages.get(transaction);
        if (pending == null) {
            return;
        }
        while (!pending.isEmpty()) {
            Message next = pending.remove(0);
            this.eventContext.setEnvelope(next.envelope());
            this.eventContext.setBasicProperties(next.properties());
            this.executePublish(next.message(), next.routingDetails());
        }
    }

    /**
     * Serializes the message and publishes it with the mandatory flag set on the channel
     * obtained for the exchange/routing-key pair. Publishing and confirmation counting run
     * under {@link #lock}.
     *
     * @throws IrisSendException if serialization, channel lookup, publishing or waiting for
     *                           confirmations fails
     */
    private void executePublish(Object message, RoutingDetails routingDetails) throws IrisSendException {
        String exchange = routingDetails.getExchange();
        String routingKey = routingDetails.getRoutingKey();
        try {
            byte[] bytes = this.objectMapper.writeValueAsBytes(message);
            synchronized (this.lock) {
                AMQP.BasicProperties properties = this.basicPropertiesProvider.getOrCreateAmqpBasicProperties(routingDetails);
                String channelKey = ChannelKey.create(exchange, routingKey);
                Channel channel = this.channelService.getOrCreateChannelById(channelKey);
                log.trace("publishing event to exchange: {}, routing key: {}, props: {}", exchange, routingKey, properties);
                channel.basicPublish(exchange, routingKey, true, properties, bytes);
                if (this.shouldWaitForConfirmations()) {
                    this.waitForConfirmations(channel);
                }
            }
        } catch (IOException e) {
            throw new IrisSendException("Exception executing publish.", e);
        }
    }

    /**
     * Blocks for up to {@link #WAIT_TIMEOUT_MILLIS} for outstanding publisher confirms on the
     * channel and resets the batch counter afterwards, whether or not the wait succeeded.
     */
    private void waitForConfirmations(Channel channel) throws IrisSendException {
        try {
            channel.waitForConfirms(WAIT_TIMEOUT_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IrisSendException("Exception waiting for confirmations.", e);
        } catch (TimeoutException e) {
            throw new IrisSendException("Exception waiting for confirmations.", e);
        } finally {
            this.count.set(0);
        }
    }

    /** Returns an empty routing key for fanout exchanges, otherwise the annotation's routing key. */
    private String getRoutingKey(org.iris_events.annotations.Message messageAnnotation, ExchangeType exchangeType) {
        if (exchangeType == ExchangeType.FANOUT) {
            return "";
        }
        return RoutingKeyParser.getFromAnnotationClass(messageAnnotation);
    }

    /**
     * Counts the current publish and reports whether the configured confirmation batch size
     * has been reached. A batch size of zero or less disables waiting (and counting).
     */
    private boolean shouldWaitForConfirmations() {
        long batchSize = this.config.getConfirmationBatchSize();
        return batchSize > 0L && (long) this.count.incrementAndGet() == batchSize;
    }

    /**
     * Transaction hook that publishes the messages queued for one transaction once it has
     * committed, and relays completion events to the registered {@link TransactionCallback}.
     */
    private class ProducerSynchronization
    implements Synchronization {
        private final Transaction tx;

        public ProducerSynchronization(Transaction tx) {
            this.tx = tx;
        }

        @Override
        public void beforeCompletion() {
            // Read the field once so a concurrent re-registration cannot null it between check and use.
            TransactionCallback callback = EventProducer.this.transactionCallback;
            if (callback != null) {
                callback.beforeCompletion(EventProducer.this.transactionDelayedMessages.get(this.tx));
            }
        }

        @Override
        public void afterCompletion(int status) {
            boolean messagesPublished = false;
            try {
                if (status == jakarta.transaction.Status.STATUS_COMMITTED) {
                    EventProducer.this.executeTxPublish(this.tx);
                    messagesPublished = true;
                }
            } catch (IrisSendException e) {
                log.error("Exception completing send transaction.", e);
                throw new IrisTransactionException("Exception completing send transaction", e);
            } finally {
                try {
                    TransactionCallback callback = EventProducer.this.transactionCallback;
                    if (callback != null) {
                        callback.afterCompletion(EventProducer.this.transactionDelayedMessages.get(this.tx), status, messagesPublished);
                    }
                } finally {
                    // Always drop the queue, even if the user callback throws, so entries cannot pile up.
                    EventProducer.this.transactionDelayedMessages.remove(this.tx);
                }
            }
        }
    }
}

