/*
 * Decompiled with CFR 0.152.
 */
package com.qcloud.cmq.client.consumer;

import com.qcloud.cmq.client.client.ThreadGroupFactory;
import com.qcloud.cmq.client.common.LogHelper;
import com.qcloud.cmq.client.common.RemoteHelper;
import com.qcloud.cmq.client.common.RequestIdHelper;
import com.qcloud.cmq.client.consumer.BatchDeleteCallback;
import com.qcloud.cmq.client.consumer.BatchDeleteResult;
import com.qcloud.cmq.client.consumer.BatchReceiveCallback;
import com.qcloud.cmq.client.consumer.BatchReceiveResult;
import com.qcloud.cmq.client.consumer.ConsumerImpl;
import com.qcloud.cmq.client.consumer.Message;
import com.qcloud.cmq.client.consumer.MessageListener;
import com.qcloud.cmq.client.exception.MQClientException;
import com.qcloud.cmq.client.netty.CommunicationMode;
import com.qcloud.cmq.client.netty.RemoteException;
import com.qcloud.cmq.client.protocol.Cmq;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;

public class SubscribeService {
    private static final Logger logger = LogHelper.getLog();
    private final String queue;
    private final MessageListener listener;
    private final Cmq.CMQProto.Builder pullRequestBuilder;
    private final ConsumerImpl consumer;
    private final BlockingQueue<Runnable> consumeRequestQueue;
    private final ThreadPoolExecutor consumeExecutor;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "PullMessageScheduledThread");
        }
    });
    private AtomicInteger flightPullRequest = new AtomicInteger();

    SubscribeService(String queue, MessageListener listener, Cmq.CMQProto.Builder builder, ConsumerImpl consumer) {
        this.queue = queue;
        this.listener = listener;
        this.pullRequestBuilder = builder;
        this.consumer = consumer;
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        this.consumeExecutor = new ThreadPoolExecutor(1, 1, 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadGroupFactory("ConsumeMessageThread_"));
    }

    private void startScheduleTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                logger.debug("schedule flightPullRequest:{}, size:{}, active: {}", new Object[]{SubscribeService.this.flightPullRequest.get(), SubscribeService.this.consumeRequestQueue.size(), SubscribeService.this.consumeExecutor.getActiveCount()});
                if (SubscribeService.this.flightPullRequest.get() < 16 && SubscribeService.this.consumeRequestQueue.size() < 16) {
                    SubscribeService.this.flightPullRequest.incrementAndGet();
                    SubscribeService.this.pullImmediately();
                }
            }
        }, 0L, 1000L, TimeUnit.MILLISECONDS);
    }

    private void submitPullRequest() {
        this.scheduledExecutorService.schedule(new Runnable(){

            @Override
            public void run() {
                logger.debug("submit flightPullRequest:{}, size:{}", (Object)SubscribeService.this.flightPullRequest.get(), (Object)SubscribeService.this.consumeRequestQueue.size());
                if (SubscribeService.this.flightPullRequest.get() < 16 && SubscribeService.this.consumeRequestQueue.size() < 16) {
                    SubscribeService.this.flightPullRequest.incrementAndGet();
                    SubscribeService.this.pullImmediately();
                }
            }
        }, 0L, TimeUnit.MILLISECONDS);
    }

    private void pullImmediately() {
        Cmq.CMQProto request = this.pullRequestBuilder.setSeqno(RequestIdHelper.getNextSeqNo()).build();
        int timeoutMS = this.consumer.getConsumer().getRequestTimeoutMS() + this.consumer.getConsumer().getPollingWaitSeconds() * 1000;
        try {
            List<String> accessList = this.consumer.getQueueRoute(this.queue);
            this.consumer.getMQClientInstance().getCMQClient().batchReceiveMessage(accessList, request, timeoutMS, CommunicationMode.ASYNC, new BatchReceiveCallback(){

                @Override
                public void onSuccess(BatchReceiveResult receiveResult) {
                    SubscribeService.this.flightPullRequest.decrementAndGet();
                    if (receiveResult.getReturnCode() == 0) {
                        SubscribeService.this.consumeExecutor.submit(new ConsumeRequest(receiveResult.getMessageList()));
                    } else if (receiveResult.getReturnCode() != 10200) {
                        logger.info("pull message error:" + receiveResult.getErrorMessage() + ", errorCode:" + receiveResult.getReturnCode());
                    }
                }

                @Override
                public void onException(Throwable e) {
                    SubscribeService.this.flightPullRequest.decrementAndGet();
                    logger.info("pull message error :" + e.getMessage());
                }
            });
        }
        catch (RemoteException e) {
            logger.error("pull message error", (Throwable)e);
            this.consumer.setNeedUpdateRoute();
        }
        catch (InterruptedException e) {
            logger.error("pull message error", (Throwable)e);
            this.consumer.setNeedUpdateRoute();
        }
        catch (MQClientException e) {
            logger.error("pull message error", (Throwable)e);
            this.consumer.setNeedUpdateRoute();
        }
    }

    public void start() {
        this.startScheduleTask();
    }

    public void shutdown() {
        this.scheduledExecutorService.shutdown();
        this.consumeExecutor.shutdown();
        logger.info("shutdown pull service for queue {} success.", (Object)this.queue);
    }

    class ConsumeRequest
    implements Runnable {
        private final List<Message> msgList;

        ConsumeRequest(List<Message> msgList) {
            this.msgList = msgList;
        }

        @Override
        public void run() {
            try {
                List<Long> ackMsg = SubscribeService.this.listener.consumeMessage(SubscribeService.this.queue, Collections.unmodifiableList(this.msgList));
                if (ackMsg != null && !ackMsg.isEmpty()) {
                    SubscribeService.this.consumer.getConsumer().batchDeleteMsg(SubscribeService.this.queue, ackMsg, new BatchDeleteCallback(){

                        @Override
                        public void onSuccess(BatchDeleteResult deleteResult) {
                            logger.debug("delete msg success, result{}", (Object)deleteResult);
                        }

                        @Override
                        public void onException(Throwable e) {
                            logger.debug("delete msg failed", e);
                        }
                    });
                }
                SubscribeService.this.submitPullRequest();
            }
            catch (Throwable e) {
                logger.warn("consumeMessage exception: {} queue: {}", (Object)RemoteHelper.exceptionSimpleDesc(e), (Object)SubscribeService.this.queue);
            }
        }
    }
}

