/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.nsq;

import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.NSQMessage;
import com.github.brainlag.nsq.ServerAddress;
import com.github.brainlag.nsq.callbacks.NSQMessageCallback;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import com.github.brainlag.nsq.lookup.NSQLookup;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.component.nsq.NsqConfiguration;
import org.apache.camel.component.nsq.NsqEndpoint;
import org.apache.camel.component.nsq.NsqSynchronization;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NsqConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(NsqConsumer.class);
    NSQConsumer consumer;
    private final Processor processor;
    private ExecutorService executor;
    private final NsqConfiguration configuration;

    public NsqConsumer(NsqEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.processor = processor;
        this.configuration = this.getEndpoint().getConfiguration();
    }

    public NsqEndpoint getEndpoint() {
        return (NsqEndpoint)super.getEndpoint();
    }

    protected void doStart() throws Exception {
        super.doStart();
        LOG.debug("Starting NSQ Consumer");
        this.executor = this.getEndpoint().createExecutor();
        LOG.debug("Getting NSQ Connection");
        Object lookup = ObjectHelper.isEmpty((Object)this.configuration.getCustomNSQLookup()) ? new DefaultNSQLookup() : this.configuration.getCustomNSQLookup();
        for (ServerAddress server : this.configuration.getServerAddresses()) {
            lookup.addLookupAddress(server.getHost(), server.getPort() == 0 ? this.configuration.getLookupServerPort() : server.getPort());
        }
        this.consumer = new NSQConsumer((NSQLookup)lookup, this.configuration.getTopic(), this.configuration.getChannel(), (NSQMessageCallback)new CamelNsqMessageHandler(), this.getEndpoint().getNsqConfig());
        this.consumer.setLookupPeriod(this.configuration.getLookupInterval());
        this.consumer.setExecutor((Executor)this.getEndpoint().createExecutor());
        this.consumer.start();
    }

    protected void doStop() throws Exception {
        LOG.debug("Stopping NSQ Consumer");
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
        super.doStop();
    }

    class CamelNsqMessageHandler
    implements NSQMessageCallback {
        CamelNsqMessageHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void message(NSQMessage msg) {
            LOG.debug("Received Message: {}", (Object)msg);
            Exchange exchange = NsqConsumer.this.createExchange(false);
            try {
                exchange.setPattern(ExchangePattern.InOnly);
                exchange.getIn().setBody((Object)msg.getMessage());
                exchange.getIn().setHeader("CamelNsqMessageId", (Object)msg.getId());
                exchange.getIn().setHeader("CamelNsqMessageAttempts", (Object)msg.getAttempts());
                if (msg.getTimestamp() != null) {
                    exchange.getIn().setHeader("CamelNsqMessageTimestamp", (Object)msg.getTimestamp());
                    exchange.getIn().setHeader("CamelMessageTimestamp", (Object)msg.getTimestamp().getTime());
                }
                if (NsqConsumer.this.configuration.getAutoFinish().booleanValue()) {
                    msg.finished();
                } else {
                    ((ExtendedExchange)exchange.adapt(ExtendedExchange.class)).addOnCompletion((Synchronization)new NsqSynchronization(msg, (int)NsqConsumer.this.configuration.getRequeueInterval()));
                }
                NsqConsumer.this.processor.process(exchange);
            }
            catch (Exception e) {
                if (!NsqConsumer.this.configuration.getAutoFinish().booleanValue()) {
                    msg.requeue((int)NsqConsumer.this.configuration.getRequeueInterval());
                }
                NsqConsumer.this.getExceptionHandler().handleException("Error during processing", exchange, (Throwable)e);
            }
            finally {
                NsqConsumer.this.releaseExchange(exchange, false);
            }
        }
    }
}

