/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound;

import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.integration.outbound.RocketMQProduceFactory;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;

public class RocketMQProducerMessageHandler
extends AbstractMessageHandler
implements Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQProducerMessageHandler.class);
    private volatile boolean running = false;
    private volatile boolean isTrans = false;
    private ErrorMessageStrategy errorMessageStrategy;
    private MessageChannel sendFailureChannel;
    private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;
    private DefaultMQProducer defaultMQProducer;
    private MessageQueueSelector messageQueueSelector;
    private final ProducerDestination destination;
    private final ExtendedProducerProperties<RocketMQProducerProperties> extendedProducerProperties;
    private final RocketMQProducerProperties mqProducerProperties;

    public RocketMQProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<RocketMQProducerProperties> extendedProducerProperties, RocketMQProducerProperties mqProducerProperties) {
        this.destination = destination;
        this.extendedProducerProperties = extendedProducerProperties;
        this.mqProducerProperties = mqProducerProperties;
    }

    protected void onInit() {
        if (null == this.mqProducerProperties || !this.mqProducerProperties.getEnabled()) {
            return;
        }
        super.onInit();
        this.defaultMQProducer = RocketMQProduceFactory.initRocketMQProducer(this.destination.getName(), this.mqProducerProperties);
        this.isTrans = this.defaultMQProducer instanceof TransactionMQProducer;
        this.messageQueueSelector = RocketMQBeanContainerCache.getBean(this.mqProducerProperties.getMessageQueueSelector(), MessageQueueSelector.class, this.extendedProducerProperties.isPartitioned() ? new PartitionMessageQueueSelector() : null);
    }

    public void start() {
        Instrumentation instrumentation = new Instrumentation(this.destination.getName(), this);
        try {
            this.defaultMQProducer.start();
            if (!this.isTrans && this.extendedProducerProperties.isPartitioned()) {
                List messageQueues = this.defaultMQProducer.fetchPublishMessageQueues(this.destination.getName());
                if (this.extendedProducerProperties.getPartitionCount() != messageQueues.size()) {
                    this.logger.info((Object)String.format("The partition count of topic '%s' will change from '%s' to '%s'", this.destination.getName(), this.extendedProducerProperties.getPartitionCount(), messageQueues.size()));
                    this.extendedProducerProperties.setPartitionCount(messageQueues.size());
                    this.partitioningInterceptor.setPartitionCount(this.extendedProducerProperties.getPartitionCount());
                }
            }
            this.running = true;
            instrumentation.markStartedSuccessfully();
        }
        catch (NullPointerException | MQClientException e) {
            instrumentation.markStartFailed((Exception)e);
            log.error("The defaultMQProducer startup failure !!!", e);
        }
        finally {
            InstrumentationManager.addHealthInstrumentation(instrumentation);
        }
    }

    public void stop() {
        if (this.running && null != this.defaultMQProducer) {
            this.defaultMQProducer.shutdown();
        }
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    protected void handleMessageInternal(org.springframework.messaging.Message<?> message) {
        try {
            SendResult sendResult;
            Message mqMessage = RocketMQMessageConverterSupport.convertMessage2MQ(this.destination.getName(), message);
            if (this.defaultMQProducer instanceof TransactionMQProducer) {
                TransactionListener transactionListener = RocketMQBeanContainerCache.getBean(this.mqProducerProperties.getTransactionListener(), TransactionListener.class);
                if (transactionListener == null) {
                    throw new MessagingException("TransactionMQProducer must have a TransactionListener !!! ");
                }
                ((TransactionMQProducer)this.defaultMQProducer).setTransactionListener(transactionListener);
                if (log.isDebugEnabled()) {
                    log.debug("send transaction message ->{}", (Object)mqMessage);
                }
                sendResult = this.defaultMQProducer.sendMessageInTransaction(mqMessage, message.getHeaders().get((Object)"TRANSACTIONAL_ARGS"));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("send message ->{}", (Object)mqMessage);
                }
                sendResult = this.send(mqMessage, this.messageQueueSelector, message.getHeaders(), message);
            }
            log.info("the message has sent,message={},sendResult={}", (Object)mqMessage, (Object)sendResult);
            if (log.isDebugEnabled()) {
                log.debug("the message has sent,message={},sendResult={}", (Object)mqMessage, (Object)sendResult);
            }
            if (sendResult == null || !SendStatus.SEND_OK.equals((Object)sendResult.getSendStatus())) {
                log.error("message send fail.SendStatus is not OK.the message={}", (Object)mqMessage);
                this.doFail(message, (Throwable)new MessagingException("message send fail.SendStatus is not OK."));
            }
        }
        catch (Exception e) {
            log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage(), (Throwable)e);
            this.doFail(message, e);
        }
    }

    private SendResult send(Message mqMessage, MessageQueueSelector selector, Object args, org.springframework.messaging.Message<?> message) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        SendResult sendResult = new SendResult();
        sendResult.setSendStatus(SendStatus.SEND_OK);
        if (RocketMQProducerProperties.SendType.OneWay.equalsName(this.mqProducerProperties.getSendType())) {
            if (null != selector) {
                this.defaultMQProducer.sendOneway(mqMessage, selector, args);
            } else {
                this.defaultMQProducer.sendOneway(mqMessage);
            }
            return sendResult;
        }
        if (RocketMQProducerProperties.SendType.Sync.equalsName(this.mqProducerProperties.getSendType())) {
            if (null != selector) {
                return this.defaultMQProducer.send(mqMessage, selector, args);
            }
            return this.defaultMQProducer.send(mqMessage);
        }
        if (RocketMQProducerProperties.SendType.Async.equalsName(this.mqProducerProperties.getSendType())) {
            if (null != selector) {
                this.defaultMQProducer.send(mqMessage, selector, args, this.getSendCallback(message));
            } else {
                this.defaultMQProducer.send(mqMessage, this.getSendCallback(message));
            }
            return sendResult;
        }
        throw new MessagingException("message hasn't been sent,cause by : the SendType must be in this values[OneWay, Async, Sync]");
    }

    private SendCallback getSendCallback(final org.springframework.messaging.Message<?> message) {
        SendCallback sendCallback = RocketMQBeanContainerCache.getBean(this.mqProducerProperties.getSendCallBack(), SendCallback.class);
        if (null == sendCallback) {
            sendCallback = new SendCallback(){

                public void onSuccess(SendResult sendResult) {
                }

                public void onException(Throwable e) {
                    RocketMQProducerMessageHandler.this.doFail(message, e);
                }
            };
        }
        return sendCallback;
    }

    private void doFail(org.springframework.messaging.Message<?> message, Throwable e) {
        if (this.getSendFailureChannel() == null) {
            throw new MessagingException(message, e);
        }
        this.getSendFailureChannel().send((org.springframework.messaging.Message)this.getErrorMessageStrategy().buildErrorMessage(e, ErrorMessageUtils.getAttributeAccessor(message, message)));
    }

    public MessageChannel getSendFailureChannel() {
        return this.sendFailureChannel;
    }

    public void setSendFailureChannel(MessageChannel sendFailureChannel) {
        this.sendFailureChannel = sendFailureChannel;
    }

    public ErrorMessageStrategy getErrorMessageStrategy() {
        return this.errorMessageStrategy;
    }

    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        this.errorMessageStrategy = errorMessageStrategy;
    }

    public MessageConverterConfigurer.PartitioningInterceptor getPartitioningInterceptor() {
        return this.partitioningInterceptor;
    }

    public RocketMQProducerMessageHandler setPartitioningInterceptor(MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) {
        this.partitioningInterceptor = partitioningInterceptor;
        return this;
    }
}

