/*
 * Decompiled with CFR 0.152.
 */
package org.iris_events.consumer;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.iris_events.common.Exchanges;
import org.iris_events.consumer.DeliverCallbackProvider;
import org.iris_events.consumer.QueueDeclarator;
import org.iris_events.exception.IrisConnectionException;
import org.iris_events.runtime.InstanceInfoProvider;
import org.iris_events.runtime.QueueNameProvider;
import org.iris_events.runtime.channel.ChannelService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes messages from the frontend queue and dispatches each one to the
 * {@link DeliverCallback} registered for the message's routing key.
 *
 * <p>Callback providers are registered with
 * {@link #addDeliverCallbackProvider(String, DeliverCallbackProvider)} and are turned into
 * queue bindings and concrete callbacks when {@link #initChannel()} runs. Providers added
 * after {@link #initChannel()} has run are not bound until it is invoked again.
 */
@ApplicationScoped
public class FrontendEventConsumer
implements RecoveryListener {
    private static final Logger log = LoggerFactory.getLogger(FrontendEventConsumer.class);

    /** Value of the {@code x-message-ttl} queue argument (milliseconds per the RabbitMQ TTL docs). */
    private static final int DEFAULT_MESSAGE_TTL = 15000;

    private final ChannelService channelService;
    private final InstanceInfoProvider instanceInfoProvider;
    private final QueueDeclarator queueDeclarator;
    /** Routing key -> provider; populated by callers before {@link #initChannel()}. */
    private final ConcurrentHashMap<String, DeliverCallbackProvider> deliverCallbackProviderMap;
    /** Routing key -> callback created from the matching provider during {@link #initChannel()}. */
    private final ConcurrentHashMap<String, DeliverCallback> deliverCallbackMap;
    private final String queueName;
    /** Identifier under which this consumer's channel is looked up in the {@link ChannelService}. */
    private final String channelId;

    /**
     * Creates the consumer. No broker interaction happens here; call {@link #initChannel()}
     * to declare the queue and start consuming.
     *
     * @param channelService       channel service qualified as {@code consumerChannelService}
     * @param instanceInfoProvider supplies the application name used in log messages
     * @param queueDeclarator      used to declare the frontend queue
     * @param queueNameProvider    supplies the frontend queue name
     */
    @Inject
    public FrontendEventConsumer(@Named(value="consumerChannelService") ChannelService channelService, InstanceInfoProvider instanceInfoProvider, QueueDeclarator queueDeclarator, QueueNameProvider queueNameProvider) {
        this.channelService = channelService;
        this.instanceInfoProvider = instanceInfoProvider;
        this.queueDeclarator = queueDeclarator;
        this.deliverCallbackMap = new ConcurrentHashMap<>();
        this.deliverCallbackProviderMap = new ConcurrentHashMap<>();
        this.queueName = queueNameProvider.getFrontendQueueName();
        this.channelId = UUID.randomUUID().toString();
    }

    /**
     * Registers (or replaces) the callback provider for a routing key.
     *
     * @param routingKey              routing key the provider's callback should handle
     * @param deliverCallbackProvider factory for the callback
     */
    public void addDeliverCallbackProvider(String routingKey, DeliverCallbackProvider deliverCallbackProvider) {
        this.deliverCallbackProviderMap.put(routingKey, deliverCallbackProvider);
    }

    /**
     * Declares the frontend queue, binds every registered routing key, and starts consuming
     * with manual acknowledgement ({@code autoAck = false}).
     *
     * @throws IrisConnectionException if the queue cannot be declared, a binding fails, or
     *                                 the consumer cannot be started
     */
    public void initChannel() {
        try {
            Channel channel = this.channelService.getOrCreateChannelById(this.channelId);

            HashMap<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-message-ttl", DEFAULT_MESSAGE_TTL);
            // NOTE(review): the three booleans are presumably durable/exclusive/autoDelete —
            // confirm against QueueDeclarationDetails.
            QueueDeclarator.QueueDeclarationDetails details = new QueueDeclarator.QueueDeclarationDetails(this.queueName, true, false, false, queueArguments);
            this.queueDeclarator.declareQueueWithRecreateOnConflict(channel, details);

            this.setupDeliverCallbacks(channel);
            channel.basicConsume(this.queueName, false, this.getDeliverCallback(), this.getCancelCallback(), this.getShutdownCallback());

            if (channel instanceof Recoverable) {
                ((Recoverable) channel).addRecoveryListener(this);
            }
        }
        catch (IOException e) {
            String msg = "Could not initialize frontend consumer";
            log.error(msg, e);
            throw new IrisConnectionException(msg, e);
        }
    }

    /**
     * Binds the frontend queue to the frontend exchange for every registered routing key and
     * creates the matching deliver callback.
     *
     * @param channel channel used for the bindings and handed to each provider
     * @throws IrisConnectionException if a binding or callback creation fails with an I/O error
     */
    private void setupDeliverCallbacks(Channel channel) {
        this.deliverCallbackProviderMap.forEach((routingKey, callbackProvider) -> {
            try {
                channel.queueBind(this.queueName, Exchanges.FRONTEND.getValue(), routingKey);
                this.deliverCallbackMap.put(routingKey, callbackProvider.createDeliverCallback(channel));
            }
            catch (IOException e) {
                String msg = String.format("Could not setup deliver callback for routing key = %s", routingKey);
                // Log the cause as well; previously only the message was logged.
                log.error(msg, e);
                throw new IrisConnectionException(msg, e);
            }
        });
    }

    /**
     * Builds the dispatching callback: looks up the handler by the message's routing key and
     * delegates to it, or NACKs the message (without requeue) when no handler is registered.
     */
    private DeliverCallback getDeliverCallback() {
        return (consumerTag, message) -> {
            String msgRoutingKey = message.getEnvelope().getRoutingKey();
            DeliverCallback deliverCallback = this.deliverCallbackMap.get(msgRoutingKey);
            if (deliverCallback != null) {
                deliverCallback.handle(consumerTag, message);
                return;
            }
            log.warn("No handler registered for frontend message with routingKey = {}, NACK-ing message", msgRoutingKey);
            // NOTE(review): the channel is re-resolved by id here; if the service ever returns a
            // different channel than the one that delivered the message, the delivery tag
            // would not be valid on it — confirm ChannelService semantics.
            this.channelService.getOrCreateChannelById(this.channelId).basicNack(message.getEnvelope().getDeliveryTag(), false, false);
        };
    }

    /** Builds the callback that logs a warning when the consumer is cancelled. */
    private CancelCallback getCancelCallback() {
        return consumerTag -> log.warn("Channel canceled for {} frontend queue", this.instanceInfoProvider.getApplicationName());
    }

    /** Builds the callback that logs a warning when the consumer's channel shuts down. */
    private ConsumerShutdownSignalCallback getShutdownCallback() {
        return (consumerTag, sig) -> log.warn("Channel shut down for with signal:{}, queue: {}, consumer: {}", sig, this.queueName, consumerTag);
    }

    /** Logs that recovery has completed for this consumer's channel. */
    @Override
    public void handleRecovery(Recoverable recoverable) {
        log.info("handleRecovery called for frontend consumer for queue {}", this.queueName);
    }

    /** Logs that recovery has started for this consumer's channel. */
    @Override
    public void handleRecoveryStarted(Recoverable recoverable) {
        log.info("handleRecoveryStarted called for frontend consumer for queue {}", this.queueName);
    }
}

