/*
 * Decompiled with CFR 0.152.
 */
package forklift.consumer;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import forklift.Forklift;
import forklift.classloader.RunAsClassLoader;
import forklift.connectors.ConnectorException;
import forklift.connectors.ForkliftMessage;
import forklift.consumer.ConsumerService;
import forklift.consumer.ForkliftConsumerI;
import forklift.consumer.MessageRunnable;
import forklift.consumer.ProcessStep;
import forklift.consumer.parser.KeyValueParser;
import forklift.decorators.Config;
import forklift.decorators.Headers;
import forklift.decorators.Message;
import forklift.decorators.MultiThreaded;
import forklift.decorators.On;
import forklift.decorators.OnMessage;
import forklift.decorators.OnValidate;
import forklift.decorators.Ons;
import forklift.decorators.Order;
import forklift.decorators.Producer;
import forklift.decorators.Response;
import forklift.message.Header;
import forklift.producers.ForkliftProducerI;
import forklift.properties.PropertiesManager;
import forklift.source.SourceI;
import forklift.source.SourceUtil;
import forklift.source.decorators.Queue;
import forklift.source.decorators.Topic;
import forklift.source.sources.GroupedTopicSource;
import forklift.source.sources.QueueSource;
import forklift.source.sources.RoleInputSource;
import forklift.source.sources.TopicSource;
import java.io.Closeable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {
    /** JSON mapper used when a message body is bound to a handler type; registers the Java time module and ignores unknown properties. */
    static ObjectMapper mapper = new ObjectMapper().registerModule((Module)new JavaTimeModule()).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    /** Logger; starts as the class logger and is replaced by a name-based logger in the queue/topic/source constructors. */
    private Logger log;
    /** Sequence appended to every consumer name. Never reassigned, hence final. */
    private static final AtomicInteger id = new AtomicInteger(1);
    /** Class loader handed to {@code RunAsClassLoader} and to each {@code MessageRunnable}; may be null. */
    private final ClassLoader classLoader;
    private final Forklift forklift;
    /** Injectable handler fields, keyed first by decorator annotation type and then by declared field type. */
    private final Map<Class, Map<Class<?>, List<Field>>> injectFields;
    /** The handler class whose annotated methods, fields and constructors drive this consumer. */
    private final Class<?> msgHandler;
    /** Handler methods annotated with {@code @OnMessage}. */
    private final List<Method> onMessage;
    /** Handler methods annotated with {@code @OnValidate}. */
    private final List<Method> onValidate;
    /** Handler methods annotated with {@code @Response}. */
    private final List<Method> onResponse;
    /** Runnables grouped by the value returned from the {@code @Order} method; null when the handler declares no such method. */
    private final Map<String, List<MessageRunnable>> orderQueue;
    /** Handler methods annotated with {@code @On}, grouped by process step; every step has an entry. */
    private final Map<ProcessStep, List<Method>> onProcessStep;
    /** First {@code @Inject}-annotated constructor of the handler, or null when there is none. */
    private Constructor<?> constructor;
    /** Parameter annotations of {@link #constructor}; only set together with it. */
    private Annotation[][] constructorAnnotations;
    private String name;
    private SourceI source;
    private List<SourceI> roleSources = Collections.emptyList();
    /** Services consulted for {@code @Inject} and un-annotated injection targets; null until services are added or set. */
    private List<ConsumerService> services;
    /** Handler method annotated with {@code @Order}, or null. */
    private Method orderMethod;
    /** Work queue backing {@link #threadPool}; null when the handler is not {@code @MultiThreaded}. */
    private BlockingQueue<Runnable> blockQueue;
    /** Pool that runs message runnables; null when the handler is not {@code @MultiThreaded}. */
    private ThreadPoolExecutor threadPool;
    /** Optional callback invoked by the message loop whenever the inner receive loop ends. */
    private java.util.function.Consumer<Consumer> outOfMessages;
    /** Loop control flag: set by the message loop, cleared by {@code shutdown()} and on errors. Never reassigned, hence final. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Creates a consumer for the given handler class without an explicit class loader.
     * Delegates to the three-argument constructor, passing {@code null} as the class loader.
     *
     * @param msgHandler the message handler class
     * @param forklift the forklift instance this consumer obtains its connector from
     */
    public Consumer(Class<?> msgHandler, Forklift forklift) {
        this(msgHandler, forklift, null);
    }

    /**
     * Creates a consumer whose source is discovered from the handler class.
     * Delegates to the private constructor with {@code preinit == false}, which
     * resolves the single source via {@code SourceUtil.getSourcesAsList(msgHandler)}.
     *
     * @param msgHandler the message handler class
     * @param forklift the forklift instance this consumer obtains its connector from
     * @param classLoader class loader used when running handler code; may be null
     */
    public Consumer(Class<?> msgHandler, Forklift forklift, ClassLoader classLoader) {
        this(msgHandler, forklift, classLoader, false);
    }

    public Consumer(Class<?> msgHandler, Forklift forklift, ClassLoader classLoader, Queue queue) {
        this(msgHandler, forklift, classLoader, true);
        if (queue == null) {
            throw new IllegalArgumentException("Msg Handler must handle a queue.");
        }
        this.source = new QueueSource(queue);
        this.name = queue.value() + ":" + id.getAndIncrement();
        this.log = LoggerFactory.getLogger((String)this.name);
    }

    public Consumer(Class<?> msgHandler, Forklift forklift, ClassLoader classLoader, Topic topic) {
        this(msgHandler, forklift, classLoader, true);
        if (topic == null) {
            throw new IllegalArgumentException("Msg Handler must handle a topic.");
        }
        this.source = new TopicSource(topic);
        this.name = topic.value() + ":" + id.getAndIncrement();
        this.log = LoggerFactory.getLogger((String)this.name);
    }

    /**
     * Creates a consumer bound to an explicitly supplied source.
     * The consumer name is taken from the source (queue, topic or grouped-topic
     * name, or the role of a role-input source) plus a sequence number.
     *
     * @param msgHandler the message handler class
     * @param forklift the forklift instance this consumer obtains its connector from
     * @param classLoader class loader used when running handler code; may be null
     * @param source the source to consume from
     * @param roleSources the sources declared for the handler; {@code null} is treated as an empty list
     * @throws IllegalArgumentException if {@code source} is null
     */
    public Consumer(Class<?> msgHandler, Forklift forklift, ClassLoader classLoader, SourceI source, List<SourceI> roleSources) {
        this(msgHandler, forklift, classLoader, true);
        // Fail the same way the queue/topic constructors do instead of with a bare NPE below.
        if (source == null) {
            throw new IllegalArgumentException("Msg Handler must handle a source.");
        }
        this.source = source;
        // Keep the field non-null so getRoleSources(Class) can always stream it.
        this.roleSources = roleSources != null ? roleSources : Collections.<SourceI>emptyList();
        this.name = source.apply(QueueSource.class, queue -> queue.getName()).apply(TopicSource.class, topic -> topic.getName()).apply(GroupedTopicSource.class, topic -> topic.getName()).apply(RoleInputSource.class, roleSource -> roleSource.getRole()).get() + ":" + id.getAndIncrement();
        this.log = LoggerFactory.getLogger(this.name);
    }

    /**
     * Common initialisation shared by all public constructors.
     * <p>
     * When {@code preinit} is false the source is discovered from the handler
     * class; otherwise the calling constructor assigns source and name itself.
     * The handler's declared methods are then indexed by annotation
     * ({@code @OnMessage}, {@code @OnValidate}, {@code @Response}, {@code @Order},
     * {@code @On}/{@code @Ons}), its declared fields are indexed by injection
     * decorator, and the injectable constructor (if any) is resolved.
     *
     * @param msgHandler the message handler class
     * @param forklift the forklift instance; its connector is queried for response/order support
     * @param classLoader class loader used when running handler code; may be null
     * @param preinit true when the caller supplies source and name after this constructor returns
     * @throws IllegalArgumentException if {@code preinit} is false and the handler does not declare exactly one source
     * @throws RuntimeException if the handler uses {@code @Response} or {@code @Order} and the connector does not support it
     */
    private Consumer(Class<?> msgHandler, Forklift forklift, ClassLoader classLoader, boolean preinit) {
        this.classLoader = classLoader;
        this.forklift = forklift;
        this.msgHandler = msgHandler;
        if (!preinit && this.source == null) {
            this.roleSources = SourceUtil.getSourcesAsList(msgHandler);
            if (this.roleSources.size() > 1) {
                throw new IllegalArgumentException("One consumer instance cannot consume more than one source");
            }
            if (this.roleSources.size() == 0) {
                throw new IllegalArgumentException("A consumer must consume at least one source");
            }
            this.source = this.roleSources.get(0);
            this.name = this.source.apply(QueueSource.class, queue -> queue.getName()).apply(TopicSource.class, topic -> topic.getName()).apply(GroupedTopicSource.class, topic -> topic.getName()).get() + ":" + id.getAndIncrement();
        }
        this.log = LoggerFactory.getLogger(Consumer.class);

        this.onMessage = new ArrayList<Method>();
        this.onValidate = new ArrayList<Method>();
        this.onResponse = new ArrayList<Method>();
        this.onProcessStep = new HashMap<ProcessStep, List<Method>>();
        // Every step gets a list up front so later lookups never return null.
        for (ProcessStep step : ProcessStep.values()) {
            this.onProcessStep.put(step, new ArrayList<Method>());
        }

        for (Method method : msgHandler.getDeclaredMethods()) {
            if (method.isAnnotationPresent(OnMessage.class)) {
                this.onMessage.add(method);
                continue;
            }
            if (method.isAnnotationPresent(OnValidate.class)) {
                this.onValidate.add(method);
                continue;
            }
            if (method.isAnnotationPresent(Response.class)) {
                if (!forklift.getConnector().supportsResponse()) {
                    throw new RuntimeException("@Response is not supported by the current connector");
                }
                this.onResponse.add(method);
                continue;
            }
            if (method.isAnnotationPresent(Order.class)) {
                if (!forklift.getConnector().supportsOrder()) {
                    throw new RuntimeException("@Order is not supported by the current connector");
                }
                this.orderMethod = method;
                continue;
            }
            if (!method.isAnnotationPresent(On.class) && !method.isAnnotationPresent(Ons.class)) {
                continue;
            }
            // Register the method once per distinct step it is annotated for.
            Arrays.stream(method.getAnnotationsByType(On.class))
                    .map(on -> on.value())
                    .distinct()
                    .forEach(step -> this.onProcessStep.get(step).add(method));
        }
        this.orderQueue = this.orderMethod != null ? new HashMap<String, List<MessageRunnable>>() : null;

        // One bucket per supported injection decorator.
        this.injectFields = new HashMap<Class, Map<Class<?>, List<Field>>>();
        this.injectFields.put(Config.class, new HashMap<Class<?>, List<Field>>());
        this.injectFields.put(Inject.class, new HashMap<Class<?>, List<Field>>());
        this.injectFields.put(Message.class, new HashMap<Class<?>, List<Field>>());
        this.injectFields.put(Headers.class, new HashMap<Class<?>, List<Field>>());
        this.injectFields.put(forklift.decorators.Properties.class, new HashMap<Class<?>, List<Field>>());
        this.injectFields.put(Producer.class, new HashMap<Class<?>, List<Field>>());

        // Index every declared field under each decorator it carries, grouped by field type.
        for (Field field : msgHandler.getDeclaredFields()) {
            for (Map.Entry<Class, Map<Class<?>, List<Field>>> byDecorator : this.injectFields.entrySet()) {
                if (!field.isAnnotationPresent(byDecorator.getKey())) {
                    continue;
                }
                field.setAccessible(true);
                byDecorator.getValue()
                        .computeIfAbsent(field.getType(), fieldType -> new ArrayList<Field>())
                        .add(field);
            }
        }
        this.configureConstructorInjection();
    }

    /**
     * Obtains a consumer for this instance's source from the connector, sets up
     * the worker pool when the handler is {@code @MultiThreaded}, and runs the
     * message loop until it returns.
     * <p>
     * A {@code ConnectorException} raised anywhere in this method is logged and
     * not rethrown.
     */
    public void listen() {
        try {
            final ForkliftConsumerI consumer = this.forklift.getConnector().getConsumerForSource(this.source);
            final MultiThreaded multiThreaded = this.msgHandler.getAnnotation(MultiThreaded.class);
            if (multiThreaded == null) {
                // Single-threaded handler: runnables execute on the loop thread.
                this.blockQueue = null;
                this.threadPool = null;
            } else {
                final int threads = multiThreaded.value();
                this.log.info("Creating thread pool of {}", (Object)Integer.valueOf(threads));
                this.blockQueue = new ArrayBlockingQueue<Runnable>(threads * 100 + 100);
                this.threadPool = new ThreadPoolExecutor(threads, threads, 5L, TimeUnit.MINUTES, this.blockQueue);
                // When the bounded queue is full the submitting (loop) thread runs the task itself.
                this.threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            }

            this.messageLoop(consumer);

            // NOTE(review): messageLoop already closes the consumer in its finally block,
            // so this is a second close on the same consumer - confirm close() tolerates that.
            if (consumer != null) {
                consumer.close();
            }
        }
        catch (ConnectorException e) {
            this.log.error("Error getting consumer from connector", (Throwable)e);
        }
    }

    /**
     * Returns this consumer's name, which the constructors build from the
     * queue/topic/source name followed by {@code ":"} and a sequence number.
     *
     * @return the consumer name
     */
    public String getName() {
        return this.name;
    }

    /**
     * Receives messages from {@code consumer} and dispatches them until the
     * running flag is cleared.
     * <p>
     * For every received message a handler instance is constructed and injected
     * on the calling thread, wrapped in a {@code MessageRunnable}, and then either
     * submitted to the thread pool or run inline when no pool exists. When the
     * handler declares an {@code @Order} method, runnables sharing the same order
     * id are chained: only the first one is started here and each one starts its
     * successor from a close hook.
     * <p>
     * Any exception while preparing or starting a message is logged and stops the
     * loop. After the loop ends the thread pool (if any) is shut down and given
     * 60 seconds to finish. The consumer is always closed before returning. If
     * the thread is interrupted while waiting for the pool, the interrupt status
     * is restored once the consumer has been closed.
     *
     * @param consumer the connector consumer to receive from; closed by this method
     */
    public void messageLoop(ForkliftConsumerI consumer) {
        // Remembered rather than re-asserted immediately so consumer.close() below runs un-interrupted.
        boolean interrupted = false;
        try {
            this.running.set(true);
            while (this.running.get()) {
                ForkliftMessage consumerMsg;
                while ((consumerMsg = consumer.receive(2500L)) != null && this.running.get()) {
                    try {
                        final ArrayList<Closeable> closeMe = new ArrayList<Closeable>();
                        final Object handler = this.constructMessageHandlerInstance(consumerMsg, closeMe);
                        final ForkliftMessage msg = consumerMsg;
                        RunAsClassLoader.run(this.classLoader, () -> closeMe.addAll(this.inject(msg, handler)));
                        final MessageRunnable runner = new MessageRunnable(this, msg, this.classLoader, handler, this.onMessage, this.onValidate, this.onResponse, this.onProcessStep, closeMe);
                        if (this.orderQueue != null) {
                            final String orderId = (String)this.orderMethod.invoke(handler);
                            // Hook that, when closed, removes this runner from its order group
                            // and starts the next queued runner for the same id.
                            closeMe.add(new Closeable(){

                                @Override
                                public void close() throws IOException {
                                    synchronized (Consumer.this.orderQueue) {
                                        final List<MessageRunnable> pending = Consumer.this.orderQueue.get(orderId);
                                        pending.remove(runner);
                                        final Optional<MessageRunnable> next = pending.stream().findFirst();
                                        if (next.isPresent()) {
                                            if (Consumer.this.threadPool != null) {
                                                Consumer.this.threadPool.execute(next.get());
                                            } else {
                                                // Without a pool the successor runs right here, while the monitor is held.
                                                next.get().run();
                                            }
                                        } else {
                                            Consumer.this.orderQueue.remove(orderId);
                                        }
                                    }
                                }
                            });
                            synchronized (this.orderQueue) {
                                if (this.orderQueue.containsKey(orderId)) {
                                    // Another runner with this id is in flight; queue behind it and do not start now.
                                    this.orderQueue.get(orderId).add(runner);
                                    continue;
                                }
                                final List<MessageRunnable> group = new ArrayList<MessageRunnable>();
                                group.add(runner);
                                this.orderQueue.put(orderId, group);
                            }
                        }
                        if (this.threadPool != null) {
                            this.threadPool.execute(runner);
                            continue;
                        }
                        runner.run();
                    }
                    catch (Exception e) {
                        this.log.error("Consumer couldn't be used.", (Throwable)e);
                        this.running.set(false);
                    }
                }
                if (this.outOfMessages == null) continue;
                this.outOfMessages.accept(this);
            }
            if (this.threadPool != null) {
                this.log.info("Shutting down thread pool - active {}", (Object)this.threadPool.getActiveCount());
                this.threadPool.shutdown();
                this.threadPool.awaitTermination(60L, TimeUnit.SECONDS);
                this.blockQueue.clear();
            }
        }
        catch (ConnectorException e) {
            this.running.set(false);
            this.log.error("JMS Error in message loop: ", (Throwable)e);
        }
        catch (InterruptedException e) {
            // Previously swallowed; keep going to cleanup but do not lose the interrupt.
            interrupted = true;
            this.log.warn("Interrupted while waiting for thread pool to terminate");
        }
        finally {
            try {
                consumer.close();
            }
            catch (Exception e) {
                this.log.error("Error in message loop shutdown:", (Throwable)e);
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Requests that the message loop stop by clearing the running flag.
     * The loop checks the flag after each receive call returns, so it does not
     * stop until the current receive (called with a 2500 timeout argument,
     * presumably milliseconds) has returned.
     */
    public void shutdown() {
        this.log.info("Consumer shutting down");
        this.running.set(false);
    }

    /**
     * Sets the callback the message loop invokes, with this consumer as the
     * argument, each time its inner receive loop ends (a receive returned no
     * message or the running flag was cleared).
     *
     * @param outOfMessages the callback; {@code null} disables the notification
     */
    public void setOutOfMessages(java.util.function.Consumer<Consumer> outOfMessages) {
        this.outOfMessages = outOfMessages;
    }

    /**
     * Selects the handler constructor used for constructor injection.
     * The first declared constructor annotated with {@code @Inject} is stored
     * together with its parameter annotations; if more than one is annotated an
     * error is logged and the first one is still used. When none is annotated
     * the fields are left untouched.
     */
    private final void configureConstructorInjection() {
        Constructor<?> chosen = null;
        int injectableCount = 0;
        for (Constructor<?> candidate : this.msgHandler.getDeclaredConstructors()) {
            if (!candidate.isAnnotationPresent(Inject.class)) {
                continue;
            }
            if (chosen == null) {
                chosen = candidate;
            }
            ++injectableCount;
        }
        if (chosen == null) {
            return;
        }
        this.constructor = chosen;
        this.constructorAnnotations = chosen.getParameterAnnotations();
        if (injectableCount > 1) {
            this.log.error("Multiple constructors annotated with Inject.  Using first injectable constructor found");
        }
    }

    /**
     * Injects values into every indexed injectable field of {@code instance}.
     * <p>
     * For each field the value is resolved from the message, configuration or
     * registered services according to the field's decorator; {@code null}
     * results are skipped so the field keeps its current value. JSON
     * parse/mapping failures are logged as warnings and the field is skipped.
     *
     * @param msg the message supplying body, headers and properties
     * @param instance the handler instance to populate
     * @return the producers that were injected, for the caller to close when done
     * @throws RuntimeException if any other failure occurs while resolving or setting a field (the cause is attached)
     */
    public List<Closeable> inject(ForkliftMessage msg, Object instance) {
        final List<Closeable> closeMe = new ArrayList<Closeable>();
        for (Map.Entry<Class, Map<Class<?>, List<Field>>> byDecorator : this.injectFields.entrySet()) {
            final Class decorator = byDecorator.getKey();
            for (Map.Entry<Class<?>, List<Field>> byType : byDecorator.getValue().entrySet()) {
                final Class<?> fieldType = byType.getKey();
                for (Field field : byType.getValue()) {
                    this.log.trace("Inject target> Field: ({})  Decorator: ({})", field, decorator);
                    try {
                        final Annotation annotation = field.getAnnotation(decorator);
                        final Object value = this.getInjectableValue(annotation, field.getName(), fieldType, msg);
                        if (value instanceof ForkliftProducerI) {
                            closeMe.add((ForkliftProducerI)value);
                        }
                        if (value != null) {
                            field.set(instance, value);
                        }
                    }
                    catch (JsonParseException | JsonMappingException e) {
                        this.log.warn("Unable to parse json for injection.", e);
                    }
                    catch (Exception e) {
                        this.log.error("Error injecting data into Msg Handler", (Throwable)e);
                        // Keep the cause so callers can see what actually failed.
                        throw new RuntimeException("Error injecting data into Msg Handler", e);
                    }
                }
            }
        }
        return closeMe;
    }

    /**
     * Creates a new handler instance for one message.
     * Uses the injectable constructor with resolved arguments when one was
     * found; otherwise instantiates the handler class through
     * {@code Class.newInstance()}.
     *
     * @param forkliftMessage the message used to resolve constructor arguments
     * @param closeables receives any producers created as constructor arguments
     * @return the new handler instance
     */
    private Object constructMessageHandlerInstance(ForkliftMessage forkliftMessage, List<Closeable> closeables) throws IllegalAccessException, InvocationTargetException, InstantiationException, IOException {
        if (this.constructor == null) {
            return this.msgHandler.newInstance();
        }
        final Object[] arguments = this.buildConstructorParameters(forkliftMessage, closeables);
        return this.constructor.newInstance(arguments);
    }

    /**
     * Resolves one argument per parameter of the injectable constructor.
     * For each parameter the first annotation that is a known injection
     * decorator selects how the value is resolved; parameters without one are
     * resolved with a {@code null} decorator. No mapped name is available for
     * constructor parameters, so {@code null} is passed as the name.
     *
     * @param forkliftMessage the message supplying body, headers and properties
     * @param closeables receives every resolved argument that is a producer
     * @return the argument array, in declaration order; entries may be null
     * @throws IOException if resolving a value fails while reading the message body
     */
    private Object[] buildConstructorParameters(ForkliftMessage forkliftMessage, List<Closeable> closeables) throws IOException {
        final int count = this.constructorAnnotations.length;
        final Object[] parameters = new Object[count];
        // getParameters() returns a fresh array on every call; fetch it once.
        final Parameter[] declared = this.constructor.getParameters();
        for (int index = 0; index < count; ++index) {
            Annotation injectable = null;
            for (Annotation candidate : this.constructorAnnotations[index]) {
                if (this.injectFields.containsKey(candidate.annotationType())) {
                    injectable = candidate;
                    break;
                }
            }
            final Object value = this.getInjectableValue(injectable, null, declared[index].getType(), forkliftMessage);
            parameters[index] = value;
            // instanceof is already false for null, no separate null check needed.
            if (value instanceof ForkliftProducerI) {
                closeables.add((ForkliftProducerI)value);
            }
        }
        return parameters;
    }

    /**
     * Resolves the value to inject for one field or constructor parameter.
     * <ul>
     * <li>No decorator or {@code @Inject}: the first non-null result from the registered services.</li>
     * <li>{@code @Message} (only when the message body is non-null): the message itself, the raw
     *     body, a key/value parse of the body (for {@code Map} targets whose body does not start
     *     with <code>{</code>), or the body deserialized as JSON into {@code mappedClass}.</li>
     * <li>{@code @Config}: a whole properties resource, or a single entry of one.</li>
     * <li>{@code @Headers}: the header map, or a single header whose declared type matches.</li>
     * <li>{@code @Properties}: the message property map, or a single property.</li>
     * <li>{@code @Producer} on a {@code ForkliftProducerI} target: a queue or topic producer.</li>
     * </ul>
     * Lookups that cannot be satisfied log a warning and yield {@code null}.
     *
     * @param decorator the injection annotation, or null
     * @param mappedName the field name used as a fallback key; null for constructor parameters
     * @param mappedClass the declared type of the injection target
     * @param msg the message being processed
     * @return the resolved value, or null when nothing could be resolved
     * @throws IOException if the message body cannot be parsed for a {@code @Message} target
     */
    private Object getInjectableValue(Annotation decorator, String mappedName, Class<?> mappedClass, ForkliftMessage msg) throws IOException {
        Object value = null;
        if (decorator == null || decorator.annotationType() == Inject.class) {
            if (this.services != null) {
                for (ConsumerService s : this.services) {
                    try {
                        Object o = s.resolve(mappedClass, null);
                        if (o == null) continue;
                        value = o;
                        break;
                    }
                    catch (Exception e) {
                        // A failing service is skipped so the remaining services can still be tried.
                        this.log.debug("", (Throwable)e);
                    }
                }
            }
        } else if (decorator.annotationType() == Message.class && msg.getMsg() != null) {
            if (mappedClass == ForkliftMessage.class) {
                value = msg;
            } else if (mappedClass == String.class) {
                value = msg.getMsg();
            } else if (mappedClass == Map.class && !msg.getMsg().startsWith("{")) {
                value = KeyValueParser.parse(msg.getMsg());
            } else {
                value = mapper.readValue(msg.getMsg(), mappedClass);
            }
        } else if (decorator.annotationType() == Config.class) {
            final Config annotation = (Config)decorator;
            if (mappedClass == Properties.class) {
                String confName = annotation.value();
                if (confName.equals("")) {
                    confName = mappedName;
                }
                final Properties config = PropertiesManager.get(confName);
                if (config == null) {
                    // Report the name that was actually looked up, not the (possibly empty) annotation value.
                    this.log.warn("Attempt to inject field failed because resource file {} was not found", (Object)confName);
                } else {
                    value = config;
                }
            } else {
                final Properties config = PropertiesManager.get(annotation.value());
                if (config == null) {
                    this.log.warn("Attempt to inject field failed because resource file {} was not found", (Object)annotation.value());
                } else {
                    String key = annotation.field();
                    if (key.equals("")) {
                        key = mappedName;
                    }
                    if (key == null) {
                        // Constructor parameters carry no name, and Properties rejects null keys.
                        this.log.warn("Attempt to inject from resource file {} failed because no field key is available", (Object)annotation.value());
                    } else {
                        value = config.get(key);
                    }
                }
            }
        } else if (decorator.annotationType() == Headers.class) {
            Map<Header, Object> headers = msg.getHeaders();
            if (mappedClass == Map.class) {
                value = headers;
            } else {
                Header key = ((Headers)decorator).value();
                if (headers == null) {
                    this.log.warn("Attempt to inject {} from headers, but headers are null", (Object)key);
                } else if (!key.getHeaderType().equals(mappedClass)) {
                    this.log.warn("Injecting field {} failed because it is not type {}", (Object)mappedName, (Object)key.getHeaderType());
                } else {
                    value = headers.get((Object)key);
                }
            }
        } else if (decorator.annotationType() == forklift.decorators.Properties.class) {
            Map<String, String> properties = msg.getProperties();
            if (mappedClass == Map.class) {
                value = msg.getProperties();
            } else {
                String key = ((forklift.decorators.Properties)decorator).value();
                if (key.equals("")) {
                    key = mappedName;
                }
                if (properties == null) {
                    // This warning used to sit inside a properties != null branch and could never fire.
                    this.log.warn("Attempt to inject field {} from properties, but properties is null", (Object)key);
                } else {
                    value = properties.get(key);
                }
            }
        } else if (decorator.annotationType() == Producer.class && mappedClass == ForkliftProducerI.class) {
            Producer producer = (Producer)decorator;
            ForkliftProducerI p = null;
            if (producer.queue().length() > 0) {
                p = this.forklift.getConnector().getQueueProducer(producer.queue());
            } else if (producer.topic().length() > 0) {
                p = this.forklift.getConnector().getTopicProducer(producer.topic());
            }
            value = p;
        }
        return value;
    }

    /**
     * Returns the message handler class this consumer was created for.
     *
     * @return the handler class
     */
    public Class<?> getMsgHandler() {
        return this.msgHandler;
    }

    /**
     * Returns the forklift instance this consumer was created with.
     *
     * @return the forklift instance
     */
    public Forklift getForklift() {
        return this.forklift;
    }

    /**
     * Builds a handler instance for {@code msg} and injects its fields.
     * <p>
     * The closeables collected while constructing and injecting (for example
     * producers) are not returned, so the caller cannot close them through this
     * method. A JSON parse/mapping failure is logged as a warning and whatever
     * instance exists at that point (possibly {@code null}) is returned.
     *
     * @param msg the message used for constructor and field injection
     * @return the handler instance, or null if a JSON failure occurred before it was constructed
     * @throws RuntimeException if construction or injection fails for any other reason (the cause is attached)
     */
    public Object getMsgHandlerInstance(ForkliftMessage msg) {
        Object instance = null;
        try {
            instance = this.constructMessageHandlerInstance(msg, new ArrayList<Closeable>());
            this.inject(msg, instance);
        }
        catch (JsonParseException | JsonMappingException e) {
            this.log.warn("Unable to parse json for injection.", e);
        }
        catch (Exception e) {
            this.log.error("Error injecting data into Msg Handler", (Throwable)e);
            // Keep the cause so callers can see what actually failed.
            throw new RuntimeException("Error injecting data into Msg Handler Constructor", e);
        }
        return instance;
    }

    /**
     * Returns the source this consumer receives messages from.
     *
     * @return the consumed source
     */
    public SourceI getSource() {
        return this.source;
    }

    /**
     * Returns the list of sources associated with the handler, as discovered
     * from the handler class or supplied to the constructor. The internal list
     * itself is returned, not a copy.
     *
     * @return the role sources
     */
    public List<SourceI> getRoleSources() {
        return this.roleSources;
    }

    /**
     * Returns the role sources that are instances of the given source type.
     *
     * @param sourceType the source class to select
     * @param <SOURCE> the selected source type
     * @return a stream of the matching role sources, cast to {@code sourceType}
     */
    public <SOURCE extends SourceI> Stream<SOURCE> getRoleSources(Class<SOURCE> sourceType) {
        // isInstance guarantees the cast succeeds, so no ClassCastException handling is needed.
        return this.roleSources.stream().filter(sourceType::isInstance).map(sourceType::cast);
    }

    /**
     * Appends the given services to the ones consulted during injection,
     * creating the service list on first use.
     *
     * @param services the services to add, in lookup order
     */
    public void addServices(ConsumerService ... services) {
        if (null == this.services) {
            this.services = new ArrayList<ConsumerService>();
        }
        Collections.addAll(this.services, services);
    }

    /**
     * Replaces the list of services consulted during injection. The given list
     * is stored as-is (not copied), so later {@code addServices} calls append to it.
     *
     * @param services the new service list; may be null
     */
    public void setServices(List<ConsumerService> services) {
        this.services = services;
    }

    /**
     * Decompiler-extracted lambda body from the constructor: if field {@code f}
     * carries the decorator annotation {@code type}, makes the field accessible
     * and records it in {@code injectFields} under that decorator and the
     * field's declared type.
     */
    private /* synthetic */ void lambda$new$10(Field f, Class type) {
        if (!f.isAnnotationPresent(type)) {
            return;
        }
        f.setAccessible(true);
        final Map<Class<?>, List<Field>> byFieldType = this.injectFields.get(type);
        List<Field> bucket = byFieldType.get(f.getType());
        if (bucket == null) {
            bucket = new ArrayList<Field>();
            byFieldType.put(f.getType(), bucket);
        }
        bucket.add(f);
    }
}

