/*
 * Decompiled with CFR 0.152.
 */
package com.kjetland.dropwizard.activemq;

import com.codahale.metrics.health.HealthCheck;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kjetland.dropwizard.activemq.ActiveMQBaseExceptionHandler;
import com.kjetland.dropwizard.activemq.ActiveMQBundle;
import com.kjetland.dropwizard.activemq.ActiveMQExceptionHandler;
import com.kjetland.dropwizard.activemq.ActiveMQReceiver;
import com.kjetland.dropwizard.activemq.ActiveMQUtils;
import com.kjetland.dropwizard.activemq.DestinationCreator;
import com.kjetland.dropwizard.activemq.DestinationCreatorImpl;
import com.kjetland.dropwizard.activemq.errors.JsonError;
import io.dropwizard.lifecycle.Managed;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.jms.pool.PooledMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActiveMQReceiverHandler<T>
implements Managed,
Runnable {
    static final Field pooledMessageConsumerDelegateField;
    private static final long SLEEP_TIME_MILLS = 10000L;
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final String destination;
    private final ConnectionFactory connectionFactory;
    private final Class<? extends T> receiverType;
    private final ActiveMQReceiver<T> receiver;
    private final ObjectMapper objectMapper;
    private final Thread thread;
    private AtomicBoolean shouldStop = new AtomicBoolean(false);
    private AtomicBoolean isReceiving = new AtomicBoolean(false);
    private final ActiveMQBaseExceptionHandler exceptionHandler;
    protected final DestinationCreator destinationCreator = new DestinationCreatorImpl();
    protected final long shutdownWaitInSeconds;
    protected int errorsInARowCount = 0;

    public ActiveMQReceiverHandler(String destination, ConnectionFactory connectionFactory, ActiveMQReceiver<T> receiver, Class<? extends T> receiverType, ObjectMapper objectMapper, ActiveMQBaseExceptionHandler exceptionHandler, long shutdownWaitInSeconds) {
        this.destination = destination;
        this.connectionFactory = connectionFactory;
        this.receiver = receiver;
        this.receiverType = receiverType;
        this.objectMapper = objectMapper;
        this.exceptionHandler = exceptionHandler;
        this.shutdownWaitInSeconds = shutdownWaitInSeconds;
        this.thread = new Thread((Runnable)this, "Receiver " + destination);
    }

    public ActiveMQReceiverHandler(String destination, ConnectionFactory connectionFactory, ActiveMQReceiver<T> receiver, Class<? extends T> receiverType, ObjectMapper objectMapper, ActiveMQExceptionHandler exceptionHandler, long shutdownWaitInSeconds) {
        this(destination, connectionFactory, receiver, receiverType, objectMapper, (ActiveMQBaseExceptionHandler)exceptionHandler, shutdownWaitInSeconds);
    }

    public void start() throws Exception {
        this.log.info("Starting receiver for " + this.destination);
        this.thread.start();
    }

    public void stop() throws Exception {
        this.log.info("Stopping receiver for " + this.destination + " (Going to wait for max " + this.shutdownWaitInSeconds + " seconds)");
        if (this.thread.isAlive()) {
            this.shouldStop.set(true);
            long start = System.currentTimeMillis();
            while (this.thread.isAlive()) {
                if ((System.currentTimeMillis() - start) / 1000L >= this.shutdownWaitInSeconds) {
                    this.log.warn("Giving up waiting for receiver-thread shutdown");
                    break;
                }
                this.log.debug("ReceiverThread is still alive..");
                Thread.sleep(200L);
            }
        }
        this.log.info("Stopped receiver for " + this.destination);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void processMessage(ActiveMQMessageConsumer messageConsumer, Message message) {
        String json = null;
        try {
            ActiveMQBundle.correlationID.set(message.getJMSCorrelationID());
            if (message instanceof TextMessage) {
                json = ((TextMessage)message).getText();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Received " + json);
                }
                if (this.receiverType.equals(String.class)) {
                    this.receiver.receive(json);
                } else {
                    T object = this.fromJson(json);
                    this.receiver.receive(object);
                }
            } else if (message instanceof ActiveMQMapMessage) {
                ActiveMQMapMessage m = (ActiveMQMapMessage)message;
                if (!this.receiverType.equals(Map.class)) throw new Exception("We received a ActiveMQMapMessage-message, so you have to use receiverType = java.util.Map to receive it");
                this.receiver.receive(m.getContentMap());
            } else {
                if (!(message instanceof ActiveMQObjectMessage)) throw new Exception("Do not know how to handle messages of type " + message.getClass());
                ActiveMQObjectMessage m = (ActiveMQObjectMessage)message;
                if (!this.receiverType.isAssignableFrom(m.getObject().getClass())) throw new java.lang.IllegalStateException("Incompatible reciever types. " + this.receiverType + " must be assignable from " + m.getObject().getClass());
                this.receiver.receive(m.getObject());
            }
            message.acknowledge();
            return;
        }
        catch (Exception e) {
            if (this.exceptionHandler.onException(message, json, e)) {
                try {
                    message.acknowledge();
                    return;
                }
                catch (JMSException x) {
                    throw new RuntimeException(x);
                }
            }
            try {
                messageConsumer.rollback();
                return;
            }
            catch (JMSException e1) {
                throw new RuntimeException("Error rollbacking failed message", e1);
            }
        }
        finally {
            ActiveMQBundle.correlationID.remove();
        }
    }

    private T fromJson(String json) {
        try {
            return (T)this.objectMapper.readValue(json, this.receiverType);
        }
        catch (IOException e) {
            throw new JsonError(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.errorsInARowCount = 0;
        boolean verboseInitLogging = true;
        while (!this.shouldStop.get()) {
            try {
                if (verboseInitLogging) {
                    this.log.info("Setting up receiver for " + this.destination);
                } else {
                    this.log.debug("Setting up receiver for " + this.destination);
                }
                Connection connection = this.connectionFactory.createConnection();
                try {
                    connection.start();
                    Session session = connection.createSession(false, 2);
                    try {
                        Destination d = this.destinationCreator.create(session, this.destination);
                        MessageConsumer rawMessageConsumer = session.createConsumer(d);
                        ActiveMQMessageConsumer messageConsumer = this.convertToActiveMQMessageConsumer(rawMessageConsumer);
                        try {
                            if (verboseInitLogging) {
                                this.log.info("Started listening for messages on " + this.destination);
                            } else {
                                this.log.debug("Started listening for messages on " + this.destination);
                            }
                            this.isReceiving.set(true);
                            this.runReceiveLoop(messageConsumer);
                        }
                        finally {
                            this.isReceiving.set(false);
                            ActiveMQUtils.silent(() -> messageConsumer.close());
                        }
                    }
                    finally {
                        ActiveMQUtils.silent(() -> session.close());
                    }
                }
                finally {
                    ActiveMQUtils.silent(() -> connection.close());
                }
            }
            catch (Throwable e) {
                ++this.errorsInARowCount;
                boolean continuingErrorSituation = this.errorsInARowCount > 1;
                verboseInitLogging = true;
                if (e instanceof IllegalStateException && e.getMessage().equals("The Consumer is closed") && !continuingErrorSituation) {
                    this.log.debug("Consumer is closed - will try to recover", e);
                    verboseInitLogging = false;
                } else {
                    this.log.error("Uncaught exception - will try to recover", e);
                }
                if (!continuingErrorSituation) continue;
                this.log.warn("Numbers of errors in a row {} - Going to sleep {} mills before retrying", (Object)this.errorsInARowCount, (Object)10000L);
                ActiveMQUtils.silent(() -> Thread.sleep(10000L));
            }
        }
        this.log.debug("Message-checker-thread stopped");
    }

    private ActiveMQMessageConsumer convertToActiveMQMessageConsumer(MessageConsumer rawMessageConsumer) {
        if (rawMessageConsumer instanceof ActiveMQMessageConsumer) {
            return (ActiveMQMessageConsumer)rawMessageConsumer;
        }
        if (rawMessageConsumer instanceof PooledMessageConsumer) {
            try {
                return (ActiveMQMessageConsumer)pooledMessageConsumerDelegateField.get(rawMessageConsumer);
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException("Error extracting ActiveMQMessageConsumer from " + rawMessageConsumer.getClass(), e);
            }
        }
        throw new RuntimeException("Unable to convert messageConsumer '" + rawMessageConsumer.getClass() + "' to ActiveMQMessageConsumer");
    }

    private void runReceiveLoop(ActiveMQMessageConsumer messageConsumer) throws JMSException {
        while (!this.shouldStop.get()) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Checking for new message");
            }
            Message message = messageConsumer.receive(400L);
            this.errorsInARowCount = 0;
            if (message == null) continue;
            this.processMessage(messageConsumer, message);
        }
    }

    public HealthCheck getHealthCheck() {
        return new HealthCheck(){

            protected HealthCheck.Result check() throws Exception {
                if (ActiveMQReceiverHandler.this.isReceiving.get()) {
                    return HealthCheck.Result.healthy((String)("Is receiving from " + ActiveMQReceiverHandler.this.destination));
                }
                return HealthCheck.Result.unhealthy((String)("Is NOT receiving from " + ActiveMQReceiverHandler.this.destination));
            }
        };
    }

    static {
        try {
            pooledMessageConsumerDelegateField = PooledMessageConsumer.class.getDeclaredField("delegate");
            pooledMessageConsumerDelegateField.setAccessible(true);
        }
        catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }
}

