/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.tubemq.client.consumer;

import com.google.protobuf.ProtocolStringList;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.security.Security;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.inlong.tubemq.client.common.ClientStatsInfo;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.consumer.ClientSubInfo;
import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
import org.apache.inlong.tubemq.client.consumer.ConsumerSamplePrint;
import org.apache.inlong.tubemq.client.consumer.FetchContext;
import org.apache.inlong.tubemq.client.consumer.MessageConsumer;
import org.apache.inlong.tubemq.client.consumer.MessageListener;
import org.apache.inlong.tubemq.client.consumer.PartitionSelectResult;
import org.apache.inlong.tubemq.client.consumer.RmtDataCache;
import org.apache.inlong.tubemq.client.consumer.TopicProcessor;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.InnerSessionFactory;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.aaaclient.ClientAuthenticateHandler;
import org.apache.inlong.tubemq.corebase.aaaclient.SimpleClientAuthenticateHandler;
import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
import org.apache.inlong.tubemq.corebase.balance.EventStatus;
import org.apache.inlong.tubemq.corebase.balance.EventType;
import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.cluster.SubscribeInfo;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corerpc.RpcConfig;
import org.apache.inlong.tubemq.corerpc.RpcServiceFactory;
import org.apache.inlong.tubemq.corerpc.service.BrokerReadService;
import org.apache.inlong.tubemq.corerpc.service.MasterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseMessageConsumer
implements MessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(BaseMessageConsumer.class);
    /** Capacity used for both the rebalance event queue and the rebalance result queue. */
    private static final int REBALANCE_QUEUE_SIZE = 5000;
    /** Incremented once for every consumer id generated by this class. */
    private static final AtomicInteger consumerCounter = new AtomicInteger(0);
    /** Identifier generated in the constructor; also used in thread names and log lines. */
    protected final String consumerId;
    protected final ConsumerConfig consumerConfig;
    protected final RmtDataCache rmtDataCache;
    /** Holds the per-topic processors registered through subscribe(). */
    protected final ClientSubInfo consumeSubInfo = new ClientSubInfo();
    /** True for pull consumption, false for push consumption. */
    private final boolean isPullConsume;
    private final InnerSessionFactory sessionFactory;
    private final RpcServiceFactory rpcServiceFactory;
    private final MasterService masterService;
    /** Single-thread scheduler that runs the master heartbeat task. */
    private final ScheduledExecutorService heartService2Master;
    /** Thread that takes events from {@link #rebalanceEvents} and processes them. */
    private final Thread rebalanceThread;
    /** Pending rebalance events; taken by the rebalance thread. */
    private final BlockingQueue<ConsumerEvent> rebalanceEvents = new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
    /** Connect/disconnect events are put here by the rebalance thread once handled. */
    private final BlockingQueue<ConsumerEvent> rebalanceResults = new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
    private final ConsumerSamplePrint samplePrintCtrl = new ConsumerSamplePrint();
    private final RpcConfig rpcConfig = new RpcConfig();
    private final AtomicLong visitToken = new AtomicLong(-2L);
    private final AtomicReference<String> authAuthorizedTokenRef = new AtomicReference<>("");
    private final ClientAuthenticateHandler authenticateHandler = new SimpleClientAuthenticateHandler();
    /** Created and started by startMasterAndBrokerThreads(); null until then. */
    private Thread heartBeatThread2Broker;
    /** Set to true near the end of shutdown(), after partitions have been unregistered. */
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    /** Set to true at the beginning of shutdown(), before the rebalance thread is stopped. */
    private final AtomicBoolean isRebalanceStopped = new AtomicBoolean(false);
    /** Cleared after the first connect event has been processed. */
    private final AtomicBoolean isFirst = new AtomicBoolean(true);
    private int heartbeatRetryTimes = 0;
    /** Subscribe state: -1 not completed, 0 completeSubscribe in progress, 1 completed. */
    private final AtomicInteger subStatus = new AtomicInteger(-1);
    private int reportIntervalTimes = 0;
    /** Count of rebalance-loop failures since the last successfully processed event. */
    private int rebalanceRetryTimes = 0;
    private long lastHeartbeatTime2Master = 0L;
    private long lastHeartbeatTime2Broker = 0L;
    protected final ClientStatsInfo clientStatsInfo;

    public BaseMessageConsumer(InnerSessionFactory sessionFactory, ConsumerConfig consumerConfig, boolean isPullConsume) throws TubeClientException {
        Security.setProperty("networkaddress.cache.ttl", "3");
        Security.setProperty("networkaddress.cache.negative.ttl", "1");
        if (sessionFactory == null || consumerConfig == null) {
            throw new TubeClientException("Illegal parameter: messageSessionFactory or consumerConfig is null!");
        }
        this.sessionFactory = sessionFactory;
        this.consumerConfig = consumerConfig;
        this.isPullConsume = isPullConsume;
        try {
            this.consumerId = this.generateConsumerID();
        }
        catch (Exception e) {
            throw new TubeClientException("Get consumer id failed!", e);
        }
        this.clientStatsInfo = new ClientStatsInfo(false, this.consumerId, this.consumerConfig.getStatsConfig());
        this.rmtDataCache = new RmtDataCache(this.consumerConfig, null);
        this.rpcServiceFactory = this.sessionFactory.getRpcServiceFactory();
        this.rpcConfig.put("rpc.connect.timeout", (Object)3000);
        this.rpcConfig.put("rpc.request.timeout", (Object)this.consumerConfig.getRpcTimeoutMs());
        this.rpcConfig.put("rpc.netty.worker.thread.name", (Object)"tube_consumer_netty_worker-");
        this.rpcConfig.put("rpc.netty.callback.count", (Object)this.consumerConfig.getRpcRspCallBackThreadCnt());
        this.masterService = (MasterService)this.rpcServiceFactory.getFailoverService(MasterService.class, this.consumerConfig.getMasterInfo(), this.rpcConfig);
        this.heartService2Master = Executors.newScheduledThreadPool(1, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, new StringBuilder(512).append("Master-Heartbeat-Thread-").append(BaseMessageConsumer.this.consumerId).toString());
                t.setPriority(10);
                return t;
            }
        });
        this.rebalanceThread = new Thread(new Runnable(){

            @Override
            public void run() {
                StringBuilder strBuffer = new StringBuilder(256);
                while (!BaseMessageConsumer.this.isRebalanceStopped() || !BaseMessageConsumer.this.isShutdown()) {
                    try {
                        ConsumerEvent event = (ConsumerEvent)BaseMessageConsumer.this.rebalanceEvents.take();
                        BaseMessageConsumer.this.rebalanceEvents.clear();
                        if (BaseMessageConsumer.this.isRebalanceStopped() || BaseMessageConsumer.this.isShutdown()) break;
                        switch (event.getType()) {
                            case DISCONNECT: 
                            case ONLY_DISCONNECT: {
                                BaseMessageConsumer.this.disconnectFromBroker(event);
                                BaseMessageConsumer.this.rebalanceResults.put(event);
                                break;
                            }
                            case CONNECT: 
                            case ONLY_CONNECT: {
                                BaseMessageConsumer.this.connect2Broker(event);
                                BaseMessageConsumer.this.rebalanceResults.put(event);
                                break;
                            }
                            case REPORT: {
                                BaseMessageConsumer.this.reportSubscribeInfo();
                                break;
                            }
                            case STOPREBALANCE: {
                                break;
                            }
                            default: {
                                throw new TubeClientException(strBuffer.append("Invalid rebalance opCode:").append(event.getType()).toString());
                            }
                        }
                        BaseMessageConsumer.this.rebalanceRetryTimes = 0;
                    }
                    catch (InterruptedException e) {
                        return;
                    }
                    catch (Throwable e) {
                        BaseMessageConsumer.this.rebalanceRetryTimes++;
                        if (BaseMessageConsumer.this.isShutdown()) continue;
                        strBuffer.delete(0, strBuffer.length());
                        logger.error(strBuffer.append("Rebalance retry ").append(BaseMessageConsumer.this.rebalanceRetryTimes).append(" failed.").toString(), e);
                        strBuffer.delete(0, strBuffer.length());
                    }
                }
            }
        }, new StringBuilder(512).append("Rebalance-Thread-").append(this.consumerId).toString());
        this.rebalanceThread.setPriority(10);
    }

    /**
     * Registers a topic processor for {@code topic}.
     *
     * @param topic           topic name; must not be blank
     * @param filterConds     optional filter items; at most 500 items, each non-blank and at
     *                        most 256 characters long
     * @param messageListener listener for the topic; required unless this is a pull consumer
     * @return this consumer
     * @throws TubeClientException      if the client is not running, a parameter is invalid,
     *                                  or the topic has already been subscribed
     * @throws IllegalArgumentException if {@code messageListener} is null for a push consumer
     */
    protected MessageConsumer subscribe(String topic, TreeSet<String> filterConds, MessageListener messageListener) throws TubeClientException {
        checkClientRunning();
        if (TStringUtils.isBlank((String)topic)) {
            throw new TubeClientException("Parameter error: topic is Blank!");
        }
        final int maxFilterCount = 500;
        final int maxFilterLength = 256;
        final boolean hasFilters = filterConds != null && !filterConds.isEmpty();
        if (hasFilters) {
            if (filterConds.size() > maxFilterCount) {
                throw new TubeClientException("Parameter error: Over max allowed filter count, allowed count is " + maxFilterCount);
            }
            for (String item : filterConds) {
                if (TStringUtils.isBlank((String)item)) {
                    throw new TubeClientException("Parameter error: blank filter value in parameter filterConds!");
                }
                if (item.length() > maxFilterLength) {
                    throw new TubeClientException("Parameter error: over max allowed filter length, allowed length is " + maxFilterLength);
                }
            }
        }
        if (!this.isPullConsume && messageListener == null) {
            throw new IllegalArgumentException("Parameter error: null messageListener");
        }
        // Look up first; only build and try to insert a processor when none is registered yet.
        TopicProcessor existing = this.consumeSubInfo.getTopicProcessor(topic);
        if (existing == null) {
            existing = this.consumeSubInfo.putIfAbsentTopicProcessor(topic, new TopicProcessor(messageListener, filterConds));
        }
        if (existing != null) {
            throw new TubeClientException("Topic=" + topic + " has been subscribed");
        }
        return this;
    }

    /**
     * Completes the subscription without bound-consume settings, then registers to the master
     * and starts the heartbeat and rebalance threads.
     *
     * @throws TubeClientException if the client is not running, no topic has been subscribed,
     *                             this method was already called, or the start-up fails
     */
    @Override
    public void completeSubscribe() throws TubeClientException {
        checkClientRunning();
        if (consumeSubInfo.isSubscribedTopicEmpty()) {
            throw new TubeClientException("Not subscribe any topic, please subscribe first!");
        }
        if (subStatus.get() >= 0) {
            // 0 means another call is still in progress, anything above means it has finished.
            throw new TubeClientException(subStatus.get() == 0
                    ? "Duplicated completeSubscribe call!"
                    : "Subscribe has finished!");
        }
        final boolean claimed = subStatus.compareAndSet(-1, 0);
        if (!claimed) {
            throw new TubeClientException("Duplicated completeSubscribe call!");
        }
        consumeSubInfo.setNotRequireBound();
        startMasterAndBrokerThreads();
        subStatus.set(1);
    }

    /**
     * Completes the subscription, optionally with bound-consume settings, then registers to
     * the master and starts the heartbeat and rebalance threads.
     *
     * <p>When {@code partOffsetMap} is null the other three arguments are not validated and
     * the subscription is marked as not requiring bound consumption.
     *
     * @param sessionKey    session key; must not be blank when {@code partOffsetMap} is given
     * @param sourceCount   source count; must be greater than zero when {@code partOffsetMap}
     *                      is given
     * @param isSelectBig   passed through to the bound-consume settings
     * @param partOffsetMap partition key to offset; entries with a null key are skipped, and
     *                      non-null offsets must be zero or greater
     * @throws TubeClientException if the client is not running, no topic has been subscribed,
     *                             a parameter is invalid, this method was already called, or
     *                             the start-up fails
     */
    @Override
    public void completeSubscribe(String sessionKey, int sourceCount, boolean isSelectBig, Map<String, Long> partOffsetMap) throws TubeClientException {
        checkClientRunning();
        if (consumeSubInfo.isSubscribedTopicEmpty()) {
            throw new TubeClientException("Not subscribe any topic, please subscribe first!");
        }
        final boolean boundConsume = partOffsetMap != null;
        if (boundConsume) {
            if (TStringUtils.isBlank((String)sessionKey)) {
                throw new TubeClientException("Parameter error: sessionKey is Blank!");
            }
            if (sourceCount <= 0) {
                throw new TubeClientException("Parameter error: sourceCount must over zero!");
            }
            final StringBuilder msgBuf = new StringBuilder(256);
            for (Map.Entry<String, Long> item : partOffsetMap.entrySet()) {
                final String partKey = item.getKey();
                if (partKey == null) {
                    continue;
                }
                validPartitionKey(msgBuf, partKey);
                final Long offset = item.getValue();
                if (offset != null && offset < 0L) {
                    throw new TubeClientException(msgBuf.append("Parameter error: Offset must over or equal zero of partOffsetMap  key ").append(partKey).append(", value is ").append(offset).toString());
                }
            }
        }
        if (subStatus.get() >= 0) {
            // 0 means another call is still in progress, anything above means it has finished.
            throw new TubeClientException(subStatus.get() == 0
                    ? "Duplicated completeSubscribe call!"
                    : "Subscribe has finished!");
        }
        final boolean claimed = subStatus.compareAndSet(-1, 0);
        if (!claimed) {
            throw new TubeClientException("Duplicated completeSubscribe call!");
        }
        if (boundConsume) {
            consumeSubInfo.setRequireBound(sessionKey, sourceCount, isSelectBig, partOffsetMap);
        } else {
            consumeSubInfo.setNotRequireBound();
        }
        startMasterAndBrokerThreads();
        subStatus.set(1);
    }

    /**
     * Asks the subscription info whether {@code topic} is consumed with filters.
     *
     * @param topic the topic name to check
     * @return the answer given by the subscription info
     */
    @Override
    public boolean isFilterConsume(String topic) {
        final boolean filtered = consumeSubInfo.isFilterConsume(topic);
        return filtered;
    }

    /**
     * Reports the current value of the shutdown flag.
     *
     * @return true once the shutdown flag has been set
     */
    @Override
    public boolean isShutdown() {
        final boolean closed = isShutdown.get();
        return closed;
    }

    /**
     * Returns the consumer id generated when this consumer was constructed.
     *
     * @return the consumer id
     */
    @Override
    public String getConsumerId() {
        final String id = consumerId;
        return id;
    }

    /**
     * Returns the client version string.
     *
     * @return the fixed value {@code "1.7.0"}
     */
    @Override
    public String getClientVersion() {
        final String version = "1.7.0";
        return version;
    }

    /**
     * Shuts the consumer down.
     *
     * <p>Steps, in order: stop the rebalance (flag, STOPREBALANCE event, bounded wait,
     * interrupt), close the remote data cache, unregister all partitions, set the shutdown
     * flag, remove this client from the session factory, stop both heartbeat workers, print
     * the statistics and finally send the close request to the master. Calls made after the
     * shutdown has started or finished only log a message and return.
     *
     * @throws Throwable if any step that is not individually guarded fails
     */
    public void shutdown() throws Throwable {
        final StringBuilder msg = new StringBuilder(256);
        if (isShutdown()) {
            logger.info(msg.append("[SHUTDOWN_CONSUMER] ").append(consumerId).append(" was already shutdown, do nothing...").toString());
            return;
        }
        if (isRebalanceStopped()) {
            logger.info(msg.append("[SHUTDOWN_CONSUMER] ").append(consumerId).append(" is shutting down, do nothing...").toString());
            return;
        }
        logger.info(msg.append("[SHUTDOWN_CONSUMER] Shutting down consumer:").append(consumerId).toString());
        msg.setLength(0);
        // Only the caller that flips the flag continues with the shutdown.
        final boolean owner = isRebalanceStopped.compareAndSet(false, true);
        if (!owner) {
            return;
        }
        rebalanceEvents.put(new ConsumerEvent(-2L, EventType.STOPREBALANCE, null, EventStatus.TODO));
        // Sleep at least once, then keep waiting while a rebalance is still being processed
        // and the configured wait period has not elapsed.
        final long waitBegin = System.currentTimeMillis();
        while (true) {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                break;
            }
            if (!rmtDataCache.isRebProcessing()) {
                break;
            }
            if (System.currentTimeMillis() - waitBegin >= consumerConfig.getShutDownRebalanceWaitPeriodMs()) {
                break;
            }
        }
        if (rebalanceThread != null) {
            try {
                rebalanceThread.interrupt();
            } catch (Throwable ignored) {
                // best effort: a failure to interrupt must not abort the shutdown
            }
        }
        logger.info(msg.append("[SHUTDOWN_CONSUMER] Partition rebalance stopped, consumer:").append(consumerId).toString());
        msg.setLength(0);
        rmtDataCache.close();
        final Map<BrokerInfo, List<PartitionSelectResult>> remaining = rmtDataCache.getAllPartitionListWithStatus();
        unregisterPartitions(remaining);
        isShutdown.set(true);
        sessionFactory.removeClient(this);
        if (heartService2Master != null) {
            try {
                heartService2Master.shutdownNow();
            } catch (Throwable ignored) {
                // best effort: continue with the remaining shutdown steps
            }
        }
        if (heartBeatThread2Broker != null) {
            try {
                heartBeatThread2Broker.interrupt();
            } catch (Throwable ignored) {
                // best effort: continue with the remaining shutdown steps
            }
        }
        clientStatsInfo.selfPrintStatsInfo(true, true, msg);
        logger.info(msg.append("[SHUTDOWN_CONSUMER] Partitions unregistered,  consumer :").append(consumerId).toString());
        msg.setLength(0);
        try {
            masterService.consumerCloseClientC2M(createMasterCloseRequest(), AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
        } catch (Throwable e) {
            msg.setLength(0);
            logger.warn(msg.append("[SHUTDOWN_CONSUMER] call closeRequest failure, error is ").append(e.getMessage()).toString());
            msg.setLength(0);
        }
        logger.info(msg.append("[SHUTDOWN_CONSUMER] Client closed, consumer : ").append(consumerId).toString());
    }

    /**
     * Returns the configuration this consumer was created with.
     *
     * @return the consumer configuration
     */
    @Override
    public ConsumerConfig getConsumerConfig() {
        final ConsumerConfig config = consumerConfig;
        return config;
    }

    /**
     * Returns the current partition information map held by the remote data cache.
     *
     * @return the map produced by the remote data cache
     */
    @Override
    public Map<String, ConsumeOffsetInfo> getCurConsumedPartitions() {
        final Map<String, ConsumeOffsetInfo> current = rmtDataCache.getCurPartitionInfoMap();
        return current;
    }

    /**
     * Freezes the given partitions.
     *
     * @param partitionKeys partition keys to freeze; a null or empty list is a no-op
     * @throws TubeClientException if a partition key fails validation
     */
    @Override
    public void freezePartitions(List<String> partitionKeys) throws TubeClientException {
        final boolean freeze = true;
        freezeOrUnFreezeParts(partitionKeys, freeze);
    }

    /**
     * Unfreezes the given partitions.
     *
     * @param partitionKeys partition keys to unfreeze; a null or empty list is a no-op
     * @throws TubeClientException if a partition key fails validation
     */
    @Override
    public void unfreezePartitions(List<String> partitionKeys) throws TubeClientException {
        final boolean freeze = false;
        freezeOrUnFreezeParts(partitionKeys, freeze);
    }

    /**
     * Asks the remote data cache to release all frozen partitions.
     */
    @Override
    public void relAllFrozenPartitions() {
        final RmtDataCache cache = rmtDataCache;
        cache.relAllFrozenPartitions();
    }

    /**
     * Returns the frozen partition information held by the remote data cache.
     *
     * @return the map produced by the remote data cache
     */
    @Override
    public Map<String, Long> getFrozenPartInfo() {
        final Map<String, Long> frozen = rmtDataCache.getFrozenPartInfo();
        return frozen;
    }

    /**
     * Validates every partition key and forwards the normalized keys to the remote data cache.
     *
     * @param partitionKeys keys to process; nothing happens when null or empty
     * @param isFreeze      true to freeze, false to unfreeze
     * @throws TubeClientException if any key fails validation; no key is forwarded in that case
     */
    private void freezeOrUnFreezeParts(List<String> partitionKeys, boolean isFreeze) throws TubeClientException {
        final boolean nothingToDo = partitionKeys == null || partitionKeys.isEmpty();
        if (nothingToDo) {
            return;
        }
        final StringBuilder keyBuf = new StringBuilder(256);
        final ArrayList<String> normalizedKeys = new ArrayList<String>();
        for (String rawKey : partitionKeys) {
            normalizedKeys.add(validPartitionKey(keyBuf, rawKey));
        }
        rmtDataCache.freezeOrUnFreezeParts(normalizedKeys, isFreeze);
    }

    /**
     * Validates a partition key and returns it with each of its three parts trimmed.
     *
     * <p>The key must split on ':' into exactly three parts, its middle part (trimmed) must be
     * a subscribed topic, and the key must not contain ','. Checks run in that order.
     *
     * @param sBuilder     scratch buffer used to build the result and the error messages; it
     *                     is emptied before a successful return but not when an exception is
     *                     thrown
     * @param partitionKey the key to validate
     * @return the normalized key in the form {@code part0:part1:part2}
     * @throws TubeClientException if the key is null or fails one of the checks
     */
    private String validPartitionKey(StringBuilder sBuilder, String partitionKey) throws TubeClientException {
        if (partitionKey == null) {
            throw new TubeClientException(sBuilder.append("Parameter error: partitionKey is null!").toString());
        }
        final String[] parts = partitionKey.split(":");
        if (parts.length != 3) {
            throw new TubeClientException(sBuilder.append("Parameter error: partitionKey ").append(partitionKey).append(" format error: value must be aaaa:bbbb:cccc !").toString());
        }
        final String topicPart = parts[1].trim();
        if (!consumeSubInfo.isSubscribedTopicContain(topicPart)) {
            throw new TubeClientException(sBuilder.append("Parameter error: not included in subcribed topic list: ").append("partitionKey is ").append(partitionKey).append(", subscribed topics are ").append(consumeSubInfo.getSubscribedTopics().toString()).toString());
        }
        if (partitionKey.contains(",")) {
            throw new TubeClientException(sBuilder.append("Parameter error: illegal format error of ").append(partitionKey).append(" : value must not include ',' char!").toString());
        }
        sBuilder.append(parts[0].trim()).append(":");
        sBuilder.append(topicPart).append(":");
        sBuilder.append(parts[2].trim());
        final String normalized = sBuilder.toString();
        sBuilder.setLength(0);
        return normalized;
    }

    /**
     * Reports the current value of the rebalance-stopped flag.
     *
     * @return true once the rebalance-stopped flag has been set
     */
    private boolean isRebalanceStopped() {
        final boolean stopped = isRebalanceStopped.get();
        return stopped;
    }

    /**
     * Builds the consumer id in the form
     * {@code group_localAddress-pid-timestamp-counter-(Pull|Push)-1.7.0}.
     *
     * <p>The pid part is the runtime MX bean name up to the first '@' when one is present,
     * otherwise the whole name.
     *
     * @return the generated consumer id
     * @throws Exception if one of the lookups used to build the id fails
     */
    private String generateConsumerID() throws Exception {
        String processName = ManagementFactory.getRuntimeMXBean().getName();
        if (processName != null && processName.indexOf("@") >= 0) {
            processName = processName.split("@")[0];
        }
        final StringBuilder idBuf = new StringBuilder(256);
        idBuf.append(this.consumerConfig.getConsumerGroup()).append("_");
        idBuf.append(AddressUtils.getLocalAddress()).append("-");
        idBuf.append(processName).append("-");
        idBuf.append(System.currentTimeMillis()).append("-");
        idBuf.append(consumerCounter.incrementAndGet());
        idBuf.append(this.isPullConsume ? "-Pull-" : "-Push-");
        idBuf.append("1.7.0");
        return idBuf.toString();
    }

    /**
     * Registers this consumer to the master, retrying on failure, and then starts the master
     * heartbeat task, the broker heartbeat thread and the rebalance thread.
     *
     * <p>When the configured maximum number of register attempts is reached without success,
     * the subscribe status is moved back from 0 to -1 and a {@link TubeClientException} is
     * thrown; none of the threads is started in that case.
     *
     * @throws TubeClientException if registration does not succeed within the allowed attempts
     */
    private void startMasterAndBrokerThreads() throws TubeClientException {
        int registerRetryTimes = 0;
        StringBuilder strBuffer = new StringBuilder(256);
        while (registerRetryTimes < this.consumerConfig.getMaxRegisterRetryTimes()) {
            try {
                ClientMaster.RegisterResponseM2C response = this.masterService.consumerRegisterC2M(this.createMasterRegisterRequest(), AddressUtils.getLocalAddress(), this.consumerConfig.isTlsEnable());
                if (response != null && response.getSuccess()) {
                    this.processRegisterAllocAndRspFlowRules(response, strBuffer);
                    this.processRegAuthorizedToken(response);
                    break;
                }
                if (response == null) {
                    logger.warn(strBuffer.append("[Register Failed] ").append("response return null!").toString());
                } else {
                    // NOTE(review): the two exceptions below are caught by the catch-all of
                    // this same try block, so they are logged and retried rather than
                    // propagated to the caller - confirm this is the intended behavior.
                    if (response.getErrCode() == 450) {
                        throw new TubeClientException(strBuffer.append("Register to master failed! ConsumeGroup forbidden, ").append(response.getErrMsg()).toString());
                    }
                    if (response.getErrCode() == 455) {
                        throw new TubeClientException(strBuffer.append("Register to master failed! Restricted consume content, ").append(response.getErrMsg()).toString());
                    }
                    logger.warn(strBuffer.append("[Register Failed] ").append(response.getErrMsg()).toString());
                }
                strBuffer.delete(0, strBuffer.length());
            }
            catch (Throwable e) {
                // The failing path may have left text in the shared buffer; drop it so it
                // cannot leak into the next attempt's log line or into the thread name below.
                strBuffer.delete(0, strBuffer.length());
                logger.warn("Register to master failed.", e);
                ThreadUtils.sleep((long)this.consumerConfig.getRegFailWaitPeriodMs());
            }
            if (++registerRetryTimes < this.consumerConfig.getMaxRegisterRetryTimes()) continue;
            // Out of attempts: allow completeSubscribe to be called again, then give up.
            this.subStatus.compareAndSet(0, -1);
            logger.error("Register to master failed! please check and retry later.");
            throw new TubeClientException("Register to master failed! please check and retry later.");
        }
        this.lastHeartbeatTime2Master = System.currentTimeMillis();
        this.heartService2Master.scheduleWithFixedDelay(new HeartTask2MasterWorker(), 0L, this.consumerConfig.getHeartbeatPeriodMs(), TimeUnit.MILLISECONDS);
        this.lastHeartbeatTime2Broker = System.currentTimeMillis();
        this.heartBeatThread2Broker = new Thread(new HeartTask2BrokerWorker());
        // Start from an empty buffer so the thread name holds nothing but the prefix and id.
        strBuffer.delete(0, strBuffer.length());
        this.heartBeatThread2Broker.setName(strBuffer.append("Broker-Heartbeat-Thread-").append(this.consumerId).toString());
        this.heartBeatThread2Broker.setPriority(10);
        this.heartBeatThread2Broker.start();
        this.rebalanceThread.start();
    }

    /**
     * Handles a disconnect event: groups the event's partitions by broker, removes them from
     * the remote data cache and unregisters whatever the cache hands back.
     *
     * <p>If the consumer is shut down or the rebalance has been stopped, the method returns
     * after grouping without touching the cache or the event status. Otherwise the event is
     * marked {@code DONE} even when the cache removal throws.
     *
     * @param event the disconnect event carrying the subscribe info list
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void disconnectFromBroker(ConsumerEvent event) throws InterruptedException {
        ArrayList<String> partKeys = new ArrayList<>();
        HashMap<BrokerInfo, List<Partition>> unRegisterInfoMap = new HashMap<>();
        List<SubscribeInfo> subscribeInfoList = event.getSubscribeInfoList();
        for (SubscribeInfo info : subscribeInfoList) {
            BrokerInfo broker = new BrokerInfo(info.getBrokerId(), info.getHost(), info.getPort());
            Partition partition = new Partition(broker, info.getTopic(), info.getPartitionId());
            List<Partition> unRegisterPartitionList = unRegisterInfoMap.computeIfAbsent(broker, k -> new ArrayList<>());
            // Each partition is listed once per broker, and its key is recorded once.
            if (unRegisterPartitionList.contains(partition)) {
                continue;
            }
            unRegisterPartitionList.add(partition);
            partKeys.add(partition.getPartitionKey());
        }
        if (this.isShutdown() || this.isRebalanceStopped()) {
            return;
        }
        // Stays empty when the removal below throws, so the finally block unregisters nothing.
        Map<BrokerInfo, List<PartitionSelectResult>> unNewRegisterInfoMap = new HashMap<>();
        try {
            if (this.isPullConsume) {
                unNewRegisterInfoMap = this.rmtDataCache.removeAndGetPartition(unRegisterInfoMap, partKeys, this.consumerConfig.getPullRebConfirmWaitPeriodMs(), this.consumerConfig.isPullRebConfirmTimeoutRollBack());
            } else {
                unNewRegisterInfoMap = this.rmtDataCache.removeAndGetPartition(unRegisterInfoMap, partKeys, this.consumerConfig.getPushListenerWaitPeriodMs(), this.consumerConfig.isPushListenerWaitTimeoutRollBack());
            }
        }
        finally {
            this.unregisterPartitions(unNewRegisterInfoMap);
            event.setStatus(EventStatus.DONE);
        }
    }

    /**
     * Handles a connect event: groups the event's partitions by broker, lets the remote data
     * cache filter them, and registers the partitions.
     *
     * <p>If the rebalance has been stopped or the consumer is shut down, the method returns
     * after grouping without registering anything or changing the event status. Otherwise the
     * first-connect flag is cleared and the event is marked {@code DONE}.
     *
     * @param event the connect event carrying the subscribe info list
     * @throws InterruptedException if the thread is interrupted while registering
     */
    private void connect2Broker(ConsumerEvent event) throws InterruptedException {
        HashMap<BrokerInfo, List<Partition>> registerInfoMap = new HashMap<>();
        List<SubscribeInfo> subscribeInfoList = event.getSubscribeInfoList();
        for (SubscribeInfo info : subscribeInfoList) {
            BrokerInfo broker = new BrokerInfo(info.getBrokerId(), info.getHost(), info.getPort());
            Partition partition = new Partition(broker, info.getTopic(), info.getPartitionId());
            List<Partition> curPartList = registerInfoMap.computeIfAbsent(broker, k -> new ArrayList<>());
            // Each partition is listed once per broker.
            if (!curPartList.contains(partition)) {
                curPartList.add(partition);
            }
        }
        if (this.isRebalanceStopped() || this.isShutdown()) {
            return;
        }
        ArrayList<Partition> unfinishedPartitions = new ArrayList<>();
        this.rmtDataCache.filterCachedPartitionInfo(registerInfoMap, unfinishedPartitions);
        this.registerPartitions(registerInfoMap, unfinishedPartitions);
        // Clear the first-connect flag once the first connect event has been handled.
        this.isFirst.compareAndSet(true, false);
        event.setStatus(EventStatus.DONE);
    }

    /**
     * Handler invoked by the rebalance thread for {@code REPORT} events.
     * The body is intentionally empty in this version: the event is accepted and ignored.
     */
    private void reportSubscribeInfo() {
    }

    /**
     * Removes the given partition from the remote data cache.
     *
     * @param part the partition to remove
     * @return always true
     */
    protected boolean removePartition(Partition part) {
        final RmtDataCache cache = rmtDataCache;
        cache.removePartition(part);
        return true;
    }

    /**
     * Asks the remote data cache to select a partition for push consumption.
     *
     * @return the selection result produced by the remote data cache
     */
    protected PartitionSelectResult pushSelectPartition() {
        final PartitionSelectResult selected = rmtDataCache.pushSelect();
        return selected;
    }

    /**
     * Forwards a release request for the given partition to the remote data cache.
     *
     * @param partitionKey       key of the partition to release
     * @param usedTime           passed through to the remote data cache
     * @param isLastPackConsumed passed through to the remote data cache
     */
    protected void pushReqReleasePartition(String partitionKey, long usedTime, boolean isLastPackConsumed) {
        final RmtDataCache cache = rmtDataCache;
        cache.errReqRelease(partitionKey, usedTime, isLastPackConsumed);
    }

    /**
     * Builds a get-message request for the given partition.
     *
     * <p>The request carries this consumer's id and group, the partition's topic and id, the
     * flow-control flag reported by the remote data cache, and the given last-pack flag.
     * Manual offset commit is always set to false.
     *
     * @param partition      the partition to fetch from
     * @param isLastConsumed whether the previous package was consumed
     * @return the built request
     */
    protected ClientBroker.GetMessageRequestC2B createBrokerGetMessageRequest(Partition partition, boolean isLastConsumed) {
        return ClientBroker.GetMessageRequestC2B.newBuilder()
                .setClientId(consumerId)
                .setGroupName(consumerConfig.getConsumerGroup())
                .setTopicName(partition.getTopic())
                .setEscFlowCtrl(rmtDataCache.isCurGroupInFlowCtrl())
                .setPartitionId(partition.getPartitionId())
                .setLastPackConsumed(isLastConsumed)
                .setManualCommitOffset(false)
                .build();
    }

    /**
     * Builds a commit-offset request for the given partition.
     *
     * <p>The request carries this consumer's id and group, the partition's topic and id, and
     * the given consumed flag.
     *
     * @param partition  the partition whose offset is committed
     * @param isConsumed whether the last package was consumed
     * @return the built request
     */
    protected ClientBroker.CommitOffsetRequestC2B createBrokerCommitRequest(Partition partition, boolean isConsumed) {
        return ClientBroker.CommitOffsetRequestC2B.newBuilder()
                .setClientId(consumerId)
                .setGroupName(consumerConfig.getConsumerGroup())
                .setTopicName(partition.getTopic())
                .setPartitionId(partition.getPartitionId())
                .setLastPackConsumed(isConsumed)
                .build();
    }

    private void registerPartitions(Map<BrokerInfo, List<Partition>> registerInfoMap, List<Partition> unRegPartitions) throws InterruptedException {
        StringBuilder strBuffer = new StringBuilder(512);
        for (int retryTimesRegister2Broker = 0; !unRegPartitions.isEmpty() && retryTimesRegister2Broker < this.consumerConfig.getMaxRegisterRetryTimes(); ++retryTimesRegister2Broker) {
            for (Map.Entry entry : registerInfoMap.entrySet()) {
                ConcurrentLinkedQueue<Partition> regedPartitions = this.rmtDataCache.getPartitionByBroker((BrokerInfo)entry.getKey());
                for (Partition partition : (List)entry.getValue()) {
                    if (this.isRebalanceStopped() || this.isShutdown()) {
                        return;
                    }
                    try {
                        if (regedPartitions != null && regedPartitions.contains(partition)) {
                            unRegPartitions.remove(partition);
                            continue;
                        }
                        ClientBroker.RegisterResponseB2C responseB2C = this.getBrokerService(partition.getBroker()).consumerRegisterC2B(this.createBrokerRegisterRequest(partition), AddressUtils.getLocalAddress(), this.consumerConfig.isTlsEnable());
                        if (responseB2C != null && responseB2C.getSuccess()) {
                            this.clientStatsInfo.bookReg2Broker(false);
                            long currOffset = responseB2C.hasCurrOffset() ? responseB2C.getCurrOffset() : -2L;
                            long maxOffset = responseB2C.hasMaxOffset() ? responseB2C.getMaxOffset() : -2L;
                            this.rmtDataCache.addPartition(partition, currOffset, maxOffset);
                            unRegPartitions.remove(partition);
                            logger.info(strBuffer.append("Registered partition: consumer is ").append(this.consumerId).append(", partition is:").append(partition.toString()).toString());
                            strBuffer.delete(0, strBuffer.length());
                            continue;
                        }
                        this.clientStatsInfo.bookReg2Broker(true);
                        if (responseB2C == null) {
                            logger.warn(strBuffer.append("register2broker error! ").append(retryTimesRegister2Broker).append(" register ").append(partition.toString()).append(" return null!").toString());
                        } else if (responseB2C.getErrCode() == 410 || responseB2C.getErrCode() == 415) {
                            unRegPartitions.remove(partition);
                            if (responseB2C.getErrCode() == 410) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug(strBuffer.append("[Partition occupied], curr consumerId: ").append(this.consumerId).append(", returned message : ").append(responseB2C.getErrMsg()).toString());
                                }
                            } else {
                                logger.warn(strBuffer.append("[Certificate failure], curr consumerId: ").append(this.consumerId).append(", returned message : ").append(responseB2C.getErrMsg()).toString());
                            }
                        } else {
                            logger.warn(strBuffer.append("register2broker error! ").append(retryTimesRegister2Broker).append(" register ").append(partition.toString()).append(" return ").append(responseB2C.getErrMsg()).toString());
                        }
                        strBuffer.delete(0, strBuffer.length());
                    }
                    catch (IOException e) {
                        strBuffer.delete(0, strBuffer.length());
                        logger.warn(strBuffer.append("register2broker error1 ! ").append(retryTimesRegister2Broker).append(" ").append(partition.toString()).toString(), (Throwable)e);
                        strBuffer.delete(0, strBuffer.length());
                    }
                    catch (Throwable ee) {
                        strBuffer.delete(0, strBuffer.length());
                        logger.warn(strBuffer.append("register2broker error2 ! ").append(retryTimesRegister2Broker).append(" ").append(partition.toString()).toString(), ee);
                        strBuffer.delete(0, strBuffer.length());
                    }
                }
            }
            Thread.sleep(1000L);
        }
        for (Partition partition : unRegPartitions) {
            boolean result = this.removePartition(partition);
            logger.info(strBuffer.append("[Remove Partition] ").append(partition.toString()).append(" ").append(result).toString());
            strBuffer.delete(0, strBuffer.length());
        }
    }

    /**
     * Sends an unregister request to the owning broker for every partition listed in
     * {@code unRegisterInfoMap}, then logs the list of partitions that were processed.
     * <p>
     * A failure for one partition is logged and does not stop the remaining ones.
     *
     * @param unRegisterInfoMap partitions to release, grouped by broker
     */
    private void unregisterPartitions(Map<BrokerInfo, List<PartitionSelectResult>> unRegisterInfoMap) {
        StringBuilder summary = new StringBuilder(512).append("Unregister info:");
        for (List<PartitionSelectResult> brokerPartitions : unRegisterInfoMap.values()) {
            for (PartitionSelectResult selected : brokerPartitions) {
                try {
                    BrokerReadService brokerService = this.getBrokerService(selected.getPartition().getBroker());
                    ClientBroker.RegisterRequestC2B request = this.createBrokerUnregisterRequest(selected.getPartition(), selected.isLastPackConsumed());
                    brokerService.consumerRegisterC2B(request, AddressUtils.getLocalAddress(), this.consumerConfig.isTlsEnable());
                } catch (Throwable e) {
                    // Best effort: report the broker that could not be reached and carry on.
                    logger.error("Disconnect to Broker error! broker:" + selected.getPartition().getBroker().toString(), e);
                }
                summary.append(selected.getPartition().toString()).append("\n");
            }
        }
        logger.info(summary.toString());
    }

    /**
     * Builds the register request this consumer sends to the master.
     * <p>
     * The request always carries the client identity, group, subscribed topics, flow-control
     * ids and topic filter conditions. The current subscribe info is added when the cache
     * returns a non-null list, the bound-consume fields are added only when the subscription
     * requires bound, and the certificate is added only when one is generated.
     *
     * @return the populated register request
     * @throws Exception declared by the original signature; propagated from the calls made here
     */
    private ClientMaster.RegisterRequestC2M createMasterRegisterRequest() throws Exception {
        ClientMaster.RegisterRequestC2M.Builder request = ClientMaster.RegisterRequestC2M.newBuilder()
                .setClientId(this.consumerId)
                .setHostName(AddressUtils.getLocalAddress())
                .setJdkVersion(MixedUtils.getJavaVersion())
                .setRequireBound(this.consumeSubInfo.isRequireBound())
                .setGroupName(this.consumerConfig.getConsumerGroup())
                .setSessionTime(this.consumeSubInfo.getSubscribedTime())
                .addAllTopicList(this.consumeSubInfo.getSubscribedTopics())
                .setDefFlowCheckId(this.rmtDataCache.getDefFlowCtrlId())
                .setQryPriorityId(this.rmtDataCache.getQryPriorityId())
                .setGroupFlowCheckId(this.rmtDataCache.getGroupFlowCtrlId());
        List<SubscribeInfo> currentSubscriptions = this.rmtDataCache.getSubscribeInfoList(this.consumerId, this.consumerConfig.getConsumerGroup());
        if (currentSubscriptions != null) {
            request.addAllSubscribeInfo((Iterable)DataConverterUtil.formatSubInfo(currentSubscriptions));
        }
        request.addAllTopicCondition(this.formatTopicCondInfo(this.consumeSubInfo.getTopicCondRegistry()));
        if (this.consumeSubInfo.isRequireBound()) {
            // Bound-consume settings are only reported when the subscription asks for them.
            request.setSessionKey(this.consumeSubInfo.getSessionKey())
                    .setSelectBig(this.consumeSubInfo.isSelectBig())
                    .setTotalCount(this.consumeSubInfo.getSourceCount())
                    .setRequiredPartition(this.consumeSubInfo.getRequiredPartition())
                    .setNotAllocated(this.consumeSubInfo.getIsNotAllocated());
        }
        ClientMaster.MasterCertificateInfo certificate = this.genMasterCertificateInfo(true);
        if (certificate != null) {
            request.setAuthInfo(certificate);
        }
        return request.build();
    }

    /**
     * Renders each topic's filter conditions as {@code topic#cond1,cond2,...}.
     * <p>
     * Entries with a null key, a null processor, or a null/empty condition set are skipped.
     *
     * @param topicCondMap topic name to processor mapping; may be null or empty
     * @return one formatted string per topic that has filter conditions; empty when there are none
     */
    private List<String> formatTopicCondInfo(ConcurrentHashMap<String, TopicProcessor> topicCondMap) {
        StringBuilder line = new StringBuilder(512);
        ArrayList<String> formatted = new ArrayList<String>();
        if (topicCondMap == null || topicCondMap.isEmpty()) {
            return formatted;
        }
        for (Map.Entry<String, TopicProcessor> entry : topicCondMap.entrySet()) {
            if (entry.getKey() == null || entry.getValue() == null) {
                continue;
            }
            Set<String> conditions = entry.getValue().getFilterConds();
            if (conditions == null || conditions.isEmpty()) {
                continue;
            }
            line.append(entry.getKey()).append("#");
            // The separator is empty for the first condition and a comma afterwards.
            String separator = "";
            for (String condition : conditions) {
                line.append(separator).append(condition);
                separator = ",";
            }
            formatted.add(line.toString());
            line.setLength(0);
        }
        return formatted;
    }

    /**
     * Builds the heartbeat request this consumer sends to the master.
     *
     * @param event               processed consumer event to report back, or null when there is none
     * @param subInfoList         subscribe info to attach, or null to attach none
     * @param reportSubscribeInfo value copied into the request's report-subscribe-info flag
     * @return the populated heartbeat request
     * @throws Exception declared by the original signature; propagated from the calls made here
     */
    private ClientMaster.HeartRequestC2M createMasterHeartbeatRequest(ConsumerEvent event, List<SubscribeInfo> subInfoList, boolean reportSubscribeInfo) throws Exception {
        ClientMaster.HeartRequestC2M.Builder request = ClientMaster.HeartRequestC2M.newBuilder()
                .setClientId(this.consumerId)
                .setGroupName(this.consumerConfig.getConsumerGroup())
                .setReportSubscribeInfo(reportSubscribeInfo)
                .setDefFlowCheckId(this.rmtDataCache.getDefFlowCtrlId())
                .setQryPriorityId(this.rmtDataCache.getQryPriorityId())
                .setGroupFlowCheckId(this.rmtDataCache.getGroupFlowCtrlId());
        if (event != null) {
            // Echo the event's rebalance id, type, status and subscribe info to the master.
            ClientMaster.EventProto.Builder eventBuilder = ClientMaster.EventProto.newBuilder()
                    .setRebalanceId(event.getRebalanceId())
                    .setOpType(event.getType().getValue())
                    .setStatus(event.getStatus().getValue());
            eventBuilder.addAllSubscribeInfo((Iterable)DataConverterUtil.formatSubInfo((List)event.getSubscribeInfoList()));
            request.setEvent(eventBuilder.build());
        }
        if (subInfoList != null) {
            request.addAllSubscribeInfo((Iterable)DataConverterUtil.formatSubInfo(subInfoList));
        }
        ClientMaster.MasterCertificateInfo certificate = this.genMasterCertificateInfo(false);
        if (certificate != null) {
            request.setAuthInfo(certificate);
        }
        return request.build();
    }

    /**
     * Builds the close request sent to the master, carrying the client id, the consumer
     * group and, when one is generated, the certificate info.
     *
     * @return the populated close request
     */
    private ClientMaster.CloseRequestC2M createMasterCloseRequest() {
        ClientMaster.CloseRequestC2M.Builder request = ClientMaster.CloseRequestC2M.newBuilder()
                .setClientId(this.consumerId)
                .setGroupName(this.consumerConfig.getConsumerGroup());
        ClientMaster.MasterCertificateInfo certificate = this.genMasterCertificateInfo(true);
        if (certificate == null) {
            return request.build();
        }
        return request.setAuthInfo(certificate).build();
    }

    /**
     * Builds the broker register request (op type 31) for {@code partition}.
     * <p>
     * The read status comes from {@link #getGroupInitReadStatus(boolean)}, fed with the result
     * of booking the partition key in the cache. Filter conditions are attached when the topic
     * has a processor with a non-null condition set. An explicit current offset is attached
     * only while {@code isFirst} is set, the subscription requires bound, it is still marked
     * not-allocated, and a non-negative offset was assigned for this partition.
     *
     * @param partition partition to register
     * @return the populated register request
     */
    private ClientBroker.RegisterRequestC2B createBrokerRegisterRequest(Partition partition) {
        ClientBroker.RegisterRequestC2B.Builder request = ClientBroker.RegisterRequestC2B.newBuilder()
                .setClientId(this.consumerId)
                .setGroupName(this.consumerConfig.getConsumerGroup())
                .setOpType(31)
                .setTopicName(partition.getTopic())
                .setPartitionId(partition.getPartitionId())
                .setQryPriorityId(this.rmtDataCache.getQryPriorityId())
                .setReadStatus(this.getGroupInitReadStatus(this.rmtDataCache.bookPartition(partition.getPartitionKey())));
        TopicProcessor processor = this.consumeSubInfo.getTopicProcessor(partition.getTopic());
        if (processor != null && processor.getFilterConds() != null) {
            request.addAllFilterCondStr(processor.getFilterConds());
        }
        if (this.isFirst.get() && this.consumeSubInfo.isRequireBound() && this.consumeSubInfo.getIsNotAllocated()) {
            Long assignedOffset = this.consumeSubInfo.getAssignedPartOffset(partition.getPartitionKey());
            if (assignedOffset != null && assignedOffset >= 0L) {
                request.setCurrOffset(assignedOffset.longValue());
            }
        }
        return request.setAuthInfo(this.genBrokerAuthenticInfo(partition.getBrokerId(), false)).build();
    }

    /**
     * Builds the broker unregister request (op type 32) for {@code partition}.
     *
     * @param partition        partition to unregister
     * @param isLastConsumered selects the read status written to the request:
     *                         0 when true, 1 when false
     * @return the populated unregister request
     */
    private ClientBroker.RegisterRequestC2B createBrokerUnregisterRequest(Partition partition, boolean isLastConsumered) {
        return ClientBroker.RegisterRequestC2B.newBuilder()
                .setClientId(this.consumerId)
                .setGroupName(this.consumerConfig.getConsumerGroup())
                .setOpType(32)
                .setTopicName(partition.getTopic())
                .setPartitionId(partition.getPartitionId())
                .setReadStatus(isLastConsumered ? 0 : 1)
                .setAuthInfo(this.genBrokerAuthenticInfo(partition.getBrokerId(), true))
                .build();
    }

    /**
     * Builds the heartbeat request sent to one broker.
     *
     * @param brokerId      id of the target broker, used when generating the auth info
     * @param partitionList string forms of the partitions reported in this heartbeat
     * @return the populated heartbeat request
     */
    private ClientBroker.HeartBeatRequestC2B createBrokerHeartBeatRequest(int brokerId, List<String> partitionList) {
        return ClientBroker.HeartBeatRequestC2B.newBuilder()
                .setClientId(this.consumerId)
                .setGroupName(this.consumerConfig.getConsumerGroup())
                .setReadStatus(this.getGroupInitReadStatus(false))
                .setQryPriorityId(this.rmtDataCache.getQryPriorityId())
                .addAllPartitionInfo(partitionList)
                .setAuthInfo(this.genBrokerAuthenticInfo(brokerId, false))
                .build();
    }

    /**
     * Applies the allocation flag and flow-control info carried by a master register response.
     *
     * @param response  register response received from the master
     * @param strBuffer scratch builder handed on to the flow-control update
     */
    private void processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response, StringBuilder strBuffer) {
        // Only an explicit "notAllocated == false" flips the local flag from true to false.
        boolean masterReportsAllocated = response.hasNotAllocated() && !response.getNotAllocated();
        if (masterReportsAllocated) {
            this.consumeSubInfo.compareAndSetIsNotAllocated(true, false);
        }
        this.rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
    }

    /**
     * Hands the authorized info of a register response, when present, to
     * {@link #processAuthorizedToken}.
     *
     * @param response register response received from the master
     */
    private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2C response) {
        if (!response.hasAuthorizedInfo()) {
            return;
        }
        this.processAuthorizedToken(response.getAuthorizedInfo());
    }

    /**
     * Applies the allocation flag and flow-control info carried by a master heartbeat response.
     *
     * @param response  heartbeat response received from the master
     * @param strBuffer scratch builder handed on to the flow-control update
     */
    private void procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response, StringBuilder strBuffer) {
        // Only an explicit "notAllocated == false" flips the local flag from true to false.
        boolean masterReportsAllocated = response.hasNotAllocated() && !response.getNotAllocated();
        if (masterReportsAllocated) {
            this.consumeSubInfo.compareAndSetIsNotAllocated(true, false);
        }
        this.rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
    }

    /**
     * Produces the certificate info attached to requests sent to the master.
     *
     * @param force passed to the cache's {@code markAndGetAuthStatus}; its result decides whether
     *              a freshly generated authenticate token or the stored authorized token is sent
     * @return the certificate info, or null when user authentication is disabled in the config
     */
    private ClientMaster.MasterCertificateInfo genMasterCertificateInfo(boolean force) {
        if (!this.consumerConfig.isEnableUserAuthentic()) {
            return null;
        }
        ClientMaster.MasterCertificateInfo.Builder certificate = ClientMaster.MasterCertificateInfo.newBuilder();
        if (this.rmtDataCache.markAndGetAuthStatus(force)) {
            certificate.setAuthInfo(this.authenticateHandler.genMasterAuthenticateToken(this.consumerConfig.getUsrName(), this.consumerConfig.getUsrPassWord()));
        } else {
            certificate.setAuthorizedToken(this.authAuthorizedTokenRef.get());
        }
        return certificate.build();
    }

    /**
     * Produces the authorized info attached to requests sent to a broker.
     * <p>
     * The visit token is always included. A generated broker authenticate token is added only
     * when user authentication is enabled and the cache's {@code markAndGetBrokerAuthStatus}
     * returns true for this broker.
     *
     * @param brokerId id of the target broker
     * @param force    passed through to {@code markAndGetBrokerAuthStatus}
     * @return the populated authorized info
     */
    private ClientBroker.AuthorizedInfo genBrokerAuthenticInfo(int brokerId, boolean force) {
        ClientBroker.AuthorizedInfo.Builder authorized = ClientBroker.AuthorizedInfo.newBuilder()
                .setVisitAuthorizedToken(this.visitToken.get());
        if (this.consumerConfig.isEnableUserAuthentic()) {
            if (this.rmtDataCache.markAndGetBrokerAuthStatus(brokerId, force)) {
                authorized.setAuthAuthorizedToken(this.authenticateHandler.genBrokerAuthenticateToken(this.consumerConfig.getUsrName(), this.consumerConfig.getUsrPassWord()));
            }
        }
        return authorized.build();
    }

    /**
     * Hands the authorized info of a heartbeat response, when present, to
     * {@link #processAuthorizedToken}.
     *
     * @param response heartbeat response received from the master
     */
    private void processHeartBeatAuthorizedToken(ClientMaster.HeartResponseM2C response) {
        if (!response.hasAuthorizedInfo()) {
            return;
        }
        this.processAuthorizedToken(response.getAuthorizedInfo());
    }

    /**
     * Stores the tokens delivered by the master.
     * <p>
     * The visit token is always overwritten. The authorized token is replaced only when the
     * message carries one that is not blank and differs from the value currently held.
     *
     * @param inAuthorizedTokenInfo authorized info from a master response; ignored when null
     */
    private void processAuthorizedToken(ClientMaster.MasterAuthorizedInfo inAuthorizedTokenInfo) {
        if (inAuthorizedTokenInfo == null) {
            return;
        }
        this.visitToken.set(inAuthorizedTokenInfo.getVisitAuthorizedToken());
        if (!inAuthorizedTokenInfo.hasAuthAuthorizedToken()) {
            return;
        }
        String receivedToken = inAuthorizedTokenInfo.getAuthAuthorizedToken();
        if (!TStringUtils.isNotBlank((String)receivedToken)) {
            return;
        }
        if (!receivedToken.equals(this.authAuthorizedTokenRef.get())) {
            this.authAuthorizedTokenRef.set(receivedToken);
        }
    }

    /**
     * Maps the configured consume position to the read status value put into broker requests.
     *
     * @param isFistReg whether this is the first registration; when false the result is always 0
     * @return 1 for {@code CONSUMER_FROM_LATEST_OFFSET} and 2 for
     *         {@code CONSUMER_FROM_MAX_OFFSET_ALWAYS} on a first registration, otherwise 0
     */
    private int getGroupInitReadStatus(boolean isFistReg) {
        switch (this.consumerConfig.getConsumePosition()) {
            case CONSUMER_FROM_LATEST_OFFSET: {
                if (isFistReg) {
                    logger.info("[Consume From Max Offset]" + this.consumerId);
                    return 1;
                }
                return 0;
            }
            case CONSUMER_FROM_MAX_OFFSET_ALWAYS: {
                if (isFistReg) {
                    logger.info("[Consume From Max Offset Always]" + this.consumerId);
                    return 2;
                }
                return 0;
            }
            default: {
                return 0;
            }
        }
    }

    /**
     * Pulls one batch of messages from the broker that owns the selected partition and records
     * the outcome in a {@link FetchContext}.
     * <p>
     * Outcomes handled here:
     * <ul>
     * <li>The RPC throws: failure 599 is booked in the stats, the request is released through
     *     {@code errReqRelease}, and the context fails with code 400.</li>
     * <li>The RPC returns null: failure 500 is booked, the request is released, and the context
     *     fails with code 500.</li>
     * <li>Error code 200: messages are converted, optionally filtered by the topic's filter
     *     conditions, the partition context is updated and the context is marked successful.</li>
     * <li>Error codes 411, 412, 415: the partition is removed from this consumer.</li>
     * <li>Error code 452 and all other codes: the partition is released through
     *     {@code errRspRelease} with a code-specific wait period.</li>
     * <li>Any exception while processing the response: failure 500 is booked, the partition is
     *     released through {@code succRspRelease}, and the context fails with code 500.</li>
     * </ul>
     *
     * @param partSelectResult the partition selection this fetch is performed for
     * @param strBuffer        scratch builder for composing messages; every use in this method
     *                         empties it again afterwards
     * @return the fetch context carrying either the fetched messages or the failure code/message
     */
    protected FetchContext fetchMessage(PartitionSelectResult partSelectResult, StringBuilder strBuffer) {
        FetchContext taskContext = new FetchContext(partSelectResult);
        Partition partition = taskContext.getPartition();
        String topic = partition.getTopic();
        String partitionKey = partition.getPartitionKey();
        long startTime = System.currentTimeMillis();
        ClientBroker.GetMessageResponseB2C msgRspB2C = null;
        try {
            msgRspB2C = this.getBrokerService(partition.getBroker()).getMessagesC2B(this.createBrokerGetMessageRequest(partition, taskContext.isLastConsumed()), AddressUtils.getLocalAddress(), this.consumerConfig.isTlsEnable());
        }
        catch (Throwable ee) {
            // The request itself failed: note that the stats book 599 while the context reports 400.
            this.clientStatsInfo.bookFailRpcCall(599);
            this.rmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
            taskContext.setFailProcessResult(400, strBuffer.append("Get message error, reason is ").append(ee.toString()).toString());
            strBuffer.delete(0, strBuffer.length());
            return taskContext;
        }
        // Elapsed time of the RPC; only reported to the stats on the success path.
        long dltTime = System.currentTimeMillis() - startTime;
        if (msgRspB2C == null) {
            this.clientStatsInfo.bookFailRpcCall(500);
            this.rmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
            taskContext.setFailProcessResult(500, "Get message null");
            return taskContext;
        }
        try {
            switch (msgRspB2C.getErrCode()) {
                case 200: {
                    int msgSize = 0;
                    int msgCount = 0;
                    List tmpMessageList = DataConverterUtil.convertMessage((String)topic, (List)msgRspB2C.getMessagesList());
                    boolean isEscLimit = msgRspB2C.hasEscFlowCtrl() && msgRspB2C.getEscFlowCtrl();
                    // Client-side filtering is applied only when the topic has a non-empty
                    // filter condition set; topicFilterSet is non-null whenever needFilter is true.
                    boolean needFilter = false;
                    Set<String> topicFilterSet = null;
                    TopicProcessor topicProcessor = this.consumeSubInfo.getTopicProcessor(topic);
                    if (topicProcessor != null && (topicFilterSet = topicProcessor.getFilterConds()) != null && !topicFilterSet.isEmpty()) {
                        needFilter = true;
                    }
                    ArrayList<Message> messageList = new ArrayList<Message>();
                    for (Message message : tmpMessageList) {
                        // Drop null messages and, when filtering, messages whose type is blank
                        // or not among the topic's filter conditions.
                        if (message == null || needFilter && (TStringUtils.isBlank((String)message.getMsgType()) || !topicFilterSet.contains(message.getMsgType()))) continue;
                        ++msgCount;
                        messageList.add(message);
                        msgSize += message.getData().length;
                    }
                    // Optional response fields fall back to -1 (data delta) or -2 (offsets) when absent.
                    long dataDltVal = msgRspB2C.hasCurrDataDlt() ? msgRspB2C.getCurrDataDlt() : -1L;
                    long currOffset = msgRspB2C.hasCurrOffset() ? msgRspB2C.getCurrOffset() : -2L;
                    long maxOffset = msgRspB2C.hasMaxOffset() ? msgRspB2C.getMaxOffset() : -2L;
                    boolean isRequireSlow = msgRspB2C.hasRequireSlow() && msgRspB2C.getRequireSlow();
                    this.rmtDataCache.setPartitionContextInfo(partitionKey, currOffset, 1, msgRspB2C.getErrCode(), isEscLimit, msgSize, 0L, dataDltVal, isRequireSlow, maxOffset);
                    // The string handed to the success result is "<partitionKey>:<usedToken>".
                    taskContext.setSuccessProcessResult(currOffset, strBuffer.append(partitionKey).append(":").append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
                    strBuffer.delete(0, strBuffer.length());
                    this.clientStatsInfo.bookSuccGetMsg(dltTime, topic, partitionKey, msgCount, msgSize);
                    break;
                }
                // For these codes the partition is dropped from this consumer instead of being
                // released for another fetch. 415 is logged elsewhere in this file as a
                // certificate failure; NOTE(review): confirm the meaning of 411 and 412.
                case 411: 
                case 412: 
                case 415: {
                    this.removePartition(partition);
                    taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
                    break;
                }
                case 452: {
                    // Use the wait time supplied by the broker when present, otherwise the
                    // configured message-not-found wait period.
                    long defDltTime = msgRspB2C.hasMinLimitTime() ? (long)msgRspB2C.getMinLimitTime() : this.consumerConfig.getMsgNotFoundWaitPeriodMs();
                    this.rmtDataCache.errRspRelease(partitionKey, topic, taskContext.getUsedToken(), false, -2L, 0, msgRspB2C.getErrCode(), false, 0, defDltTime, this.isFilterConsume(topic), -2L, -2L);
                    taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
                    break;
                }
                default: {
                    // Wait period passed to errRspRelease: 300 unless one of the codes below overrides it.
                    long limitDlt = 300L;
                    switch (msgRspB2C.getErrCode()) {
                        case 403: {
                            limitDlt = 2000L;
                            break;
                        }
                        case 503: {
                            limitDlt = 300L;
                            break;
                        }
                        case 301: {
                            limitDlt = 200L;
                            break;
                        }
                        case 404: {
                            limitDlt = this.consumerConfig.getMsgNotFoundWaitPeriodMs();
                            break;
                        }
                    }
                    this.rmtDataCache.errRspRelease(partitionKey, topic, taskContext.getUsedToken(), false, -2L, 0, msgRspB2C.getErrCode(), false, 0, limitDlt, this.isFilterConsume(topic), -1L, -2L);
                    taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
                    break;
                }
            }
            // Every non-200 response is counted as a failed RPC under its own error code.
            if (msgRspB2C.getErrCode() != 200) {
                this.clientStatsInfo.bookFailRpcCall(msgRspB2C.getErrCode());
            }
            return taskContext;
        }
        catch (Throwable ee) {
            // Processing the response failed part-way through; release the partition and
            // report the failure as code 500.
            this.clientStatsInfo.bookFailRpcCall(500);
            logger.error("Process response code error", ee);
            this.rmtDataCache.succRspRelease(partitionKey, topic, taskContext.getUsedToken(), false, this.isFilterConsume(topic), -2L, -2L);
            taskContext.setFailProcessResult(500, strBuffer.append("Get message failed,topic=").append(topic).append(",partition=").append(partition).append(", throw info is ").append(ee.toString()).toString());
            strBuffer.delete(0, strBuffer.length());
            return taskContext;
        }
    }

    /**
     * Verifies that this consumer has not been shut down.
     *
     * @throws TubeClientException when {@code isShutdown()} reports true
     */
    protected void checkClientRunning() throws TubeClientException {
        if (!this.isShutdown()) {
            return;
        }
        throw new TubeClientException("Status error: consumer has been shutdown");
    }

    /**
     * Forwards the "all message listeners stopped" notification to the subscription info
     * held by this consumer.
     */
    public void notifyAllMessageListenerStopped() {
        this.consumeSubInfo.notifyAllMessageListenerStopped();
    }

    /**
     * Sends a commit request for {@code partition} to its broker.
     *
     * @param partition partition whose last request should be committed
     * @return false only when the broker answered and reported success; true when the response
     *         was null, unsuccessful, or the call threw
     */
    protected boolean flushLastRequest(Partition partition) {
        try {
            ClientBroker.CommitOffsetResponseB2C commitResponse = this.getBrokerService(partition.getBroker()).consumerCommitC2B(this.createBrokerCommitRequest(partition, true), AddressUtils.getLocalAddress(), this.consumerConfig.isTlsEnable());
            return commitResponse == null || !commitResponse.getSuccess();
        } catch (Throwable e) {
            logger.error("flushLastRequest, commit " + partition.getTopic() + "#" + partition.getPartitionId() + " offset failed.", e);
            return true;
        }
    }

    /**
     * Reports whether the subscribe status counter is currently greater than zero.
     *
     * @return true when {@code subStatus} holds a positive value
     */
    protected boolean isSubscribed() {
        return this.subStatus.get() > 0;
    }

    /**
     * Looks up the read-service handle for a broker through the RPC service factory, using
     * this consumer's RPC configuration.
     *
     * @param brokerInfo the broker to talk to
     * @return the {@link BrokerReadService} obtained from the factory for that broker
     */
    protected BrokerReadService getBrokerService(BrokerInfo brokerInfo) {
        Object service = this.rpcServiceFactory.getService(BrokerReadService.class, brokerInfo, this.rpcConfig);
        return (BrokerReadService)service;
    }

    /**
     * Task that performs one heartbeat exchange with the master each time it is run.
     * <p>
     * Each run resumes timed-out partitions, prints stats, reports the next finished rebalance
     * result (if any) together with the heartbeat, and processes the master's answer:
     * flow-control and authorization updates, plus any new rebalance event, which is queued
     * into {@code rebalanceEvents}. When the master answers with error code 411 the consumer
     * re-registers with the master instead.
     */
    private class HeartTask2MasterWorker
    implements Runnable {
        private HeartTask2MasterWorker() {
        }

        /**
         * Runs a single heartbeat round. Never throws: every failure is logged and, where
         * applicable, counted in {@code heartbeatRetryTimes}.
         */
        @Override
        public void run() {
            StringBuilder strBuffer = new StringBuilder(256);
            try {
                BaseMessageConsumer.this.rmtDataCache.resumeTimeoutConsumePartitions(BaseMessageConsumer.this.isPullConsume, BaseMessageConsumer.this.consumerConfig.getPullProtectConfirmTimeoutMs());
                BaseMessageConsumer.this.clientStatsInfo.selfPrintStatsInfo(false, true, strBuffer);
                ConsumerEvent event = (ConsumerEvent)BaseMessageConsumer.this.rebalanceResults.poll();
                List<SubscribeInfo> subInfoList = null;
                boolean reportSubscribeInfo = false;
                // Subscribe info is attached when an event result is being reported, or once
                // every getMaxSubInfoReportIntvlTimes() heartbeats.
                if (event != null || ++BaseMessageConsumer.this.reportIntervalTimes >= BaseMessageConsumer.this.consumerConfig.getMaxSubInfoReportIntvlTimes()) {
                    subInfoList = BaseMessageConsumer.this.rmtDataCache.getSubscribeInfoList(BaseMessageConsumer.this.consumerId, BaseMessageConsumer.this.consumerConfig.getConsumerGroup());
                    reportSubscribeInfo = true;
                    BaseMessageConsumer.this.reportIntervalTimes = 0;
                }
                ClientMaster.HeartResponseM2C response = BaseMessageConsumer.this.masterService.consumerHeartbeatC2M(BaseMessageConsumer.this.createMasterHeartbeatRequest(event, subInfoList, reportSubscribeInfo), AddressUtils.getLocalAddress(), BaseMessageConsumer.this.consumerConfig.isTlsEnable());
                if (response == null) {
                    BaseMessageConsumer.this.clientStatsInfo.bookHB2MasterTimeout();
                    logger.error(strBuffer.append("[Heartbeat Failed] ").append("return result is null!").toString());
                    BaseMessageConsumer.this.heartbeatRetryTimes++;
                    return;
                }
                if (!response.getSuccess()) {
                    if (response.getErrCode() == 411) {
                        // The master asks for a fresh registration; do it right away.
                        BaseMessageConsumer.this.clientStatsInfo.bookHB2MasterTimeout();
                        this.reRegister2Master(strBuffer);
                        return;
                    }
                    BaseMessageConsumer.this.clientStatsInfo.bookHB2MasterException();
                    logger.error(strBuffer.append("[Heartbeat Failed] ").append(response.getErrMsg()).toString());
                    // Reset the scratch buffer so later messages do not inherit this text.
                    strBuffer.delete(0, strBuffer.length());
                    if (response.getErrCode() == 415) {
                        this.adjustHeartBeatPeriod("certificate failure", strBuffer);
                    } else {
                        BaseMessageConsumer.this.heartbeatRetryTimes++;
                    }
                    return;
                }
                BaseMessageConsumer.this.heartbeatRetryTimes = 0;
                BaseMessageConsumer.this.procHeartBeatRspAllocAndFlowRules(response, strBuffer);
                BaseMessageConsumer.this.processHeartBeatAuthorizedToken(response);
                if (response.hasRequireAuth()) {
                    BaseMessageConsumer.this.rmtDataCache.storeMasterAuthRequire(response.getRequireAuth());
                }
                ClientMaster.EventProto eventProto = response.getEvent();
                if (eventProto != null && eventProto.getRebalanceId() > 0L) {
                    // A positive rebalance id marks a new event to be processed.
                    ConsumerEvent newEvent = new ConsumerEvent(eventProto.getRebalanceId(), EventType.valueOf((int)eventProto.getOpType()), DataConverterUtil.convertSubInfo((List)eventProto.getSubscribeInfoList()), EventStatus.TODO);
                    BaseMessageConsumer.this.rebalanceEvents.put(newEvent);
                    if (logger.isDebugEnabled()) {
                        strBuffer.append("[Receive Consumer Event]");
                        logger.debug(newEvent.toStrBuilder(BaseMessageConsumer.this.consumerId, strBuffer).toString());
                        strBuffer.delete(0, strBuffer.length());
                    }
                }
                long currentTime = System.currentTimeMillis();
                if (currentTime - BaseMessageConsumer.this.lastHeartbeatTime2Master > BaseMessageConsumer.this.consumerConfig.getHeartbeatPeriodMs() * 2L) {
                    logger.warn(strBuffer.append(BaseMessageConsumer.this.consumerId).append(" heartbeat interval to master is too long,please check! Total time : ").append(currentTime - BaseMessageConsumer.this.lastHeartbeatTime2Master).toString());
                    strBuffer.delete(0, strBuffer.length());
                }
                BaseMessageConsumer.this.lastHeartbeatTime2Master = currentTime;
            }
            catch (InterruptedException ee) {
                logger.info("To Master Heartbeat thread is interrupted,existed!");
                // Restore the interrupt flag so the owner of this thread can observe it.
                Thread.currentThread().interrupt();
            }
            catch (Throwable e) {
                if (!BaseMessageConsumer.this.isShutdown()) {
                    logger.error("Heartbeat failed,retry later.", e);
                }
                this.adjustHeartBeatPeriod("heartbeat exception", strBuffer);
            }
        }

        /**
         * Re-registers this consumer with the master and logs the outcome. On success the
         * allocation flag, flow-control rules and authorized token from the register response
         * are applied. Failures are logged using the register response's own error code and
         * message; exceptions are logged with their stack trace, followed by a one second pause.
         *
         * @param strBuffer scratch builder; left empty when this method returns
         */
        private void reRegister2Master(StringBuilder strBuffer) {
            try {
                ClientMaster.RegisterResponseM2C regResponse = BaseMessageConsumer.this.masterService.consumerRegisterC2M(BaseMessageConsumer.this.createMasterRegisterRequest(), AddressUtils.getLocalAddress(), BaseMessageConsumer.this.consumerConfig.isTlsEnable());
                if (regResponse == null) {
                    logger.error(strBuffer.append("[Re-Register Failed] ").append(BaseMessageConsumer.this.consumerId).append(" register to master return null!").toString());
                } else if (!regResponse.getSuccess()) {
                    // Inspect the register response here, not the heartbeat response that
                    // triggered the re-registration.
                    if (regResponse.getErrCode() == 450) {
                        logger.error(strBuffer.append("[Re-Register Failed] ").append(BaseMessageConsumer.this.consumerId).append(" ConsumeGroup forbidden, ").append(regResponse.getErrMsg()).toString());
                    } else {
                        logger.error(strBuffer.append("[Re-Register Failed] ").append(BaseMessageConsumer.this.consumerId).append(" ").append(regResponse.getErrMsg()).toString());
                    }
                } else {
                    BaseMessageConsumer.this.processRegisterAllocAndRspFlowRules(regResponse, strBuffer);
                    BaseMessageConsumer.this.processRegAuthorizedToken(regResponse);
                    logger.info(strBuffer.append("[Re-register] ").append(BaseMessageConsumer.this.consumerId).toString());
                }
                strBuffer.delete(0, strBuffer.length());
            }
            catch (Throwable e) {
                strBuffer.delete(0, strBuffer.length());
                // Pass the exception itself so its type, message and stack trace are logged.
                logger.error(strBuffer.append("Register to master failed.").toString(), e);
                strBuffer.delete(0, strBuffer.length());
                ThreadUtils.sleep((long)1000L);
            }
        }

        /**
         * Records a failed heartbeat and, once the retry count exceeds the configured maximum
         * while the consumer is still running, sleeps for the configured after-failure period.
         *
         * @param reason   short description of the failure, used in the log message
         * @param sBuilder scratch builder used to compose the log message
         */
        private void adjustHeartBeatPeriod(String reason, StringBuilder sBuilder) {
            BaseMessageConsumer.this.lastHeartbeatTime2Master = System.currentTimeMillis();
            BaseMessageConsumer.this.heartbeatRetryTimes++;
            if (!BaseMessageConsumer.this.isShutdown() && BaseMessageConsumer.this.heartbeatRetryTimes > BaseMessageConsumer.this.consumerConfig.getMaxHeartBeatRetryTimes()) {
                // Callers may arrive here from an exception path with leftover text in the buffer.
                sBuilder.delete(0, sBuilder.length());
                logger.warn(sBuilder.append("Adjust HeartbeatPeriod for ").append(reason).append(", sleep ").append(BaseMessageConsumer.this.consumerConfig.getHeartbeatPeriodAfterFail()).append(" Ms").toString());
                sBuilder.delete(0, sBuilder.length());
                ThreadUtils.sleep((long)BaseMessageConsumer.this.consumerConfig.getHeartbeatPeriodAfterFail());
            }
        }
    }

    private class HeartTask2BrokerWorker
    implements Runnable {
        private HeartTask2BrokerWorker() {
        }

        @Override
        public void run() {
            StringBuilder strBuffer = new StringBuilder(256);
            while (!BaseMessageConsumer.this.isShutdown()) {
                try {
                    long currentTime = System.currentTimeMillis();
                    if (currentTime - BaseMessageConsumer.this.lastHeartbeatTime2Broker > BaseMessageConsumer.this.consumerConfig.getHeartbeatPeriodMs() * 2L) {
                        logger.warn(strBuffer.append(BaseMessageConsumer.this.consumerId).append(" heartbeat interval to broker is too long,please check! Total time : ").append(currentTime - BaseMessageConsumer.this.lastHeartbeatTime2Broker).toString());
                        strBuffer.delete(0, strBuffer.length());
                    }
                    for (BrokerInfo brokerInfo : BaseMessageConsumer.this.rmtDataCache.getAllRegisterBrokers()) {
                        ArrayList<String> partStrSet = new ArrayList<String>();
                        try {
                            List<Partition> partitions = BaseMessageConsumer.this.rmtDataCache.getBrokerPartitionList(brokerInfo);
                            if (partitions == null || partitions.isEmpty()) continue;
                            for (Partition partition : partitions) {
                                partStrSet.add(partition.toString());
                            }
                            ClientBroker.HeartBeatResponseB2C heartBeatResponseV2 = BaseMessageConsumer.this.getBrokerService(brokerInfo).consumerHeartbeatC2B(BaseMessageConsumer.this.createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet), AddressUtils.getLocalAddress(), BaseMessageConsumer.this.consumerConfig.isTlsEnable());
                            if (heartBeatResponseV2 == null) {
                                BaseMessageConsumer.this.clientStatsInfo.bookHB2BrokerTimeout();
                                continue;
                            }
                            if (heartBeatResponseV2.getSuccess()) {
                                BaseMessageConsumer.this.rmtDataCache.bookBrokerRequireAuthInfo(brokerInfo.getBrokerId(), heartBeatResponseV2);
                                if (!heartBeatResponseV2.getHasPartFailure()) continue;
                                try {
                                    ProtocolStringList strFailInfoList = heartBeatResponseV2.getFailureInfoList();
                                    for (String strFailInfo : strFailInfoList) {
                                        int index = strFailInfo.indexOf(":");
                                        if (index < 0) {
                                            logger.error(strBuffer.append("Parse Heartbeat response error : ").append("invalid response, ").append(strFailInfo).toString());
                                            strBuffer.delete(0, strBuffer.length());
                                            continue;
                                        }
                                        int errorCode = Integer.parseInt(strFailInfo.substring(0, index));
                                        Partition failPartition = new Partition(strFailInfo.substring(index + 1));
                                        BaseMessageConsumer.this.removePartition(failPartition);
                                        logger.warn(strBuffer.append("[heart2broker error] partition:").append(failPartition.toString()).append(", errorCode=").append(errorCode).toString());
                                        strBuffer.delete(0, strBuffer.length());
                                    }
                                    continue;
                                }
                                catch (Throwable ee) {
                                    if (BaseMessageConsumer.this.isShutdown()) continue;
                                    strBuffer.delete(0, strBuffer.length());
                                    logger.error(strBuffer.append("Parse Heartbeat response error :").append(ee.getMessage()).toString());
                                    strBuffer.delete(0, strBuffer.length());
                                    continue;
                                }
                            }
                            BaseMessageConsumer.this.clientStatsInfo.bookHB2BrokerException();
                            if (heartBeatResponseV2.getErrCode() != 415) continue;
                            for (Partition partition : partitions) {
                                BaseMessageConsumer.this.removePartition(partition);
                            }
                            logger.warn(strBuffer.append("[heart2broker error] certificate failure, ").append(brokerInfo.getBrokerStrInfo()).append("'s partitions area released, ").append(heartBeatResponseV2.getErrMsg()).toString());
                            strBuffer.delete(0, strBuffer.length());
                        }
                        catch (Throwable ee) {
                            if (BaseMessageConsumer.this.isShutdown()) continue;
                            BaseMessageConsumer.this.clientStatsInfo.bookHB2BrokerException();
                            BaseMessageConsumer.this.samplePrintCtrl.printExceptionCaught(ee);
                            if (partStrSet.isEmpty()) continue;
                            strBuffer.delete(0, strBuffer.length());
                            for (String partitionStr : partStrSet) {
                                Partition tmpPartition = new Partition(partitionStr);
                                BaseMessageConsumer.this.removePartition(tmpPartition);
                                logger.warn(strBuffer.append("[heart2broker Throwable] release partition:").append(partitionStr).toString());
                                strBuffer.delete(0, strBuffer.length());
                            }
                        }
                    }
                    BaseMessageConsumer.this.lastHeartbeatTime2Broker = System.currentTimeMillis();
                    Thread.sleep(BaseMessageConsumer.this.consumerConfig.getHeartbeatPeriodMs());
                }
                catch (Throwable e) {
                    BaseMessageConsumer.this.clientStatsInfo.bookHB2BrokerException();
                    BaseMessageConsumer.this.lastHeartbeatTime2Broker = System.currentTimeMillis();
                    if (BaseMessageConsumer.this.isShutdown()) continue;
                    logger.error("heartbeat thread error 3 : ", e);
                }
            }
        }
    }
}

