/*
 * Decompiled with CFR 0.152.
 */
package forklift.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import forklift.classloader.RunAsClassLoader;
import forklift.concurrent.Callback;
import forklift.connectors.ConnectorException;
import forklift.connectors.ForkliftConnectorI;
import forklift.connectors.ForkliftMessage;
import forklift.consumer.MessageRunnable;
import forklift.consumer.parser.KeyValueParser;
import forklift.decorators.Config;
import forklift.decorators.Headers;
import forklift.decorators.MultiThreaded;
import forklift.decorators.OnMessage;
import forklift.decorators.OnValidate;
import forklift.decorators.Producer;
import forklift.decorators.Queue;
import forklift.decorators.Topic;
import forklift.producers.ForkliftProducerI;
import forklift.properties.PropertiesManager;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

/**
 * Binds a message handler class to the queue or topic named by its {@link Queue} or
 * {@link Topic} annotation and feeds it the messages received from a {@link ForkliftConnectorI}.
 *
 * <p>For every received message a new handler instance is created, its annotated fields are
 * populated by {@link #inject(ForkliftMessage, Object)}, and a {@link MessageRunnable} built
 * around it is either run on the receiving thread or, when the handler class is annotated with
 * {@link MultiThreaded}, handed to a thread pool.
 */
public class Consumer {
    /** How long a single {@code receive} call waits for a message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 2500L;

    /** Source of the numeric suffix that makes each consumer name unique. */
    private static final AtomicInteger id = new AtomicInteger(1);
    /** Shared mapper used to bind message bodies to arbitrary field types. */
    private static final ObjectMapper mapper = new ObjectMapper();

    private final Logger log;
    private final ClassLoader classLoader;
    private final ForkliftConnectorI connector;
    /** Injectable fields of the handler, keyed by decorator annotation and then by field type. */
    private final Map<Class<? extends java.lang.annotation.Annotation>, Map<Class<?>, List<Field>>> injectFields;
    private final Class<?> msgHandler;
    private final String name;
    private final List<Method> onMessage;
    private final List<Method> onValidate;
    private final Queue queue;
    private final Topic topic;
    private final ApplicationContext context;
    private final BlockingQueue<Runnable> blockQueue;
    /** Worker pool for {@link MultiThreaded} handlers; {@code null} for single-threaded handlers. */
    private final ThreadPoolExecutor threadPool;
    private Callback<Consumer> outOfMessages;
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Creates a consumer without a dedicated class loader or application context.
     *
     * @param msgHandler handler class annotated with {@link Queue} or {@link Topic}
     * @param connector  connector used to obtain consumers and producers
     */
    public Consumer(Class<?> msgHandler, ForkliftConnectorI connector) {
        this(msgHandler, connector, null);
    }

    /**
     * Creates a consumer without an application context.
     *
     * @param msgHandler  handler class annotated with {@link Queue} or {@link Topic}
     * @param connector   connector used to obtain consumers and producers
     * @param classLoader class loader passed to {@link RunAsClassLoader} and {@link MessageRunnable}
     */
    public Consumer(Class<?> msgHandler, ForkliftConnectorI connector, ClassLoader classLoader) {
        this(msgHandler, connector, classLoader, null);
    }

    /**
     * Creates a consumer and indexes the handler's annotated methods and fields.
     *
     * @param msgHandler  handler class annotated with {@link Queue} or {@link Topic}
     * @param connector   connector used to obtain consumers and producers
     * @param classLoader class loader passed to {@link RunAsClassLoader} and {@link MessageRunnable}
     * @param context     context used to satisfy {@link Inject} fields; may be {@code null}
     * @throws IllegalArgumentException if the handler declares both a queue and a topic, or neither
     */
    public Consumer(Class<?> msgHandler, ForkliftConnectorI connector, ClassLoader classLoader, ApplicationContext context) {
        this.classLoader = classLoader;
        this.connector = connector;
        this.msgHandler = msgHandler;
        this.context = context;
        this.topic = msgHandler.getAnnotation(Topic.class);
        this.queue = msgHandler.getAnnotation(Queue.class);
        if (this.queue != null && this.topic != null) {
            throw new IllegalArgumentException("Msg Handler cannot consume a queue and topic");
        }
        if (this.queue != null) {
            this.name = this.queue.value() + ":" + id.getAndIncrement();
        } else if (this.topic != null) {
            this.name = this.topic.value() + ":" + id.getAndIncrement();
        } else {
            throw new IllegalArgumentException("Msg Handler must handle a queue or topic.");
        }
        this.log = LoggerFactory.getLogger(this.name);

        final MultiThreaded multiThreaded = msgHandler.getAnnotation(MultiThreaded.class);
        if (multiThreaded != null) {
            final int maxThreads = multiThreaded.value();
            this.log.info("Creating thread pool of {}", maxThreads);
            this.blockQueue = new ArrayBlockingQueue<>(maxThreads * 100 + 100);
            final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    Math.min(2, maxThreads), maxThreads, 5L, TimeUnit.MINUTES, this.blockQueue);
            // The backlog is bounded. With the default abort policy a full backlog makes execute()
            // throw, which the message loop treats as a fatal handler error. Running the task on
            // the receiving thread instead throttles consumption until the workers catch up.
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            this.threadPool = pool;
        } else {
            this.blockQueue = null;
            this.threadPool = null;
        }

