/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.rocketmq.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ClassUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;

/**
 * Inbound channel adapter that subscribes a RocketMQ push consumer to a destination and
 * forwards every received {@link MessageExt} to the output channel as a Spring
 * {@link Message} whose payload is the raw message body.
 *
 * <p>When a {@link RetryTemplate} is configured, each delivery is executed through it and the
 * message listener is registered on the template as a {@link RetryListener} so that
 * consumption metrics are recorded when the retry operation closes.
 */
public class RocketMQInboundChannelAdapter
extends MessageProducerSupport {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
    /** Separator used when the tag expression is handed to the consumer. */
    private static final String TAG_JOINER = " || ";
    /** Regular expression splitting the configured tag expression into single tags. */
    private static final String TAG_SPLIT_REGEX = "\\|\\|";

    private ConsumerInstrumentation consumerInstrumentation;
    private InstrumentationManager instrumentationManager;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<? extends Object> recoveryCallback;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
    private final String destination;
    private final String group;
    private final ConsumersManager consumersManager;

    /**
     * Creates the adapter.
     *
     * @param consumersManager       manager used to obtain, start and stop the push consumer
     * @param consumerProperties     binding properties; when {@code null} the adapter does nothing on start
     * @param destination            destination passed to {@code subscribe} and used as the instrumentation key
     * @param group                  consumer group used to look up, start and stop the consumer
     * @param instrumentationManager optional metrics/health registry; may be {@code null}
     */
    public RocketMQInboundChannelAdapter(ConsumersManager consumersManager, ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties, String destination, String group, InstrumentationManager instrumentationManager) {
        this.consumersManager = consumersManager;
        this.consumerProperties = consumerProperties;
        this.destination = destination;
        this.group = group;
        this.instrumentationManager = instrumentationManager;
    }

    /**
     * Subscribes the consumer (by SQL selector when one is configured, otherwise by tags),
     * registers the message listener and starts the consumer.
     *
     * @throws RuntimeException wrapping the {@link MQClientException} when subscribing or
     *                          starting the consumer fails
     */
    @Override
    protected void doStart() {
        if (this.consumerProperties == null || !this.consumerProperties.getExtension().getEnabled()) {
            return;
        }
        RocketMQConsumerProperties extension = this.consumerProperties.getExtension();
        String tags = extension.getTags();
        Boolean isOrderly = extension.getOrderly();
        DefaultMQPushConsumer consumer = this.consumersManager.getOrCreateConsumer(this.group, this.destination, this.consumerProperties);

        // Boolean.TRUE.equals avoids a NullPointerException from unboxing when the flag is unset.
        CloudStreamMessageListener listener = Boolean.TRUE.equals(isOrderly)
                ? new CloudStreamMessageListenerOrderly()
                : new CloudStreamMessageListenerConcurrently();
        if (this.retryTemplate != null) {
            this.retryTemplate.registerListener(listener);
        }

        String tagExpression = RocketMQInboundChannelAdapter.buildTagExpression(tags);

        if (this.instrumentationManager != null) {
            this.consumerInstrumentation = this.instrumentationManager.getConsumerInstrumentation(this.destination);
            this.instrumentationManager.addHealthInstrumentation(this.consumerInstrumentation);
        }

        try {
            String sql = extension.getSql();
            if (!StringUtils.isEmpty(sql)) {
                consumer.subscribe(this.destination, MessageSelector.bySql(sql));
            } else {
                consumer.subscribe(this.destination, tagExpression);
            }
            // NOTE(review): the instrumentation is marked as started once the subscription
            // succeeds, i.e. before startConsumer() below has run - confirm this is intended.
            if (this.consumerInstrumentation != null) {
                this.consumerInstrumentation.markStartedSuccessfully();
            }
        }
        catch (MQClientException e) {
            if (this.consumerInstrumentation != null) {
                this.consumerInstrumentation.markStartFailed(e);
            }
            logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e);
            throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
        }

        consumer.registerMessageListener(listener);
        try {
            this.consumersManager.startConsumer(this.group);
        }
        catch (MQClientException e) {
            logger.error("RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), e);
            throw new RuntimeException("RocketMQ Consumer startup failed.", e);
        }
    }

    /** Stops the consumer registered for this adapter's group. */
    @Override
    protected void doStop() {
        this.consumersManager.stopConsumer(this.group);
    }

    /**
     * Sets the template used to retry message delivery; when {@code null} messages are
     * delivered exactly once per consumer callback.
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /** Sets the callback invoked once the retry template has exhausted its attempts. */
    public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Normalises a {@code "a || b"} style tag list: every tag is trimmed, duplicates are
     * dropped and the remaining tags are joined with {@code " || "}.
     *
     * @param tags configured tag expression, may be {@code null}
     * @return the normalised expression, or an empty string when {@code tags} is {@code null}
     */
    private static String buildTagExpression(String tags) {
        if (tags == null) {
            return "";
        }
        return Arrays.stream(tags.split(TAG_SPLIT_REGEX))
                .map(String::trim)
                .distinct()
                .collect(Collectors.joining(TAG_JOINER));
    }

    /** Listener variant registered when orderly consumption is configured. */
    protected class CloudStreamMessageListenerOrderly
    extends CloudStreamMessageListener
    implements MessageListenerOrderly {
        protected CloudStreamMessageListenerOrderly() {
        }

        /**
         * Delivers the batch and copies the suspend time and status from the resulting
         * acknowledgement to the RocketMQ context / return value.
         */
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            Acknowledgement acknowledgement = this.consumeMessage(msgs);
            context.setSuspendCurrentQueueTimeMillis(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill().longValue());
            return acknowledgement.getConsumeOrderlyStatus();
        }
    }

    /** Listener variant registered when orderly consumption is not configured. */
    protected class CloudStreamMessageListenerConcurrently
    extends CloudStreamMessageListener
    implements MessageListenerConcurrently {
        protected CloudStreamMessageListenerConcurrently() {
        }

        /**
         * Delivers the batch and copies the delay level and status from the resulting
         * acknowledgement to the RocketMQ context / return value.
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            Acknowledgement acknowledgement = this.consumeMessage(msgs);
            context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel().intValue());
            return acknowledgement.getConsumeConcurrentlyStatus();
        }
    }

    /**
     * Shared delivery logic of both listener variants. Also acts as the {@link RetryListener}
     * that records consumption metrics when a retry operation closes.
     */
    protected class CloudStreamMessageListener
    implements MessageListener,
    RetryListener {
        protected CloudStreamMessageListener() {
        }

        /**
         * Sends the batch to the output channel, through the retry template when one is set.
         *
         * @param msgs messages handed over by the RocketMQ consumer
         * @return the acknowledgement attached to the first message, or the one produced by
         *         recovery when all retries were exhausted
         * @throws RuntimeException wrapping any exception raised while delivering
         */
        Acknowledgement consumeMessage(List<MessageExt> msgs) {
            boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
            try {
                if (enableRetry) {
                    // Success is not marked here: close() records the outcome for retried executions.
                    RetryCallback<Acknowledgement, Exception> attempt = context -> this.doSendMsgs(msgs, context);
                    RecoveryCallback<Acknowledgement> recovery = this::recoverAfterRetriesExhausted;
                    return RocketMQInboundChannelAdapter.this.retryTemplate.execute(attempt, recovery);
                }
                Acknowledgement result = this.doSendMsgs(msgs, null);
                this.markConsumed();
                return result;
            }
            catch (Exception e) {
                logger.error("RocketMQ Message hasn't been processed successfully. Caused by ", e);
                // NOTE(review): with retry enabled close() may already have recorded this
                // failure, so it could be counted twice - confirm against the metrics contract.
                this.markConsumedFailure();
                throw new RuntimeException("RocketMQ Message hasn't been processed successfully. Caused by ", e);
            }
        }

        /**
         * Runs the user supplied recovery callback and builds the acknowledgement matching
         * this listener's consumption mode.
         *
         * <p>The mode is derived from the listener instance itself. The previous anonymous
         * callback inspected its own class, which never implements
         * {@link MessageListenerConcurrently}, so the orderly acknowledgement was always returned.
         *
         * @throws IllegalStateException when no recovery callback has been configured; the
         *                               last delivery failure is attached as the cause
         * @throws Exception             whatever the recovery callback throws
         */
        private Acknowledgement recoverAfterRetriesExhausted(RetryContext context) throws Exception {
            RecoveryCallback<?> delegate = RocketMQInboundChannelAdapter.this.recoveryCallback;
            if (delegate == null) {
                throw new IllegalStateException("Retries exhausted but no RecoveryCallback is configured", context.getLastThrowable());
            }
            delegate.recover(context);
            if (this instanceof MessageListenerConcurrently) {
                return Acknowledgement.buildConcurrentlyInstance();
            }
            return Acknowledgement.buildOrderlyInstance();
        }

        /**
         * Converts every message to a Spring message carrying a fresh {@link Acknowledgement}
         * in its headers and sends it to the output channel.
         *
         * @param msgs    messages to deliver; must not be empty
         * @param context current retry context, or {@code null} when retry is disabled
         * @return the acknowledgement created for the first message of the batch
         * @throws IndexOutOfBoundsException when {@code msgs} is empty
         */
        private Acknowledgement doSendMsgs(List<MessageExt> msgs, RetryContext context) {
            List<Acknowledgement> acknowledgements = new ArrayList<>(msgs.size());
            String retryInfo = context == null ? "" : "retryCount-" + context.getRetryCount() + "|";
            for (MessageExt msg : msgs) {
                // Guarded so the message dump and body copy are only built when they are logged.
                if (logger.isDebugEnabled()) {
                    logger.debug(retryInfo + "consuming msg:\n" + msg);
                    logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
                }
                Acknowledgement acknowledgement = new Acknowledgement();
                Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody())
                        .setHeaders(new RocketMQMessageHeaderAccessor()
                                .withAcknowledgment(acknowledgement)
                                .withTags(msg.getTags())
                                .withKeys(msg.getKeys())
                                .withFlag(msg.getFlag())
                                .withRocketMessage(msg))
                        .build();
                acknowledgements.add(acknowledgement);
                RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
            }
            // Only the first message's acknowledgement is reported back to the consumer.
            return acknowledgements.get(0);
        }

        /** Always allows the retry operation to proceed. */
        @Override
        public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
            return true;
        }

        /** Records a consumption failure when {@code throwable} is set, a success otherwise. */
        @Override
        public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            if (throwable != null) {
                this.markConsumedFailure();
            } else {
                this.markConsumed();
            }
        }

        /** Intentionally empty: individual failed attempts are not recorded. */
        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        }

        /** Marks one successful consumption for this destination, if instrumentation is available. */
        private void markConsumed() {
            InstrumentationManager manager = RocketMQInboundChannelAdapter.this.instrumentationManager;
            if (manager != null) {
                manager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination).markConsumed();
            }
        }

        /** Marks one failed consumption for this destination, if instrumentation is available. */
        private void markConsumedFailure() {
            InstrumentationManager manager = RocketMQInboundChannelAdapter.this.instrumentationManager;
            if (manager != null) {
                manager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination).markConsumedFailure();
            }
        }
    }
}

