/*
 * Decompiled with CFR 0.152.
 */
package com.mware.core.model.workQueue;

import com.google.inject.Inject;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.ingest.WorkerSpout;
import com.mware.core.ingest.WorkerTuple;
import com.mware.core.model.workQueue.RabbitMQUtils;
import com.mware.core.model.workQueue.RabbitMQWorkQueueRepository;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQWorkQueueSpout
extends WorkerSpout {
    private static final BcLogger LOGGER = BcLoggerFactory.getLogger(RabbitMQWorkQueueSpout.class);
    public static final int DEFAULT_RABBITMQ_PREFETCH_COUNT = 10;
    private final String queueName;
    private Channel channel;
    private QueueingConsumer consumer;
    private Connection connection;
    private Configuration configuration;

    public RabbitMQWorkQueueSpout(String queueName) {
        this.queueName = queueName;
    }

    @Override
    public void open() {
        try {
            this.connection = RabbitMQUtils.openConnection(this.configuration);
            this.channel = RabbitMQUtils.openChannel(this.connection);
            RabbitMQWorkQueueRepository.createQueue(this.channel, this.queueName);
            this.consumer = new QueueingConsumer(this.channel);
            Integer prefetchCount = this.configuration.getInt("rabbitmq.prefetch.count", 10);
            this.channel.basicQos(prefetchCount.intValue(), false);
            this.channel.basicConsume(this.queueName, false, (Consumer)this.consumer);
        }
        catch (IOException ex) {
            throw new BcException("Could not startup RabbitMQ", ex);
        }
    }

    @Override
    public void close() {
        super.close();
        try {
            LOGGER.debug("Closing RabbitMQ channel", new Object[0]);
            this.channel.close();
            LOGGER.debug("Closing RabbitMQ connection", new Object[0]);
            this.connection.close();
        }
        catch (IOException | TimeoutException ex) {
            LOGGER.error("Could not close RabbitMQ connection and channel", ex);
        }
    }

    @Override
    public WorkerTuple nextTuple() throws InterruptedException {
        QueueingConsumer.Delivery delivery = this.consumer.nextDelivery(100L);
        if (delivery == null) {
            return null;
        }
        return new WorkerTuple(delivery.getEnvelope().getDeliveryTag(), delivery.getBody());
    }

    @Override
    public void ack(WorkerTuple workerTuple) {
        long deliveryTag = (Long)workerTuple.getMessageId();
        try {
            this.channel.basicAck(deliveryTag, false);
        }
        catch (IOException ex) {
            LOGGER.error("Could not ack: %d", deliveryTag, ex);
        }
    }

    @Override
    public void fail(WorkerTuple workerTuple) {
        long deliveryTag = (Long)workerTuple.getMessageId();
        try {
            this.channel.basicNack(deliveryTag, false, false);
        }
        catch (IOException ex) {
            LOGGER.error("Could not ack: %d", deliveryTag, ex);
        }
    }

    protected QueueingConsumer getConsumer() {
        return this.consumer;
    }

    @Inject
    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }
}

