/*
 * Decompiled with CFR 0.152.
 */
package net.dreamlu.iot.mqtt.core.client;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import net.dreamlu.iot.mqtt.codec.MqttConnAckMessage;
import net.dreamlu.iot.mqtt.codec.MqttConnAckVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttConnectReasonCode;
import net.dreamlu.iot.mqtt.codec.MqttFixedHeader;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttMessageIdVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttPubAckMessage;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttPublishVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubAckMessage;
import net.dreamlu.iot.mqtt.codec.MqttSubAckPayload;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.codec.MqttUnsubAckMessage;
import net.dreamlu.iot.mqtt.core.client.IMqttClientConnectListener;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageIdGenerator;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.core.client.IMqttClientProcessor;
import net.dreamlu.iot.mqtt.core.client.IMqttClientSession;
import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
import net.dreamlu.iot.mqtt.core.client.MqttClientSubscription;
import net.dreamlu.iot.mqtt.core.client.MqttPendingSubscription;
import net.dreamlu.iot.mqtt.core.client.MqttPendingUnSubscription;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.util.CollUtil;
import net.dreamlu.iot.mqtt.core.util.timer.AckService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;

/**
 * Default client-side handler for decoded MQTT packets.
 *
 * <p>Each {@code process*} method reacts to one inbound packet type: it reads or updates the pending
 * state kept in the {@link IMqttClientSession}, sends any protocol reply through {@link Tio#send}, and
 * hands user callbacks (connect listener, subscription listeners) to the configured executor so that
 * they do not run on the thread that delivered the packet.
 */
