/*
 * Decompiled with CFR 0.152.
 */
package io.github.rhwayfun.springboot.rocketmq.starter.common;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import io.github.rhwayfun.springboot.rocketmq.starter.constants.ConsumeMode;
import io.github.rhwayfun.springboot.rocketmq.starter.constants.RocketMqContent;
import io.github.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTopic;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

public abstract class AbstractRocketMqConsumer<Topic extends RocketMqTopic, Content extends RocketMqContent> {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    protected Class<Topic> topicClazz;
    protected Class<Content> contentClazz;
    private boolean isStarted;
    private Integer consumeThreadMin;
    private Integer consumeThreadMax;
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    private int delayLevelWhenNextConsume = 0;
    private long suspendCurrentQueueTimeMillis = -1L;
    private MessageModel messageModel = MessageModel.CLUSTERING;
    private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;
    private DefaultMQPushConsumer consumer;

    public abstract Map<String, Set<String>> subscribeTopicTags();

    public abstract String getConsumerGroup();

    public abstract boolean consumeMsg(Content var1, MessageExt var2);

    @PostConstruct
    public void init() throws MQClientException {
        Class<?> parentClazz = this.getClass();
        Type genType = parentClazz.getGenericSuperclass();
        Type[] types = ((ParameterizedType)genType).getActualTypeArguments();
        this.topicClazz = (Class)types[0];
        this.contentClazz = (Class)types[1];
        if (this.isStarted()) {
            throw new IllegalStateException("container already started. " + this.toString());
        }
        this.initRocketMQPushConsumer();
    }

    @PreDestroy
    public void destroy() throws Exception {
        if (Objects.nonNull(this.consumer)) {
            this.consumer.shutdown();
        }
        this.logger.info("consumer shutdown, {}", (Object)this.toString());
    }

    private void initRocketMQPushConsumer() throws MQClientException {
        Assert.notNull((Object)this.getConsumerGroup(), (String)"Property 'consumerGroup' is required");
        Assert.notEmpty(this.subscribeTopicTags(), (String)"subscribeTopicTags method can't be empty");
        this.consumer = new DefaultMQPushConsumer(this.getConsumerGroup());
        if (this.consumeThreadMax != null) {
            this.consumer.setConsumeThreadMax(this.consumeThreadMax.intValue());
        }
        if (this.consumeThreadMax != null && this.consumeThreadMax < this.consumer.getConsumeThreadMin()) {
            this.consumer.setConsumeThreadMin(this.consumeThreadMax.intValue());
        }
        this.consumer.setConsumeFromWhere(this.consumeFromWhere);
        this.consumer.setMessageModel(this.messageModel);
        switch (this.consumeMode) {
            case Orderly: {
                this.consumer.setMessageListener((MessageListener)new DefaultMessageListenerOrderly());
                break;
            }
            case CONCURRENTLY: {
                this.consumer.setMessageListener((MessageListener)new DefaultMessageListenerConcurrently());
                break;
            }
            default: {
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
            }
        }
    }

    private <T> T parseMsg(byte[] body, Class<? extends RocketMqContent> clazz) {
        Object t = null;
        if (body != null) {
            try {
                t = JSON.parseObject((byte[])body, clazz, (Feature[])new Feature[0]);
            }
            catch (Exception e) {
                this.logger.error("can not parse to object", (Throwable)e);
            }
        }
        return (T)t;
    }

    public Integer getConsumeThreadMin() {
        return this.consumeThreadMin;
    }

    public void setConsumeThreadMin(Integer consumeThreadMin) {
        this.consumeThreadMin = consumeThreadMin;
    }

    public Integer getConsumeThreadMax() {
        return this.consumeThreadMax;
    }

    public void setConsumeThreadMax(Integer consumeThreadMax) {
        this.consumeThreadMax = consumeThreadMax;
    }

    public boolean isStarted() {
        return this.isStarted;
    }

    public void setStarted(boolean started) {
        this.isStarted = started;
    }

    public ConsumeFromWhere getConsumeFromWhere() {
        return this.consumeFromWhere;
    }

    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
        this.consumeFromWhere = consumeFromWhere;
    }

    public int getDelayLevelWhenNextConsume() {
        return this.delayLevelWhenNextConsume;
    }

    public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
        this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
    }

    public long getSuspendCurrentQueueTimeMillis() {
        return this.suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = messageModel;
    }

    public ConsumeMode getConsumeMode() {
        return this.consumeMode;
    }

    public void setConsumeMode(ConsumeMode consumeMode) {
        this.consumeMode = consumeMode;
    }

    public DefaultMQPushConsumer getConsumer() {
        return this.consumer;
    }

    public class DefaultMessageListenerOrderly
    implements MessageListenerOrderly {
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                try {
                    long now = System.currentTimeMillis();
                    AbstractRocketMqConsumer.this.consumeMsg((RocketMqContent)AbstractRocketMqConsumer.this.parseMsg(messageExt.getBody(), AbstractRocketMqConsumer.this.contentClazz), messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    AbstractRocketMqConsumer.this.logger.debug("consume {} cost: {} ms", (Object)messageExt.getMsgId(), (Object)costTime);
                }
                catch (Exception e) {
                    AbstractRocketMqConsumer.this.logger.warn("consume message failed. messageExt:{}", (Object)messageExt, (Object)e);
                    context.setSuspendCurrentQueueTimeMillis(AbstractRocketMqConsumer.this.suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    public class DefaultMessageListenerConcurrently
    implements MessageListenerConcurrently {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                try {
                    long now = System.currentTimeMillis();
                    AbstractRocketMqConsumer.this.consumeMsg((RocketMqContent)AbstractRocketMqConsumer.this.parseMsg(messageExt.getBody(), AbstractRocketMqConsumer.this.contentClazz), messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    AbstractRocketMqConsumer.this.logger.info("consume {} cost: {} ms", (Object)messageExt.getMsgId(), (Object)costTime);
                }
                catch (Exception e) {
                    AbstractRocketMqConsumer.this.logger.warn("consume message failed. messageExt:{}", (Object)messageExt, (Object)e);
                    context.setDelayLevelWhenNextConsume(AbstractRocketMqConsumer.this.delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}