        this.onMessage = new ArrayList<>();
        this.onValidate = new ArrayList<>();
        for (Method method : msgHandler.getDeclaredMethods()) {
            if (method.isAnnotationPresent(OnMessage.class)) {
                this.onMessage.add(method);
            } else if (method.isAnnotationPresent(OnValidate.class)) {
                this.onValidate.add(method);
            }
        }

        final Map<Class<? extends java.lang.annotation.Annotation>, Map<Class<?>, List<Field>>> registry = new HashMap<>();
        registry.put(Config.class, new HashMap<>());
        registry.put(Inject.class, new HashMap<>());
        registry.put(forklift.decorators.Message.class, new HashMap<>());
        registry.put(Headers.class, new HashMap<>());
        registry.put(forklift.decorators.Properties.class, new HashMap<>());
        registry.put(Producer.class, new HashMap<>());
        for (Field field : msgHandler.getDeclaredFields()) {
            registry.forEach((decorator, fieldsByType) -> {
                if (field.isAnnotationPresent(decorator)) {
                    // Handler fields are typically private; open them once up front.
                    field.setAccessible(true);
                    fieldsByType.computeIfAbsent(field.getType(), type -> new ArrayList<>()).add(field);
                }
            });
        }
        this.injectFields = registry;
    }

    /**
     * Obtains a consumer for the configured topic or queue and runs {@link #messageLoop} on the
     * calling thread until the loop ends. A {@link ConnectorException} is logged, not rethrown.
     */
    public void listen() {
        try {
            // The constructor guarantees that exactly one of topic/queue is set.
            final MessageConsumer consumer = this.topic != null
                    ? this.connector.getTopic(this.topic.value())
                    : this.connector.getQueue(this.queue.value());
            this.messageLoop(consumer);
        }
        catch (ConnectorException e) {
            this.log.error("Unable to obtain a message consumer for {}", this.name, e);
        }
    }

    /**
     * @return the queue or topic name followed by {@code ":"} and a unique number
     */
    public String getName() {
        return this.name;
    }

    /**
     * Receives messages from {@code consumer} and dispatches each one to a new handler instance
     * until {@link #shutdown()} is called, a handler cannot be set up, or a JMS error occurs.
     *
     * <p>Whenever a receive call times out without a message, the callback registered through
     * {@link #setOutOfMessages(Callback)} is invoked. On exit, by any path, the thread pool
     * (if any) is shut down and {@code consumer} is closed.
     *
     * @param consumer the JMS consumer to read from; closed before this method returns
     */
    public void messageLoop(MessageConsumer consumer) {
        try {
            this.running.set(true);
            // The running flag is checked before every receive, so a stop request takes effect
            // after the current message rather than only once the destination has run dry.
            while (this.running.get()) {
                final Message jmsMsg = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (jmsMsg != null) {
                    this.dispatch(this.connector.jmsToForklift(jmsMsg));
                    continue;
                }
                final Callback<Consumer> callback = this.outOfMessages;
                if (callback != null) {
                    callback.handle(this);
                }
            }
        }
        catch (JMSException e) {
            this.running.set(false);
            this.log.error("JMS Error in message loop: ", (Throwable) e);
        }
        finally {
            try {
                this.shutdownThreadPool();
            }
            finally {
                this.closeConsumer(consumer);
            }
        }
    }

    /** Requests that the message loop stop; it exits before attempting the next receive. */
    public void shutdown() {
        this.running.set(false);
    }

    /**
     * @param outOfMessages callback invoked each time a receive call times out without a message;
     *                      {@code null} disables the notification
     */
    public void setOutOfMessages(Callback<Consumer> outOfMessages) {
        this.outOfMessages = outOfMessages;
    }

    /**
     * Populates every indexed, annotated field of {@code instance} from {@code msg}, the
     * application context, the properties manager or the connector, depending on the field's
     * decorator and declared type. Fields whose decorator/type pair is not supported are left
     * untouched.
     *
     * @param msg      the message being delivered
     * @param instance the handler instance to populate
     * @throws RuntimeException if any field cannot be set; the original failure is the cause
     */
    public void inject(ForkliftMessage msg, Object instance) {
        this.injectFields.forEach((decorator, fieldsByType) ->
            fieldsByType.forEach((type, fields) -> {
                for (Field field : fields) {
                    try {
                        this.injectField(decorator, type, field, msg, instance);
                    }
                    catch (Exception e) {
                        this.log.error("Error injecting data into Msg Handler", (Throwable) e);
                        // Keep the cause so callers can see which field or value failed.
                        throw new RuntimeException("Error injecting data into Msg Handler", e);
                    }
                }
            }));
    }

    /**
     * @return the handler class this consumer instantiates for each message
     */
    public Class<?> getMsgHandler() {
        return this.msgHandler;
    }

    /**
     * @return the handler's {@link Queue} annotation, or {@code null} if it consumes a topic
     */
    public Queue getQueue() {
        return this.queue;
    }

    /**
     * @return the handler's {@link Topic} annotation, or {@code null} if it consumes a queue
     */
    public Topic getTopic() {
        return this.topic;
    }

    /**
     * @return the connector this consumer was created with
     */
    public ForkliftConnectorI getConnector() {
        return this.connector;
    }

    /** Builds a handler for one message and runs it inline or on the pool; a failure stops the loop. */
    private void dispatch(ForkliftMessage msg) {
        try {
            final Object handler = this.msgHandler.newInstance();
            RunAsClassLoader.run(this.classLoader, () -> this.inject(msg, handler));
            final MessageRunnable runner = new MessageRunnable(this, msg, this.classLoader, handler, this.onMessage, this.onValidate);
            if (this.threadPool == null) {
                runner.run();
            } else if (this.threadPool.isShutdown()) {
                // CallerRunsPolicy silently drops work once the pool is shut down; fail loudly.
                throw new IllegalStateException("Thread pool has already been shut down");
            } else {
                this.threadPool.execute(runner);
            }
        }
        catch (Exception e) {
            // Reaching this point means the handler class itself is unusable, so stop consuming.
            this.log.error("Consumer couldn't be used.", (Throwable) e);
            this.running.set(false);
        }
    }

    /** Sets a single field according to its decorator annotation and declared type. */
    private void injectField(Class<?> decorator, Class<?> type, Field field, ForkliftMessage msg, Object instance) throws Exception {
        if (decorator == forklift.decorators.Message.class) {
            if (type == ForkliftMessage.class) {
                field.set(instance, msg);
            } else if (type == String.class) {
                field.set(instance, msg.getMsg());
            } else if (type == Map.class) {
                field.set(instance, KeyValueParser.parse(msg.getMsg()));
            } else {
                // Any other type is bound from the message body by the object mapper.
                field.set(instance, mapper.readValue(msg.getMsg(), type));
            }
        } else if (decorator == Inject.class) {
            if (type == ApplicationContext.class) {
                field.set(instance, this.context);
            } else {
                field.set(instance, this.context.getBean(type));
            }
        } else if (decorator == Config.class) {
            if (type == Properties.class) {
                final Config config = field.getAnnotation(Config.class);
                field.set(instance, PropertiesManager.get(config.value()));
            }
        } else if (decorator == Headers.class) {
            if (type == Map.class) {
                field.set(instance, msg.getHeaders());
            }
        } else if (decorator == forklift.decorators.Properties.class) {
            if (type == Map.class) {
                field.set(instance, msg.getProperties());
            }
        } else if (decorator == Producer.class && type == ForkliftProducerI.class) {
            final Producer producer = field.getAnnotation(Producer.class);
            if (producer.queue().length() > 0) {
                field.set(instance, this.connector.getQueueProducer(producer.queue()));
            } else if (producer.topic().length() > 0) {
                field.set(instance, this.connector.getTopicProducer(producer.topic()));
            }
        }
    }

    /** Stops the worker pool from accepting new work; tasks already submitted still run. */
    private void shutdownThreadPool() {
        if (this.threadPool != null && !this.threadPool.isShutdown()) {
            this.log.info("Shutting down thread pool - active {}", this.threadPool.getActiveCount());
            this.threadPool.shutdown();
        }
    }

    /** Closes the JMS consumer, logging rather than propagating any failure. */
    private void closeConsumer(MessageConsumer consumer) {
        try {
            consumer.close();
        }
        catch (Exception e) {
            this.log.error("Error in message loop shutdown:", (Throwable) e);
        }
    }
}

