/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.mq.rocketmq.rocketmq.core;

import com.google.common.collect.Lists;
import com.taotao.cloud.common.utils.log.LogUtils;
import com.taotao.cloud.mq.common.Message;
import com.taotao.cloud.mq.common.MessageQueueConsumer;
import com.taotao.cloud.mq.common.MessageQueueListener;
import com.taotao.cloud.mq.common.MessageQueueProperties;
import com.taotao.cloud.mq.common.consumer.MessageQueueConsumerException;
import com.taotao.cloud.mq.rocketmq.rocketmq.env.FixedRocketMQConsumerProperties;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

public class RocketMQConsumer
implements InitializingBean,
DisposableBean {
    private static final String INITIALIZING_ROCKETMQ_CONSUMER = "Initializing RocketMQConsumer";
    private static final String DESTROY_ROCKETMQ_CONSUMER = "Destroy RocketMQConsumer";
    private static final String ROCKETMQ_CONSUMER_CONSUME_ERROR = "RocketMQConsumer consume error: {}";
    private static final String CREATE_DEFAULT_MQPUSH_CONSUMER_GROUP_NAMESPACE_TOPIC = "Create DefaultMQPushConsumer, group: {}, namespace: {}, topic: {}";
    private final List<DefaultMQPushConsumer> consumers = Lists.newArrayList();
    private final MessageQueueProperties messageQueueProperties;
    private final RocketMQProperties rocketMQProperties;
    private final FixedRocketMQConsumerProperties fixedRocketMQConsumerProperties;
    private final List<MessageQueueConsumer> messageQueueConsumers;

    public RocketMQConsumer(MessageQueueProperties messageQueueProperties, RocketMQProperties rocketMQProperties, FixedRocketMQConsumerProperties fixedRocketMQConsumerProperties, List<MessageQueueConsumer> messageQueueConsumers) {
        this.messageQueueProperties = messageQueueProperties;
        this.rocketMQProperties = rocketMQProperties;
        this.fixedRocketMQConsumerProperties = fixedRocketMQConsumerProperties;
        this.messageQueueConsumers = messageQueueConsumers;
    }

    public void afterPropertiesSet() throws Exception {
        LogUtils.debug((String)INITIALIZING_ROCKETMQ_CONSUMER, (Object[])new Object[0]);
        if (CollectionUtils.isEmpty(this.messageQueueConsumers)) {
            return;
        }
        try {
            for (MessageQueueConsumer messageQueueConsumer : this.messageQueueConsumers) {
                DefaultMQPushConsumer consumer = this.createConsumer(messageQueueConsumer);
                if (consumer == null) continue;
                this.consumers.add(consumer);
                consumer.registerMessageListener((messageExts, context) -> {
                    AtomicReference<ConsumeConcurrentlyStatus> status = new AtomicReference<ConsumeConcurrentlyStatus>(ConsumeConcurrentlyStatus.RECONSUME_LATER);
                    ArrayList messages = Lists.newArrayListWithCapacity((int)messageExts.size());
                    messageExts.forEach(messageExt -> {
                        Message message = new Message();
                        message.setTopic(messageExt.getTopic());
                        message.setPartition(Integer.valueOf(messageExt.getQueueId()));
                        message.setKey(messageExt.getKeys());
                        message.setTags(messageExt.getTags());
                        message.setDelayTimeLevel(Integer.valueOf(messageExt.getDelayTimeLevel()));
                        message.setBody(new String(messageExt.getBody()));
                        messages.add(message);
                    });
                    messageQueueConsumer.consume((List)messages, () -> {
                        if (status.get() != ConsumeConcurrentlyStatus.CONSUME_SUCCESS) {
                            status.set(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
                        }
                    });
                    return status.get();
                });
                consumer.start();
            }
        }
        catch (MQClientException e) {
            LogUtils.error((String)ROCKETMQ_CONSUMER_CONSUME_ERROR, (Object[])new Object[]{e.getMessage(), e});
            throw new MessageQueueConsumerException(e.getMessage());
        }
    }

    public void destroy() {
        LogUtils.debug((String)DESTROY_ROCKETMQ_CONSUMER, (Object[])new Object[0]);
        this.consumers.forEach(DefaultMQPushConsumer::shutdown);
    }

    private DefaultMQPushConsumer createConsumer(MessageQueueConsumer messageQueueConsumer) throws MQClientException {
        Class<?> clazz = messageQueueConsumer.getClass();
        MessageQueueListener annotation = clazz.getAnnotation(MessageQueueListener.class);
        if (StringUtils.isNotBlank((CharSequence)annotation.type()) && !this.messageQueueProperties.getType().equalsIgnoreCase(annotation.type())) {
            return null;
        }
        if (StringUtils.isBlank((CharSequence)annotation.type()) && !"ROCKETMQ".equalsIgnoreCase(this.messageQueueProperties.getType())) {
            return null;
        }
        String namespace = null;
        if (StringUtils.isNotBlank((CharSequence)this.fixedRocketMQConsumerProperties.getNamespace())) {
            namespace = this.fixedRocketMQConsumerProperties.getNamespace();
        }
        String topic = null;
        if (StringUtils.isNotBlank((CharSequence)annotation.topic())) {
            topic = annotation.topic();
        } else if (StringUtils.isNotBlank((CharSequence)this.fixedRocketMQConsumerProperties.getTopic())) {
            topic = this.fixedRocketMQConsumerProperties.getTopic();
        }
        Object group = null;
        if (StringUtils.isNotBlank((CharSequence)annotation.group())) {
            group = annotation.group();
        } else if (StringUtils.isNotBlank((CharSequence)this.fixedRocketMQConsumerProperties.getGroup())) {
            group = this.fixedRocketMQConsumerProperties.getGroup() + "_" + topic;
        }
        String selectorExpression = null;
        if (StringUtils.isNotBlank((CharSequence)annotation.selectorExpression())) {
            selectorExpression = annotation.selectorExpression();
        } else if (StringUtils.isNotBlank((CharSequence)this.fixedRocketMQConsumerProperties.getSelectorExpression())) {
            selectorExpression = this.fixedRocketMQConsumerProperties.getSelectorExpression();
        }
        AclClientRPCHook rpcHook = null;
        if (StringUtils.isNotBlank((CharSequence)this.fixedRocketMQConsumerProperties.getAccessKey()) && StringUtils.isNotBlank((CharSequence)this.fixedRocketMQConsumerProperties.getSecretKey())) {
            rpcHook = new AclClientRPCHook(new SessionCredentials(this.fixedRocketMQConsumerProperties.getAccessKey(), this.fixedRocketMQConsumerProperties.getSecretKey()));
        }
        int pullBatchSize = 32;
        if (annotation.pullBatchSize() > 0) {
            pullBatchSize = annotation.pullBatchSize();
        } else if (this.fixedRocketMQConsumerProperties.getPullBatchSize() > 0) {
            pullBatchSize = this.fixedRocketMQConsumerProperties.getPullBatchSize();
        }
        int consumeMessageBatchMaxSize = 1;
        if (annotation.consumeMessageBatchMaxSize() > 0) {
            consumeMessageBatchMaxSize = annotation.consumeMessageBatchMaxSize();
        }
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(namespace, (String)group, (RPCHook)rpcHook);
        consumer.setNamesrvAddr(this.rocketMQProperties.getNameServer());
        consumer.subscribe(topic, selectorExpression);
        consumer.setPullBatchSize(pullBatchSize);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        consumer.setMessageModel(MessageModel.valueOf((String)this.fixedRocketMQConsumerProperties.getMessageModel()));
        LogUtils.debug((String)CREATE_DEFAULT_MQPUSH_CONSUMER_GROUP_NAMESPACE_TOPIC, (Object[])new Object[]{group, namespace, topic});
        return consumer;
    }
}

