/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.batch.TopicCache;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyByQueueContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyByTopicContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeReturnType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.Pair;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageAccessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.CMResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.aliyun.openservices.shade.com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumeMessageConcurrentlyService
implements ConsumeMessageService {
    private static final InternalLogger log = ClientLogger.getLog();
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final MessageListenerConcurrently messageListener;
    private final BlockingQueue<Runnable> consumeRequestQueue;
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ScheduledExecutorService cleanExpireMsgExecutors;
    private volatile boolean stopped = false;
    private Thread batchConsumeThread;
    private final Object batchConsumeConditionVariable;
    private final ConcurrentMap<String, TopicCache> cachedMessages;

    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
        if (this.defaultMQPushConsumer.getMaxBatchConsumeWaitTime() > 0L) {
            this.batchConsumeThread = new Thread(new BatchConsumeTask());
            log.info("Consume message in batch mode, maxBatchAwaitTime={}ms", (Object)this.defaultMQPushConsumer.getMaxBatchConsumeWaitTime());
        }
        this.batchConsumeConditionVariable = new Object();
        this.cachedMessages = new ConcurrentHashMap<String, TopicCache>();
    }

    @Override
    public void start() {
        if (this.defaultMQPushConsumer.getMaxBatchConsumeWaitTime() > 0L) {
            this.batchConsumeThread.start();
        }
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    log.info("Start to clean expired messages from tree-map");
                    ConsumeMessageConcurrentlyService.this.cleanExpireMsg();
                    log.info("End of expired-message cleaning");
                }
                catch (Throwable e) {
                    log.error("[BUG]Should NEVER reach here", e);
                }
            }
        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }

    @Override
    public void shutdown() {
        this.stopped = true;
        if (null != this.batchConsumeThread) {
            this.batchConsumeThread.interrupt();
        }
        this.scheduledExecutorService.shutdown();
        this.consumeExecutor.shutdown();
        this.cleanExpireMsgExecutors.shutdown();
    }

    void tryBatchConsume() {
        for (Map.Entry entry : this.cachedMessages.entrySet()) {
            int size;
            while ((size = ((TopicCache)entry.getValue()).size()) >= this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() || ((TopicCache)entry.getValue()).elapsed() >= this.defaultMQPushConsumer.getMaxBatchConsumeWaitTime()) {
                ArrayList<MessageExt> messages = new ArrayList<MessageExt>();
                int amount = Math.min(size, this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize());
                ((TopicCache)entry.getValue()).take(amount, messages);
                TopicBatchConsumeRequest consumeRequest = new TopicBatchConsumeRequest(messages);
                this.consumeExecutor.submit(consumeRequest);
            }
        }
    }

    @Override
    public void updateCorePoolSize(int corePoolSize) {
        if (corePoolSize > 0 && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
            this.consumeExecutor.setCorePoolSize(corePoolSize);
        }
    }

    @Override
    public void incCorePoolSize() {
    }

    @Override
    public void decCorePoolSize() {
    }

    @Override
    public int getCorePoolSize() {
        return this.consumeExecutor.getCorePoolSize();
    }

    @Override
    public void allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
        this.consumeExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
    }

    @Override
    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
        ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
        result.setOrder(false);
        result.setAutoCommit(true);
        ArrayList<MessageExt> msgs = new ArrayList<MessageExt>();
        msgs.add(msg);
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(brokerName);
        mq.setTopic(msg.getTopic());
        mq.setQueueId(msg.getQueueId());
        ConsumeConcurrentlyByQueueContext context = new ConsumeConcurrentlyByQueueContext(mq);
        this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);
        long beginTime = System.currentTimeMillis();
        log.info("consumeMessageDirectly receive new message: {}", (Object)msg);
        try {
            ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
            if (status != null) {
                switch (status) {
                    case CONSUME_SUCCESS: {
                        result.setConsumeResult(CMResult.CR_SUCCESS);
                        break;
                    }
                    case RECONSUME_LATER: {
                        result.setConsumeResult(CMResult.CR_LATER);
                        break;
                    }
                }
            } else {
                result.setConsumeResult(CMResult.CR_RETURN_NULL);
            }
        }
        catch (Throwable e) {
            result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
            result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(e), this.consumerGroup, msgs, mq), e);
        }
        result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
        log.info("consumeMessageDirectly Result: {}", (Object)result);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        if (msgs == null || msgs.isEmpty()) {
            return;
        }
        if (this.defaultMQPushConsumer.getMaxBatchConsumeWaitTime() > 0L) {
            this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.defaultMQPushConsumer.getConsumerGroup());
            String topic = msgs.iterator().next().getTopic();
            if (!this.cachedMessages.containsKey(topic)) {
                this.cachedMessages.putIfAbsent(topic, new TopicCache(topic));
            }
            ((TopicCache)this.cachedMessages.get(topic)).put(msgs);
            Object object = this.batchConsumeConditionVariable;
            synchronized (object) {
                this.batchConsumeConditionVariable.notify();
            }
            return;
        }
        int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            }
            catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            int total = 0;
            while (total < msgs.size()) {
                ArrayList<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize && total < msgs.size(); ++i, ++total) {
                    msgThis.add(msgs.get(total));
                }
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                }
                catch (RejectedExecutionException e) {
                    while (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                        ++total;
                    }
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

    private void cleanExpireMsg() {
        Iterator it = this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
        while (it.hasNext()) {
            try {
                Map.Entry next = it.next();
                ProcessQueue pq = (ProcessQueue)next.getValue();
                pq.cleanExpiredMsg(this.defaultMQPushConsumer);
            }
            catch (Throwable e) {
                log.warn("Unexpected exception raised when trying to clean expired message from tree-map", e);
            }
        }
    }

    public void processConsumeResult(ConsumeConcurrentlyStatus status, ConsumeConcurrentlyContext context, ConsumeRequest consumeRequest) {
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty()) {
            return;
        }
        switch (status) {
            case CONSUME_SUCCESS: {
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            }
            case RECONSUME_LATER: {
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size());
                break;
            }
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING: {
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); ++i) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", (Object)msg.toString());
                }
                break;
            }
            case CLUSTERING: {
                ArrayList<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); ++i) {
                    boolean result;
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    if (context.getCheckSendBackHook() != null && !context.getCheckSendBackHook().needSendBack(msg, context) || (result = this.sendMessageBack(msg, context))) continue;
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
                if (msgBackFailed.isEmpty()) break;
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                break;
            }
        }
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0L && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

    public ConsumerStatsManager getConsumerStatsManager() {
        return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
    }

    public boolean sendMessageBack(MessageExt msg, ConsumeConcurrentlyContext context) {
        int delayLevel = context.getDelayLevelWhenNextConsume();
        msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
        try {
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
            return true;
        }
        catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
            return false;
        }
    }

    private void submitConsumeRequestLater(final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue) {
        this.scheduledExecutorService.schedule(new Runnable(){

            @Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue);
            }
        }, 5000L, TimeUnit.MILLISECONDS);
    }

    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {
        this.scheduledExecutorService.schedule(new Runnable(){

            @Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
            }
        }, 5000L, TimeUnit.MILLISECONDS);
    }

    private static String topicBrokerNameOf(MessageExt messageExt) {
        String topic = Strings.isNullOrEmpty(messageExt.getProperty("__FQN_TOPIC")) ? messageExt.getTopic() : messageExt.getProperty("__FQN_TOPIC");
        return topic + "@" + messageExt.getBrokerName();
    }

    public static String topicOf(MessageExt messageExt) {
        String fqn = messageExt.getProperty("__FQN_TOPIC");
        if (Strings.isNullOrEmpty(fqn)) {
            return messageExt.getTopic();
        }
        return fqn;
    }

    class ConsumeRequest
    implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
            this.msgs = msgs;
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        public List<MessageExt> getMsgs() {
            return this.msgs;
        }

        public ProcessQueue getProcessQueue() {
            return this.processQueue;
        }

        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", (Object)ConsumeMessageConcurrentlyService.this.consumerGroup, (Object)this.messageQueue);
                return;
            }
            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyByQueueContext context = new ConsumeConcurrentlyByQueueContext(this.messageQueue);
            ConsumeConcurrentlyStatus status = null;
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.resetRetryAndNamespace(this.msgs, ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setNamespace(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getNamespace());
                consumeMessageContext.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(this.messageQueue);
                consumeMessageContext.setMsgList(this.msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }
            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                if (this.msgs != null && !this.msgs.isEmpty()) {
                    for (MessageExt msg : this.msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                status = listener.consumeMessage(Collections.unmodifiableList(this.msgs), context);
            }
            catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                returnType = hasException ? ConsumeReturnType.EXCEPTION : ConsumeReturnType.RETURNNULL;
            } else if (consumeRT >= ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumeTimeout() * 60L * 1000L) {
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }
            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.getProps().put("ConsumeContextType", returnType.name());
                consumeMessageContext.getProps().put("ConsumeExactlyOnceStatus", context.getExactlyOnceStatus().name());
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }
            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue.getTopic(), consumeRT);
            if (!this.processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", (Object)this.messageQueue, (Object)this.msgs);
            }
        }

        public MessageQueue getMessageQueue() {
            return this.messageQueue;
        }
    }

    class TopicBatchConsumeRequest
    implements Runnable {
        private final List<MessageExt> messages;
        private String topic;

        public TopicBatchConsumeRequest(List<MessageExt> messages) {
            this.messages = messages;
            if (null != this.messages && !this.messages.isEmpty()) {
                this.topic = this.messages.iterator().next().getTopic();
            }
        }

        private String msgIdsOf(Collection<MessageExt> messages) {
            StringBuilder sb = new StringBuilder();
            if (null == messages || messages.isEmpty()) {
                return sb.toString();
            }
            for (MessageExt message : messages) {
                if (0 == sb.length()) {
                    sb.append(message.getMsgId());
                    continue;
                }
                sb.append(',').append(message.getMsgId());
            }
            return sb.toString();
        }

        @Override
        public void run() {
            if (this.messages.isEmpty() || null == this.topic) {
                return;
            }
            try {
                ConsumeConcurrentlyStatus status = null;
                HashMap groupBy = new HashMap();
                for (MessageExt message : this.messages) {
                    Pair<String, Integer> key = new Pair<String, Integer>(ConsumeMessageConcurrentlyService.topicBrokerNameOf(message), message.getQueueId());
                    if (!groupBy.containsKey(key)) {
                        groupBy.put(key, new ArrayList());
                    }
                    ((List)groupBy.get(key)).add(message);
                }
                HashMap<MessageQueue, ConsumeMessageContext> consumeMessageContextMap = new HashMap<MessageQueue, ConsumeMessageContext>();
                for (Map.Entry entry : groupBy.entrySet()) {
                    MessageExt headMessage = (MessageExt)((List)entry.getValue()).iterator().next();
                    assert (null != headMessage);
                    MessageQueue messageQueue = new MessageQueue(ConsumeMessageConcurrentlyService.topicOf(headMessage), headMessage.getBrokerName(), (Integer)((Pair)entry.getKey()).getObject2());
                    if (!ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) continue;
                    ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setNamespace(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getNamespace());
                    consumeMessageContext.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
                    consumeMessageContext.setProps(new HashMap<String, String>());
                    consumeMessageContext.setMq(messageQueue);
                    consumeMessageContext.setMsgList((List)entry.getValue());
                    consumeMessageContext.setSuccess(false);
                    consumeMessageContextMap.put(messageQueue, consumeMessageContext);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                }
                long beginTimestamp = System.currentTimeMillis();
                boolean hasException = false;
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                ConsumeConcurrentlyByTopicContext context = new ConsumeConcurrentlyByTopicContext();
                try {
                    if (!this.messages.isEmpty()) {
                        for (MessageExt msg : this.messages) {
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }
                    status = ConsumeMessageConcurrentlyService.this.messageListener.consumeMessage(Collections.unmodifiableList(this.messages), context);
                }
                catch (Throwable e) {
                    log.warn("consumeMessage exception: {} Group: {} Msgs: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, this.messages);
                    hasException = true;
                }
                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                if (null == status) {
                    returnType = hasException ? ConsumeReturnType.EXCEPTION : ConsumeReturnType.RETURNNULL;
                } else if (consumeRT >= ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumeTimeout() * 60L * 1000L) {
                    returnType = ConsumeReturnType.TIME_OUT;
                } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                    returnType = ConsumeReturnType.FAILED;
                } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                    returnType = ConsumeReturnType.SUCCESS;
                }
                if (null == status) {
                    log.warn("consumeMessage return null, Group: {} Msgs: {}", (Object)ConsumeMessageConcurrentlyService.this.consumerGroup, (Object)this.messages);
                    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                HashMap<MessageQueue, Integer> ackMap = new HashMap<MessageQueue, Integer>();
                if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS != status) {
                    for (int i = 0; i < context.getAckIndex() && i < this.messages.size(); ++i) {
                        MessageExt message = this.messages.get(i);
                        MessageQueue messageQueue = new MessageQueue(this.topic, message.getBrokerName(), message.getQueueId());
                        if (!ackMap.containsKey(messageQueue)) {
                            ackMap.put(messageQueue, 0);
                        }
                        ackMap.put(messageQueue, (Integer)ackMap.get(messageQueue) + 1);
                    }
                }
                for (Map.Entry entry : groupBy.entrySet()) {
                    try {
                        MessageExt headMessage = (MessageExt)((List)entry.getValue()).iterator().next();
                        assert (null != headMessage);
                        MessageQueue messageQueue = new MessageQueue(ConsumeMessageConcurrentlyService.topicOf(headMessage), headMessage.getBrokerName(), (Integer)((Pair)entry.getKey()).getObject2());
                        ConsumeConcurrentlyByQueueContext queueContext = new ConsumeConcurrentlyByQueueContext(messageQueue);
                        if (ackMap.containsKey(messageQueue)) {
                            queueContext.setAckIndex((Integer)ackMap.get(messageQueue));
                        }
                        ConsumeMessageContext consumeMessageContext = (ConsumeMessageContext)consumeMessageContextMap.get(messageQueue);
                        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.getProps().put("ConsumeContextType", returnType.name());
                            consumeMessageContext.getProps().put("ConsumeExactlyOnceStatus", queueContext.getExactlyOnceStatus().name());
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                        }
                        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                        ProcessQueue processQueue = (ProcessQueue)ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().get(messageQueue);
                        if (null != processQueue && !processQueue.isDropped()) {
                            ConsumeRequest consumeRequest = new ConsumeRequest((List)entry.getValue(), processQueue, messageQueue);
                            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, queueContext, consumeRequest);
                            continue;
                        }
                        log.info("processQueue is dropped without process consume result. messageQueue={}, msgIdList={}", (Object)messageQueue, (Object)this.msgIdsOf((Collection)entry.getValue()));
                    }
                    catch (Throwable e) {
                        log.error("[BUG]Unexpected exception raised when post-process mini-batch", e);
                    }
                }
            }
            catch (Throwable e) {
                log.error("[BUG]TopicBatchConsumeRequest raised an unexpected exception", e);
            }
        }
    }

    private class BatchConsumeTask
    implements Runnable {
        private BatchConsumeTask() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!ConsumeMessageConcurrentlyService.this.stopped) {
                try {
                    ConsumeMessageConcurrentlyService.this.tryBatchConsume();
                    Object object = ConsumeMessageConcurrentlyService.this.batchConsumeConditionVariable;
                    synchronized (object) {
                        ConsumeMessageConcurrentlyService.this.batchConsumeConditionVariable.wait(1000L);
                    }
                }
                catch (Throwable e) {
                    log.warn("Exception raised while schedule managing batch consuming", e);
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
        }
    }
}

