/*
 * Decompiled with CFR 0.152.
 */
package com.itfsw.redis.mq.support.consumer;

import com.itfsw.redis.mq.MessageConsumer;
import com.itfsw.redis.mq.MessageListener;
import com.itfsw.redis.mq.MessageQueue;
import com.itfsw.redis.mq.exception.MessageHandlerException;
import com.itfsw.redis.mq.exception.NotFoundListenerException;
import com.itfsw.redis.mq.model.MessageWrapper;
import com.itfsw.redis.mq.support.consumer.handler.QueueMessageExpiredHandler;
import com.itfsw.redis.mq.support.consumer.handler.QueueMessageFailureHandler;
import com.itfsw.redis.mq.support.consumer.handler.QueueMessageSuccessHandler;
import com.itfsw.redis.mq.support.consumer.strategy.DefaultQueueMessageExpiredHandler;
import com.itfsw.redis.mq.support.consumer.strategy.DefaultQueueMessageFailureHandler;
import com.itfsw.redis.mq.support.consumer.strategy.DefaultQueueMessageSuccessHandler;
import com.itfsw.redis.mq.support.consumer.strategy.MultiThreadingStrategy;

public class DefaultMessageConsumer<T>
implements MessageConsumer<T> {
    private MessageQueue<T> queue;
    private MessageListener<T> messageListener;
    private QueueMessageSuccessHandler successHandler;
    private QueueMessageFailureHandler failureHandler;
    private QueueMessageExpiredHandler expiredHandler;
    private int threadsNum = 1;
    private MultiThreadingStrategy messageHandlerThread;

    public DefaultMessageConsumer(MessageQueue<T> queue) {
        this.queue = queue;
    }

    public DefaultMessageConsumer(MessageQueue<T> queue, int threadsNum) {
        this.queue = queue;
        this.threadsNum = threadsNum;
    }

    @Override
    public MessageQueue<T> getQueue() {
        return this.queue;
    }

    @Override
    public void startConsumer() {
        this.messageHandlerThread = new MultiThreadingStrategy(this.threadsNum);
        this.messageHandlerThread.start(this.queue.getQueueName(), new Runnable(){
            private static final short MAX_MILLIS_FOR_EMPTY_WHILE = 350;
            private static final short MIN_MILLIS_FOR_EMPTY_WHILE = 50;
            private static final short INCREMENTS_MILLIS_FOR_EMPTY_WHILE = 2;
            private short waitTime = (short)50;

            @Override
            public void run() {
                MessageWrapper wrapper = null;
                try {
                    wrapper = DefaultMessageConsumer.this.queue.poll();
                    if (wrapper != null) {
                        this.waitTime = (short)50;
                        long time = DefaultMessageConsumer.this.queue.redisOps().time();
                        if (wrapper.getExpires() > -1L && wrapper.getCreateTime() + wrapper.getExpires() > time) {
                            DefaultMessageConsumer.this.expiredHandler.onMessage(DefaultMessageConsumer.this.queue, wrapper);
                        } else {
                            DefaultMessageConsumer.this.messageListener.onMessage(wrapper);
                            DefaultMessageConsumer.this.successHandler.onMessage(DefaultMessageConsumer.this.queue, wrapper);
                        }
                    } else {
                        if (this.waitTime < 350) {
                            this.waitTime = (short)(this.waitTime + 2);
                        }
                        Thread.sleep(this.waitTime);
                    }
                }
                catch (MessageHandlerException e) {
                    DefaultMessageConsumer.this.failureHandler.onMessageHandlerException(DefaultMessageConsumer.this.queue, wrapper, e);
                }
                catch (Throwable e) {
                    DefaultMessageConsumer.this.failureHandler.onMessageProgressException(DefaultMessageConsumer.this.queue, wrapper, e);
                }
            }
        });
    }

    @Override
    public void stopConsumer() {
        this.messageHandlerThread.stop();
    }

    @Override
    public void setMessageListener(MessageListener<T> messageListener) {
        this.messageListener = messageListener;
    }

    @Override
    public void setSuccessHandler(QueueMessageSuccessHandler successHandler) {
        this.successHandler = successHandler;
    }

    @Override
    public void setFailureHandler(QueueMessageFailureHandler failureHandler) {
        this.failureHandler = failureHandler;
    }

    @Override
    public void setExpiredHandler(QueueMessageExpiredHandler expiredHandler) {
        this.expiredHandler = expiredHandler;
    }

    public void destroy() throws Exception {
        this.stopConsumer();
    }

    public void afterPropertiesSet() throws Exception {
        if (this.successHandler == null) {
            this.successHandler = new DefaultQueueMessageSuccessHandler();
        }
        if (this.failureHandler == null) {
            this.failureHandler = new DefaultQueueMessageFailureHandler();
        }
        if (this.expiredHandler == null) {
            this.expiredHandler = new DefaultQueueMessageExpiredHandler();
        }
        if (this.messageListener == null) {
            throw new NotFoundListenerException("can not found a message listener");
        }
        if (this.messageHandlerThread == null) {
            this.startConsumer();
        }
    }
}

