/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer;

import com.aliyun.openservices.ons.shaded.com.google.common.base.Optional;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.FutureCallback;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ConsumeStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListener;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ConsumeService;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageQueue;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class ConsumeOrderlyService
extends ConsumeService {
    private static final Logger log = LoggerFactory.getLogger(ConsumeOrderlyService.class);

    public ConsumeOrderlyService(MessageListener messageListener, MessageInterceptor interceptor, ThreadPoolExecutor consumptionExecutor, ScheduledExecutorService scheduler, ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable) {
        super(messageListener, interceptor, consumptionExecutor, scheduler, processQueueTable);
    }

    @Override
    public boolean dispatch0() {
        ArrayList processQueues = new ArrayList(this.processQueueTable.values());
        Collections.shuffle(processQueues);
        boolean dispatched = false;
        for (final ProcessQueue pq : processQueues) {
            final Optional<MessageExt> messageExt = pq.tryTakeFifoMessage();
            if (!messageExt.isPresent()) continue;
            dispatched = true;
            log.debug("Take fifo message already, messageId={}", (Object)messageExt.get().getMsgId());
            ListenableFuture<ConsumeStatus> future = this.consume(messageExt.get());
            Futures.addCallback(future, new FutureCallback<ConsumeStatus>(){

                @Override
                public void onSuccess(ConsumeStatus status) {
                    pq.eraseFifoMessage((MessageExt)messageExt.get(), status);
                }

                @Override
                public void onFailure(Throwable t2) {
                    log.error("[Bug] Exception raised in consumption callback.", t2);
                }
            }, MoreExecutors.directExecutor());
        }
        return dispatched;
    }
}

