/*
 * Decompiled with CFR 0.152.
 */
package com.qcloud.cmq.client.consumer;

import com.qcloud.cmq.client.client.MQClientInstance;
import com.qcloud.cmq.client.client.MQClientManager;
import com.qcloud.cmq.client.common.LogHelper;
import com.qcloud.cmq.client.common.RequestIdHelper;
import com.qcloud.cmq.client.common.ServiceState;
import com.qcloud.cmq.client.consumer.BatchDeleteCallback;
import com.qcloud.cmq.client.consumer.BatchDeleteResult;
import com.qcloud.cmq.client.consumer.BatchReceiveCallback;
import com.qcloud.cmq.client.consumer.BatchReceiveResult;
import com.qcloud.cmq.client.consumer.Consumer;
import com.qcloud.cmq.client.consumer.DeleteCallback;
import com.qcloud.cmq.client.consumer.DeleteResult;
import com.qcloud.cmq.client.consumer.MessageListener;
import com.qcloud.cmq.client.consumer.ReceiveCallback;
import com.qcloud.cmq.client.consumer.ReceiveResult;
import com.qcloud.cmq.client.consumer.SubscribeService;
import com.qcloud.cmq.client.exception.MQClientException;
import com.qcloud.cmq.client.exception.MQServerException;
import com.qcloud.cmq.client.netty.CommunicationMode;
import com.qcloud.cmq.client.netty.RemoteException;
import com.qcloud.cmq.client.protocol.Cmq;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;

public class ConsumerImpl {
    private final Logger logger = LogHelper.getLog();
    private MQClientInstance mQClientInstance;
    private final Consumer consumer;
    private final ConcurrentHashMap<String, List<String>> queueRouteTable = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, SubscribeService> subscribeTable = new ConcurrentHashMap();
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    private volatile boolean needUpdateRoute = true;

    ConsumerImpl(Consumer consumer) {
        this.consumer = consumer;
    }

    private void makeSureStateOK() throws MQClientException {
        if (this.serviceState != ServiceState.RUNNING) {
            throw new MQClientException("The consumer service state not OK, " + (Object)((Object)this.serviceState), null);
        }
    }

    List<String> getQueueRoute(String queue) throws MQClientException {
        List<String> accessList = this.queueRouteTable.get(queue);
        if (accessList == null || accessList.isEmpty() || this.needUpdateRoute) {
            this.mQClientInstance.updateQueueRoute(queue, this.queueRouteTable);
            accessList = this.queueRouteTable.get(queue);
            this.needUpdateRoute = false;
        }
        return accessList;
    }

