/*
 * Decompiled with CFR 0.152.
 */
package com.mware.core.model.workQueue;

import com.google.inject.Inject;
import com.mware.core.bootstrap.InjectHelper;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.ingest.WorkerSpout;
import com.mware.core.lifecycle.LifeSupportService;
import com.mware.core.model.workQueue.Priority;
import com.mware.core.model.workQueue.RabbitMQUtils;
import com.mware.core.model.workQueue.RabbitMQWorkQueueSpout;
import com.mware.core.model.workQueue.WorkQueueRepository;
import com.mware.core.status.model.QueueStatus;
import com.mware.core.status.model.Status;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import com.mware.ge.Graph;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.json.JSONArray;
import org.json.JSONObject;

public class RabbitMQWorkQueueRepository
extends WorkQueueRepository {
    private static final BcLogger LOGGER = BcLoggerFactory.getLogger(RabbitMQWorkQueueRepository.class);
    private Connection connection;
    private Channel channel;
    private Integer deliveryMode;
    private Address[] rabbitMqAddresses;
    private Set<String> declaredQueues = new HashSet<String>();

    @Inject
    public RabbitMQWorkQueueRepository(Graph graph, Configuration configuration, LifeSupportService lifeSupportService) {
        super(graph, configuration);
        lifeSupportService.add(this);
    }

    @Override
    public void start() throws Throwable {
        this.connection = RabbitMQUtils.openConnection(this.getConfiguration());
        this.channel = RabbitMQUtils.openChannel(this.connection);
        this.deliveryMode = this.getConfiguration().getInt("rabbitmq.deliveryMode", MessageProperties.PERSISTENT_BASIC.getDeliveryMode());
        this.rabbitMqAddresses = RabbitMQUtils.getAddresses(this.getConfiguration());
    }

    @Override
    public void pushOnQueue(String queueName, byte[] data, Priority priority) {
        try {
            this.ensureQueue(queueName);
            AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
            if (this.deliveryMode != null) {
                propsBuilder.deliveryMode(this.deliveryMode);
            }
            LOGGER.debug("enqueuing message to queue [%s]: %s", queueName, new String(data));
            propsBuilder.priority(this.toRabbitMQPriority(priority));
            this.channel.basicPublish("", queueName, propsBuilder.build(), data);
        }
        catch (Exception ex) {
            throw new BcException("Could not push on queue", ex);
        }
    }

    private Integer toRabbitMQPriority(Priority priority) {
        switch (priority) {
            case HIGH: {
                return 2;
            }
            case NORMAL: {
                return 1;
            }
            case LOW: {
                return 0;
            }
        }
        return 0;
    }

    public void ensureQueue(String queueName) throws IOException {
        if (!this.declaredQueues.contains(queueName)) {
            RabbitMQWorkQueueRepository.createQueue(this.channel, queueName);
            this.declaredQueues.add(queueName);
        }
    }

    public static void createQueue(Channel channel, String queueName) throws IOException {
        HashMap<String, Integer> args = new HashMap<String, Integer>();
        args.put("x-max-priority", 3);
        channel.queueDeclare(queueName, true, false, false, args);
    }

    @Override
    public void flush() {
    }

    @Override
    public void shutdown() {
        try {
            LOGGER.debug("Closing RabbitMQ channel", new Object[0]);
            this.channel.close();
        }
        catch (Throwable e) {
            LOGGER.error("Could not close RabbitMQ channel", e);
        }
        try {
            LOGGER.debug("Closing RabbitMQ connection", new Object[0]);
            this.connection.close();
        }
        catch (Throwable e) {
            LOGGER.error("Could not close RabbitMQ connection", e);
        }
    }

    @Override
    public Map<String, Status> getQueuesStatus() {
        try {
            HashMap<String, Status> results = new HashMap<String, Status>();
            URL url = new URL(String.format("http://%s:15672/api/queues", this.rabbitMqAddresses[0].getHost()));
            URLConnection conn = url.openConnection();
            String basicAuth = Base64.encodeBase64String((byte[])"guest:guest".getBytes());
            conn.setRequestProperty("Authorization", "Basic " + basicAuth);
            try (InputStream in = conn.getInputStream();){
                JSONArray queuesJson = new JSONArray(IOUtils.toString((InputStream)in));
                for (int i = 0; i < queuesJson.length(); ++i) {
                    JSONObject queueJson = queuesJson.getJSONObject(i);
                    String name = queueJson.getString("name");
                    int messages = queueJson.getInt("messages");
                    results.put(name, new QueueStatus(messages));
                }
            }
            return results;
        }
        catch (Exception e) {
            throw new BcException("Could not connect to RabbitMQ", e);
        }
    }

    @Override
    protected void deleteQueue(String queueName) {
        try {
            this.channel.queueDelete(queueName);
        }
        catch (IOException e) {
            throw new BcException("Could not delete queue: " + queueName, e);
        }
    }

    @Override
    public WorkerSpout createWorkerSpout(String queueName) {
        return InjectHelper.inject(new RabbitMQWorkQueueSpout(queueName));
    }
}

