/*
 * Decompiled with CFR 0.152.
 */
package com.mware.core.model.workQueue;

import com.google.inject.Inject;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.lifecycle.LifeSupportService;
import com.mware.core.model.workQueue.RabbitMQUtils;
import com.mware.core.model.workQueue.WebQueueRepository;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.json.JSONObject;

/**
 * {@link WebQueueRepository} implementation that publishes and receives broadcast JSON
 * messages through a RabbitMQ {@code fanout} exchange named
 * {@link #CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT}.
 *
 * <p>A single connection and a single channel are opened in the constructor and used for both
 * publishing and consuming. Subscribing starts a daemon thread that pulls deliveries from a
 * {@link QueueingConsumer} and hands each decoded JSON payload to the subscriber.
 *
 * <p>Message bodies are encoded and decoded as UTF-8 so that publisher and consumer agree on the
 * byte representation regardless of each JVM's default charset.
 */
public class RabbitMQWebQueueRepository
extends WebQueueRepository {
    private static final BcLogger LOGGER = BcLoggerFactory.getLogger(RabbitMQWebQueueRepository.class);
    public static final String CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT = "exBroadcast";
    private final Connection connection;
    private final Channel channel;
    /** Listener thread started by the most recent subscribe call; {@code null} until then. */
    private Thread thread;

    /**
     * Opens the RabbitMQ connection and channel, declares the broadcast exchange, and registers
     * this instance with the given {@link LifeSupportService}.
     *
     * @param configuration      configuration passed to {@link RabbitMQUtils#openConnection}
     * @param lifeSupportService service this repository adds itself to
     * @throws IOException if opening the channel or declaring the exchange fails
     */
    @Inject
    public RabbitMQWebQueueRepository(Configuration configuration, LifeSupportService lifeSupportService) throws IOException {
        Connection openedConnection = RabbitMQUtils.openConnection(configuration);
        Channel openedChannel;
        try {
            openedChannel = RabbitMQUtils.openChannel(openedConnection);
            openedChannel.exchangeDeclare(CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, "fanout");
        }
        catch (IOException | RuntimeException e) {
            // Do not leak the already-opened connection when channel setup fails.
            try {
                openedConnection.close();
            }
            catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.connection = openedConnection;
        this.channel = openedChannel;
        lifeSupportService.add(this);
    }

    /**
     * Publishes the JSON object, serialized as UTF-8 text, to the broadcast exchange.
     *
     * @param json message to broadcast
     * @throws BcException if the publish fails with an {@link IOException}
     */
    @Override
    public void broadcastJson(JSONObject json) {
        try {
            // Serialize once; the same text is logged and published.
            String payload = json.toString();
            LOGGER.debug("publishing message to broadcast exchange [%s]: %s", CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, payload);
            this.channel.basicPublish(CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, "", null, payload.getBytes(StandardCharsets.UTF_8));
        }
        catch (IOException ex) {
            throw new BcException("Could not broadcast json", ex);
        }
    }

    /**
     * Declares a server-named queue, binds it to the broadcast exchange, and starts a daemon
     * thread that forwards every received message to {@code broadcastConsumer}.
     *
     * @param broadcastConsumer receiver of the decoded JSON messages
     * @throws BcException if declaring, binding, or consuming from the queue fails with an
     *                     {@link IOException}
     */
    @Override
    public void subscribeToBroadcastMessages(WebQueueRepository.BroadcastConsumer broadcastConsumer) {
        try {
            String queueName = this.channel.queueDeclare().getQueue();
            this.channel.queueBind(queueName, CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, "");
            QueueingConsumer callback = new QueueingConsumer(this.channel);
            this.channel.basicConsume(queueName, true, callback);
            Thread listener = new Thread(() -> this.deliverBroadcasts(callback, broadcastConsumer));
            listener.setName("rabbitmq-subscribe-" + broadcastConsumer.getClass().getName());
            listener.setDaemon(true);
            this.thread = listener;
            listener.start();
        }
        catch (IOException e) {
            throw new BcException("Could not subscribe to broadcasts", e);
        }
    }

    /**
     * Listener loop: takes deliveries from {@code callback} and passes each one, parsed as JSON,
     * to {@code broadcastConsumer} until the current thread is interrupted.
     */
    private void deliverBroadcasts(QueueingConsumer callback, WebQueueRepository.BroadcastConsumer broadcastConsumer) {
        while (!Thread.currentThread().isInterrupted()) {
            QueueingConsumer.Delivery delivery;
            try {
                delivery = callback.nextDelivery();
            }
            catch (InterruptedException e) {
                // Interruption is how shutdown() stops this thread: restore the flag and exit
                // quietly instead of dying with an uncaught exception.
                Thread.currentThread().interrupt();
                LOGGER.debug("broadcast listener interrupted, stopping");
                return;
            }
            try {
                JSONObject json = new JSONObject(new String(delivery.getBody(), StandardCharsets.UTF_8));
                LOGGER.debug("received message from broadcast exchange [%s]: %s", CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, json.toString());
                broadcastConsumer.broadcastReceived(json);
            }
            catch (Throwable ex) {
                // A single bad message or failing subscriber must not kill the listener.
                LOGGER.error("problem in broadcast thread", ex);
            }
        }
    }

    /**
     * Stops broadcast delivery by closing the channel. This is best-effort: a failure to close is
     * logged and not propagated.
     *
     * @param broadcastConsumer the subscriber being removed (not otherwise used here)
     */
    @Override
    public void unsubscribeFromBroadcastMessages(WebQueueRepository.BroadcastConsumer broadcastConsumer) {
        try {
            // NOTE(review): this closes the channel that broadcastJson also publishes on —
            // confirm that callers do not broadcast after unsubscribing.
            this.channel.close();
        }
        catch (Exception e) {
            LOGGER.debug("Could not close RabbitMQ channel while unsubscribing: %s", e);
        }
    }

    /**
     * Closes the channel, then the connection, then interrupts the listener thread if one was
     * started. Each step is attempted even when an earlier one fails; failures are logged.
     */
    @Override
    public void shutdown() {
        try {
            LOGGER.debug("Closing RabbitMQ channel");
            this.channel.close();
        }
        catch (Throwable e) {
            LOGGER.error("Could not close RabbitMQ channel", e);
        }
        try {
            LOGGER.debug("Closing RabbitMQ connection");
            this.connection.close();
        }
        catch (Throwable e) {
            LOGGER.error("Could not close RabbitMQ connection", e);
        }
        // No listener exists when nothing ever subscribed.
        Thread listener = this.thread;
        if (listener != null) {
            try {
                listener.interrupt();
            }
            catch (SecurityException e) {
                LOGGER.error("Could not interrupt broadcast listener thread", e);
            }
        }
    }
}