public class DefaultMqttClientProcessor implements IMqttClientProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
    /** Maximum number of subscriptions packed into one SUBSCRIBE packet when re-subscribing. */
    private final int reSubscribeBatchSize;
    private final IMqttClientSession clientSession;
    /** Optional connect callback; may be {@code null}. */
    private final IMqttClientConnectListener connectListener;
    private final IMqttClientMessageIdGenerator messageIdGenerator;
    private final AckService ackService;
    /** Executor on which user callbacks are run. */
    private final ThreadPoolExecutor executor;

    /**
     * Creates a processor wired to the collaborators configured on the given creator.
     *
     * @param mqttClientCreator source of the session, listeners, id generator, ack service and executor
     */
    public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) {
        this.reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize();
        this.clientSession = mqttClientCreator.getClientSession();
        this.connectListener = mqttClientCreator.getConnectListener();
        this.messageIdGenerator = mqttClientCreator.getMessageIdGenerator();
        this.ackService = mqttClientCreator.getAckService();
        this.executor = mqttClientCreator.getMqttExecutor();
    }

    /**
     * Logs a packet decode failure; the connection is left untouched.
     *
     * @param context channel the packet arrived on
     * @param message the (partially) decoded message
     * @param ex      the decode error
     */
    @Override
    public void processDecodeFailure(ChannelContext context, MqttMessage message, Throwable ex) {
        logger.error(ex.getMessage(), ex);
    }

    /**
     * Handles CONNACK: on acceptance fires the connect event and, when the broker reports no existing
     * session, re-sends the locally known subscriptions; on any other return code closes the channel.
     *
     * @param context channel the CONNACK arrived on
     * @param message the CONNACK packet
     */
    @Override
    public void processConAck(ChannelContext context, MqttConnAckMessage message) {
        MqttConnAckVariableHeader connAckVariableHeader = message.variableHeader();
        MqttConnectReasonCode returnCode = connAckVariableHeader.connectReturnCode();
        if (returnCode != MqttConnectReasonCode.CONNECTION_ACCEPTED) {
            String remark = "MqttClient connect error error ReturnCode:" + returnCode;
            Tio.close(context, remark);
            return;
        }
        if (logger.isInfoEnabled()) {
            Node node = context.getServerNode();
            logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort());
        }
        this.publishConnectEvent(context);
        // With a session present the broker still holds our subscriptions, so nothing to re-send.
        if (!connAckVariableHeader.isSessionPresent()) {
            this.reSendSubscription(context);
        }
    }

    /**
     * Notifies the connect listener (if one is configured) on the executor.
     *
     * @param context the connected channel
     */
    private void publishConnectEvent(ChannelContext context) {
        if (this.connectListener == null) {
            return;
        }
        this.executor.submit(() -> {
            try {
                this.connectListener.onConnected(context, context.isReconnect);
            } catch (Throwable e) {
                // A failing user callback must not kill the worker silently.
                logger.error(e.getMessage(), e);
            }
        });
    }

    /**
     * Takes all subscriptions out of the session and re-subscribes them, split into batches of at
     * most {@link #reSubscribeBatchSize} entries.
     *
     * @param context channel to subscribe on
     */
    private void reSendSubscription(ChannelContext context) {
        List<MqttClientSubscription> reSubscriptionList = this.clientSession.getAndCleanSubscription();
        if (reSubscriptionList.isEmpty()) {
            return;
        }
        if (reSubscriptionList.size() <= this.reSubscribeBatchSize) {
            this.reSendSubscription(context, reSubscriptionList);
            return;
        }
        for (List<MqttClientSubscription> partition : CollUtil.partition(reSubscriptionList, this.reSubscribeBatchSize)) {
            this.reSendSubscription(context, partition);
        }
    }

    /**
     * Sends one SUBSCRIBE packet for the given subscriptions and tracks it as pending until its SUBACK.
     *
     * @param context            channel to subscribe on
     * @param reSubscriptionList subscriptions to include in this packet
     */
    private void reSendSubscription(ChannelContext context, List<MqttClientSubscription> reSubscriptionList) {
        int messageId = this.messageIdGenerator.getId();
        MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
            .addSubscriptions(reSubscriptionList.stream()
                .map(MqttClientSubscription::toTopicSubscription)
                .collect(Collectors.toList()))
            .messageId(messageId)
            .build();
        MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(reSubscriptionList, message);
        // Register the pending entry (and its retransmit timer) BEFORE sending: the SUBACK is handled
        // on another thread and would be dropped by processSubAck if it arrived before registration.
        pendingSubscription.startRetransmitTimer(this.ackService, msg -> Tio.send(context, msg));
        this.clientSession.addPaddingSubscribe(messageId, pendingSubscription);
        Boolean result = Tio.send(context, message);
        logger.info("MQTT subscriptionList:{} messageId:{} resubscribing result:{}", reSubscriptionList, messageId, result);
    }

    /**
     * Handles SUBACK: matches it to the pending SUBSCRIBE, records every subscription whose reason code
     * is a granted QoS (0..2) in the session and notifies the corresponding listeners.
     *
     * <p>A SUBACK with no reason codes at all is logged and otherwise ignored (the pending entry is
     * kept). A SUBACK carrying fewer reason codes than requested topics marks the uncovered topics as
     * failed instead of throwing.
     *
     * @param message the SUBACK packet
     */
    @Override
    public void processSubAck(MqttSubAckMessage message) {
        int messageId = message.variableHeader().messageId();
        logger.debug("MqttClient SubAck messageId:{}", messageId);
        MqttPendingSubscription paddingSubscribe = this.clientSession.getPaddingSubscribe(messageId);
        if (paddingSubscribe == null) {
            return;
        }
        List<MqttClientSubscription> subscriptionList = paddingSubscribe.getSubscriptionList();
        MqttSubAckPayload subAckPayload = message.payload();
        List<Integer> reasonCodeList = subAckPayload.reasonCodes();
        if (reasonCodeList == null || reasonCodeList.isEmpty()) {
            logger.error("MqttClient subscriptionList:{} subscribe failed reasonCodes is empty messageId:{}", subscriptionList, messageId);
            return;
        }
        int reasonCodeCount = reasonCodeList.size();
        List<MqttClientSubscription> subscribedList = new ArrayList<>(subscriptionList.size());
        for (int i = 0; i < subscriptionList.size(); i++) {
            MqttClientSubscription subscription = subscriptionList.get(i);
            // The broker may answer with fewer codes than topics; treat a missing code as a failure.
            Integer reasonCode = i < reasonCodeCount ? reasonCodeList.get(i) : null;
            if (reasonCode == null || reasonCode < 0 || reasonCode > 2) {
                logger.error("MqttClient topicFilter:{} subscribe failed reasonCodes:{} messageId:{}", subscription.getTopicFilter(), reasonCode, messageId);
                continue;
            }
            subscribedList.add(subscription);
        }
        logger.info("MQTT subscriptionList:{} subscribed successfully messageId:{}", subscribedList, messageId);
        paddingSubscribe.onSubAckReceived();
        this.clientSession.removePaddingSubscribe(messageId);
        this.clientSession.addSubscriptionList(subscribedList);
        for (MqttClientSubscription subscribed : subscribedList) {
            this.notifySubscribed(subscribed);
        }
    }

    /**
     * Runs the subscription's {@code onSubscribed} callback on the executor, logging (not propagating)
     * both task rejection and callback failures.
     *
     * @param subscription the subscription that was just acknowledged
     */
    private void notifySubscribed(MqttClientSubscription subscription) {
        IMqttClientMessageListener subscriptionListener = subscription.getListener();
        try {
            this.executor.execute(() -> {
                try {
                    subscriptionListener.onSubscribed(subscription.getTopicFilter(), subscription.getMqttQoS());
                } catch (Throwable e) {
                    // Thrown on the worker thread, so it has to be caught inside the task.
                    logger.error("MQTT Subscription:{} subscribed onSubscribed event error.", subscription, e);
                }
            });
        } catch (Throwable e) {
            // Reached when the executor refuses the task.
            logger.error("MQTT Subscription:{} subscribed onSubscribed event error.", subscription, e);
        }
    }

    /**
     * Handles an inbound PUBLISH according to its QoS: QoS 0 is delivered directly, QoS 1 is delivered
     * and answered with PUBACK, QoS 2 is parked as pending until the matching PUBREL arrives.
     *
     * @param context channel the PUBLISH arrived on
     * @param message the PUBLISH packet
     */
    @Override
    public void processPublish(ChannelContext context, MqttPublishMessage message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttPublishVariableHeader variableHeader = message.variableHeader();
        String topicName = variableHeader.topicName();
        MqttQoS mqttQoS = mqttFixedHeader.qosLevel();
        int packetId = variableHeader.packetId();
        logger.debug("MqttClient received publish topic:{} qoS:{} packetId:{}", topicName, mqttQoS, packetId);
        switch (mqttQoS) {
            case AT_MOST_ONCE: {
                this.invokeListenerForPublish(topicName, message);
                break;
            }
            case AT_LEAST_ONCE: {
                this.invokeListenerForPublish(topicName, message);
                // -1 is treated as "no packet id"; nothing can be acknowledged then.
                if (packetId != -1) {
                    MqttMessage messageAck = MqttMessageBuilders.pubAck().packetId(packetId).build();
                    Tio.send(context, messageAck);
                }
                break;
            }
            case EXACTLY_ONCE: {
                if (packetId != -1) {
                    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
                    MqttMessage pubRecMessage = new MqttMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId));
                    MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
                    this.clientSession.addPendingQos2Publish(packetId, pendingQos2Publish);
                    // NOTE(review): PUBREC is not sent directly here; presumably the retransmit timer
                    // is what puts it on the wire — confirm against MqttPendingQos2Publish.
                    pendingQos2Publish.startPubRecRetransmitTimer(this.ackService, msg -> Tio.send(context, msg));
                }
                break;
            }
            default: {
                // Any other QoS value is ignored, as before.
                break;
            }
        }
    }

    /**
     * Handles UNSUBACK: completes the pending UNSUBSCRIBE and drops its topics from the session.
     *
     * @param message the UNSUBACK packet
     */
    @Override
    public void processUnSubAck(MqttUnsubAckMessage message) {
        int messageId = message.variableHeader().messageId();
        logger.debug("MqttClient UnSubAck messageId:{}", messageId);
        MqttPendingUnSubscription pendingUnSubscription = this.clientSession.getPaddingUnSubscribe(messageId);
        if (pendingUnSubscription == null) {
            return;
        }
        List<String> unSubscriptionTopics = pendingUnSubscription.getTopics();
        if (logger.isInfoEnabled()) {
            logger.info("MQTT Topic:{} successfully unSubscribed messageId:{}", unSubscriptionTopics, messageId);
        }
        pendingUnSubscription.onUnSubAckReceived();
        this.clientSession.removePaddingUnSubscribe(messageId);
        this.clientSession.removeSubscriptions(unSubscriptionTopics);
    }

    /**
     * Handles PUBACK (QoS 1): completes the pending outbound publish and removes it from the session.
     *
     * @param message the PUBACK packet
     */
    @Override
    public void processPubAck(MqttPubAckMessage message) {
        int messageId = message.variableHeader().messageId();
        logger.debug("MqttClient PubAck messageId:{}", messageId);
        MqttPendingPublish pendingPublish = this.clientSession.getPendingPublish(messageId);
        if (pendingPublish == null) {
            return;
        }
        if (logger.isInfoEnabled()) {
            String topicName = pendingPublish.getMessage().variableHeader().topicName();
            logger.info("MQTT Topic:{} successfully PubAck messageId:{}", topicName, messageId);
        }
        pendingPublish.onPubAckReceived();
        this.clientSession.removePendingPublish(messageId);
        pendingPublish.getPayload().clear();
    }

    /**
     * Handles PUBREC (QoS 2, outbound): answers with PUBREL and arms PUBREL retransmission.
     *
     * <p>A PUBREC whose packet id has no pending publish (for example a duplicate arriving after the
     * exchange completed) is ignored, matching {@link #processPubAck} and {@link #processPubComp}.
     *
     * @param context channel the PUBREC arrived on
     * @param message the PUBREC packet
     */
    @Override
    public void processPubRec(ChannelContext context, MqttMessage message) {
        MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
        int messageId = variableHeader.messageId();
        logger.debug("MqttClient PubRec messageId:{}", messageId);
        MqttPendingPublish pendingPublish = this.clientSession.getPendingPublish(messageId);
        if (pendingPublish == null) {
            return;
        }
        pendingPublish.onPubAckReceived();
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
        Tio.send(context, pubRelMessage);
        pendingPublish.setPubRelMessage(pubRelMessage);
        pendingPublish.startPubRelRetransmissionTimer(this.ackService, msg -> Tio.send(context, msg));
    }

    /**
     * Handles PUBREL (QoS 2, inbound): delivers the parked publish to the listeners, if it is still
     * pending, and always answers with PUBCOMP.
     *
     * @param context channel the PUBREL arrived on
     * @param message the PUBREL packet
     */
    @Override
    public void processPubRel(ChannelContext context, MqttMessage message) {
        int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
        logger.debug("MqttClient PubRel messageId:{}", messageId);
        MqttPendingQos2Publish pendingQos2Publish = this.clientSession.getPendingQos2Publish(messageId);
        if (pendingQos2Publish != null) {
            MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
            String topicName = incomingPublish.variableHeader().topicName();
            this.invokeListenerForPublish(topicName, incomingPublish);
            pendingQos2Publish.onPubRelReceived();
            this.clientSession.removePendingQos2Publish(messageId);
        }
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
        Tio.send(context, new MqttMessage(fixedHeader, variableHeader));
    }

    /**
     * Handles PUBCOMP (QoS 2, outbound): completes the pending publish and removes it from the session.
     *
     * @param message the PUBCOMP packet
     */
    @Override
    public void processPubComp(MqttMessage message) {
        int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
        MqttPendingPublish pendingPublish = this.clientSession.getPendingPublish(messageId);
        if (pendingPublish == null) {
            return;
        }
        if (logger.isInfoEnabled()) {
            String topicName = pendingPublish.getMessage().variableHeader().topicName();
            logger.info("MQTT Topic:{} successfully PubComp", topicName);
        }
        pendingPublish.getPayload().clear();
        pendingPublish.onPubCompReceived();
        this.clientSession.removePendingPublish(messageId);
    }

    /**
     * Dispatches an inbound publish to every subscription matching the topic, each on the executor.
     *
     * <p>Every listener receives its own rewound view of the payload: the views share the bytes but
     * have independent positions, so listeners running concurrently cannot disturb each other's reads.
     *
     * @param topicName topic the message was published to
     * @param message   the PUBLISH packet carrying the payload
     */
    private void invokeListenerForPublish(String topicName, MqttPublishMessage message) {
        List<MqttClientSubscription> subscriptionList = this.clientSession.getMatchedSubscription(topicName);
        if (subscriptionList.isEmpty()) {
            logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
            return;
        }
        ByteBuffer payload = message.payload();
        for (MqttClientSubscription subscription : subscriptionList) {
            IMqttClientMessageListener listener = subscription.getListener();
            ByteBuffer listenerPayload = viewOf(payload);
            this.executor.submit(() -> {
                try {
                    listener.onMessage(topicName, listenerPayload);
                } catch (Throwable e) {
                    logger.error(e.getMessage(), e);
                }
            });
        }
    }

    /**
     * Returns an independent view of {@code payload} positioned at its start, keeping its byte order.
     *
     * @param payload buffer to view; NOTE(review): assumed it may be {@code null} for an empty
     *                message — confirm against the decoder
     * @return a rewound duplicate, or {@code null} when {@code payload} is {@code null}
     */
    private static ByteBuffer viewOf(ByteBuffer payload) {
        if (payload == null) {
            return null;
        }
        // duplicate() does not carry the byte order over, so restore it explicitly.
        ByteBuffer view = payload.duplicate().order(payload.order());
        view.rewind();
        return view;
    }
}