    public void setNeedUpdateRoute() {
        this.needUpdateRoute = true;
    }

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST: {
                this.serviceState = ServiceState.START_FAILED;
                this.mQClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.consumer);
                this.mQClientInstance.registerConsumer(this);
                this.mQClientInstance.start();
                this.logger.info("the consumer [{}] start OK", (Object)this.consumer);
                this.serviceState = ServiceState.RUNNING;
                break;
            }
            case RUNNING: 
            case START_FAILED: 
            case SHUTDOWN_ALREADY: {
                throw new MQClientException(10, "The consumer service state not OK, maybe started once, state:" + (Object)((Object)this.serviceState));
            }
        }
    }

    public synchronized void shutdown() {
        switch (this.serviceState) {
            case CREATE_JUST: {
                break;
            }
            case RUNNING: {
                this.mQClientInstance.unRegisterConsumer(this);
                this.mQClientInstance.shutdown();
                this.logger.info("the consumer [{}] shutdown OK", (Object)this.consumer);
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            }
            case SHUTDOWN_ALREADY: {
                break;
            }
        }
    }

    ReceiveResult receiveMsg(String queue, int pollingWaitSeconds, long timeoutMillis) throws MQClientException, MQServerException {
        return this.receiveImpl(queue, pollingWaitSeconds, timeoutMillis, CommunicationMode.SYNC, null);
    }

    void receiveMsg(String queue, int pollingWaitSeconds, long timeoutMillis, ReceiveCallback callback) throws MQClientException, MQServerException {
        this.receiveImpl(queue, pollingWaitSeconds, timeoutMillis, CommunicationMode.ASYNC, callback);
    }

    private ReceiveResult receiveImpl(String queue, int pollingWaitSeconds, long timeoutMillis, CommunicationMode communicationMode, ReceiveCallback callback) throws MQClientException, MQServerException {
        this.makeSureStateOK();
        if (pollingWaitSeconds < 0) {
            throw new MQClientException("pollingWaitSeconds < 0", null);
        }
        Cmq.cmq_tcp_pull_msg_req.Builder contentBuilder = Cmq.cmq_tcp_pull_msg_req.newBuilder().setQueueName(queue).setPollWaitSeconds(pollingWaitSeconds);
        Cmq.CMQProto request = Cmq.CMQProto.newBuilder().setCmd(1023).setSeqno(RequestIdHelper.getNextSeqNo()).setTcpPullMsg(contentBuilder).build();
        timeoutMillis += (long)(pollingWaitSeconds * 1000);
        List<String> accessList = this.getQueueRoute(queue);
        try {
            return this.mQClientInstance.getCMQClient().receiveMessage(accessList, request, timeoutMillis, communicationMode, callback);
        }
        catch (RemoteException e) {
            this.setNeedUpdateRoute();
            this.logger.error(String.format("receive msg from queue[%s] error", queue), (Throwable)e);
            throw new MQServerException(20, "receive msg with error:" + e.getLocalizedMessage());
        }
        catch (InterruptedException e) {
            this.setNeedUpdateRoute();
            this.logger.error(String.format("receive msg from queue[%s] error", queue), (Throwable)e);
            throw new MQServerException(20, "receive msg with error:" + e.getLocalizedMessage());
        }
    }

    BatchReceiveResult batchReceive(String queue, int maxNums, int pollingWaitSeconds, long timeoutMillis) throws MQClientException, MQServerException {
        return this.batchReceiveImpl(queue, maxNums, pollingWaitSeconds, timeoutMillis, CommunicationMode.SYNC, null);
    }

    void batchReceive(String queue, int maxNums, int pollingWaitSeconds, long timeoutMillis, BatchReceiveCallback callback) throws MQClientException, MQServerException {
        this.batchReceiveImpl(queue, maxNums, pollingWaitSeconds, timeoutMillis, CommunicationMode.ASYNC, callback);
    }

    private BatchReceiveResult batchReceiveImpl(String queue, int maxNums, int pollingWaitSeconds, long timeoutMillis, CommunicationMode communicationMode, BatchReceiveCallback callback) throws MQClientException, MQServerException {
        this.makeSureStateOK();
        if (pollingWaitSeconds < 0) {
            throw new MQClientException("pollingWaitSeconds < 0", null);
        }
        if (maxNums < 0) {
            throw new MQClientException("maxNums < 0", null);
        }
        Cmq.cmq_tcp_batch_pull_msg_req.Builder contentBuilder = Cmq.cmq_tcp_batch_pull_msg_req.newBuilder().setQueueName(queue).setPollWaitSeconds(pollingWaitSeconds).setNumMsg(maxNums);
        timeoutMillis += (long)(pollingWaitSeconds * 1000);
        Cmq.CMQProto request = Cmq.CMQProto.newBuilder().setCmd(1024).setSeqno(RequestIdHelper.getNextSeqNo()).setTcpBatchPullMsg(contentBuilder).build();
        List<String> accessList = this.getQueueRoute(queue);
        try {
            return this.mQClientInstance.getCMQClient().batchReceiveMessage(accessList, request, timeoutMillis, communicationMode, callback);
        }
        catch (RemoteException e) {
            this.setNeedUpdateRoute();
            this.logger.error(String.format("batch receive msg from queue[%s] error", queue), (Throwable)e);
            throw new MQServerException(20, "batch receive msg with error:" + e.getLocalizedMessage());
        }
        catch (InterruptedException e) {
            this.setNeedUpdateRoute();
            this.logger.error(String.format("batch receive msg from queue[%s] error", queue), (Throwable)e);
            throw new MQServerException(20, "batch receive msg with error:" + e.getLocalizedMessage());
        }
    }

    DeleteResult deleteMsg(String queue, long receiptHandle, long timeoutMillis) throws MQClientException, MQServerException {
        return this.deleteImpl(queue, receiptHandle, timeoutMillis, CommunicationMode.SYNC, null);
    }

    void deleteMsg(String queue, long receiptHandle, long timeoutMillis, DeleteCallback callback) throws MQClientException, MQServerException {
        this.deleteImpl(queue, receiptHandle, timeoutMillis, CommunicationMode.ASYNC, callback);
    }

    private DeleteResult deleteImpl(String queue, long receiptHandle, long timeoutMillis, CommunicationMode communicationMode, DeleteCallback callback) throws MQClientException, MQServerException {
        this.makeSureStateOK();
        Cmq.cmq_tcp_delete_msg.Builder contentBuilder = Cmq.cmq_tcp_delete_msg.newBuilder().setQueueName(queue).setReceiptHandle(receiptHandle);
        Cmq.CMQProto request = Cmq.CMQProto.newBuilder().setCmd(1025).setSeqno(RequestIdHelper.getNextSeqNo()).setTcpDeleteMsg(contentBuilder).build();
        List<String> accessList = this.getQueueRoute(queue);
        try {
            return this.mQClientInstance.getCMQClient().deleteMessage(accessList, request, timeoutMillis, communicationMode, callback);
        }
        catch (RemoteException e) {
            this.setNeedUpdateRoute();
            this.logger.error(String.format("delete msg from queue[%s] error", queue), (Throwable)e);
            throw new MQServerException(20, "delete msg with error:" + e.getLocalizedMessage());
        }
        catch (InterruptedException e) {
            this.setNeedUpdateRoute();
            this.logger.error(String.format("delete msg from queue[%s] error", queue), (Throwable)e);
            throw new MQServerException(20, "delete msg with error:" + e.getLocalizedMessage());
        }
    }

    BatchDeleteResult batchDelete(String queue, List<Long> receiptHandleList, long timeoutMillis) throws MQClientException, MQServerException {
        return this.batchDeleteImpl(queue, receiptHandleList, timeoutMillis, CommunicationMode.SYNC, null);
    }

    void batchDelete(String queue, List<Long> receiptHandleList, long timeoutMillis, BatchDeleteCallback callback) throws MQClientException, MQServerException {
        this.batchDeleteImpl(queue, receiptHandleList, timeoutMillis, CommunicationMode.ASYNC, callback);
    }

    private BatchDeleteResult batchDeleteImpl(String queue, List<Long> receiptHandleList, long timeoutMillis, CommunicationMode communicationMode, BatchDeleteCallback callback) throws MQClientException, MQServerException {
        this.makeSureStateOK();
        Cmq.cmq_tcp_batch_delete_msg.Builder contentBuilder = Cmq.cmq_tcp_batch_delete_msg.newBuilder().setQueueName(queue).addAllReceiptHandleList(receiptHandleList);
        Cmq.CMQProto request = Cmq.CMQProto.newBuilder().setCmd(1026).setSeqno(RequestIdHelper.getNextSeqNo()).setTcpBatchDeleteMsg(contentBuilder).build();
        List<String> accessList = this.getQueueRoute(queue);
        try {
            return this.mQClientInstance.getCMQClient().batchDeleteMessage(accessList, request, timeoutMillis, communicationMode, callback);
        }
        catch (RemoteException e) {
            this.setNeedUpdateRoute();
            this.logger.error(String.format("batch delete msg from queue[%s] error", queue), (Throwable)e);
            throw new MQServerException(20, "batch delete msg with error:" + e.getLocalizedMessage());
        }
        catch (InterruptedException e) {
            this.setNeedUpdateRoute();
            this.logger.error(String.format("batch delete msg from queue[%s] error", queue), (Throwable)e);
            throw new MQServerException(20, "batch delete msg with error:" + e.getLocalizedMessage());
        }
    }

    void subscriber(String queue, MessageListener listener) throws MQClientException, MQServerException {
        this.makeSureStateOK();
        Cmq.cmq_tcp_batch_pull_msg_req.Builder contentBuilder = Cmq.cmq_tcp_batch_pull_msg_req.newBuilder().setQueueName(queue).setPollWaitSeconds(this.consumer.getPollingWaitSeconds()).setNumMsg(this.consumer.getBatchPullNumber());
        Cmq.CMQProto.Builder builder = Cmq.CMQProto.newBuilder().setCmd(1024).setSeqno(RequestIdHelper.getNextSeqNo()).setTcpBatchPullMsg(contentBuilder);
        SubscribeService pullMessageService = new SubscribeService(queue, listener, builder, this);
        if (this.subscribeTable.putIfAbsent(queue, pullMessageService) != null) {
            this.logger.error("queue[%s] already subscribed.", (Object)queue);
            throw new MQClientException(11, "queue[" + queue + "] already subscribed.");
        }
        pullMessageService.start();
        this.logger.info("subscribe queue {} success.", (Object)queue);
    }

    void unSubscriber(String queue) throws MQClientException {
        this.makeSureStateOK();
        SubscribeService pullMessageService = this.subscribeTable.remove(queue);
        if (pullMessageService != null) {
            pullMessageService.shutdown();
        }
        this.logger.info("unSubscribe queue {} success.", (Object)queue);
    }

    MQClientInstance getMQClientInstance() {
        return this.mQClientInstance;
    }

    Consumer getConsumer() {
        return this.consumer;
    }
}

