/*
 * Decompiled with CFR 0.152.
 */
package net.dreamlu.iot.mqtt.core.client;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.codec.MqttUnsubscribeMessage;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageIdGenerator;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.core.client.IMqttClientSession;
import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
import net.dreamlu.iot.mqtt.core.client.MqttClientSubscription;
import net.dreamlu.iot.mqtt.core.client.MqttPendingSubscription;
import net.dreamlu.iot.mqtt.core.client.MqttPendingUnSubscription;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import net.dreamlu.iot.mqtt.core.util.timer.AckService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientGroupStat;
import org.tio.client.TioClient;
import org.tio.client.TioClientConfig;
import org.tio.client.intf.TioClientHandler;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.utils.lock.SetWithLock;

/**
 * MQTT client facade built on top of a t-io {@link TioClient}.
 *
 * <p>Offers subscribe / unsubscribe / publish operations, connection life-cycle
 * control (start, reconnect, disconnect, stop) and runs a background thread that
 * periodically sends heartbeat packets on every connected channel.
 *
 * <p>Instances are created through {@link #create()} and the returned
 * {@link MqttClientCreator}.
 */
public final class MqttClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);
    private final TioClient tioClient;
    private final MqttClientCreator config;
    private final TioClientConfig clientTioConfig;
    private final IMqttClientSession clientSession;
    private final AckService ackService;
    private final IMqttClientMessageIdGenerator messageIdGenerator;
    /**
     * Lazily resolved channel context. Declared {@code volatile} because
     * {@link #getContext()} reads it outside the lock (double-checked locking).
     */
    private volatile ClientChannelContext context;

    /**
     * Entry point for building a client.
     *
     * @return a new {@link MqttClientCreator}
     */
    public static MqttClientCreator create() {
        return new MqttClientCreator();
    }

    /**
     * Wires the client to its t-io client and configuration, then starts the
     * heartbeat thread.
     *
     * @param tioClient underlying t-io client
     * @param config    creator holding the client configuration and collaborators
     */
    MqttClient(TioClient tioClient, MqttClientCreator config) {
        this.tioClient = tioClient;
        this.config = config;
        this.clientTioConfig = tioClient.getTioClientConfig();
        this.ackService = config.getAckService();
        this.clientSession = config.getClientSession();
        this.messageIdGenerator = config.getMessageIdGenerator();
        this.startHeartbeatTask();
    }

    /**
     * Subscribes to a topic filter with QoS {@link MqttQoS#AT_MOST_ONCE}.
     *
     * @param topicFilter topic filter to subscribe to
     * @param listener    listener attached to the subscription
     * @return this client
     */
    public MqttClient subQos0(String topicFilter, IMqttClientMessageListener listener) {
        return this.subscribe(topicFilter, MqttQoS.AT_MOST_ONCE, listener);
    }

    /**
     * Subscribes to a topic filter with QoS {@link MqttQoS#AT_LEAST_ONCE}.
     *
     * @param topicFilter topic filter to subscribe to
     * @param listener    listener attached to the subscription
     * @return this client
     */
    public MqttClient subQos1(String topicFilter, IMqttClientMessageListener listener) {
        return this.subscribe(topicFilter, MqttQoS.AT_LEAST_ONCE, listener);
    }

    /**
     * Subscribes to a topic filter with QoS {@link MqttQoS#EXACTLY_ONCE}.
     *
     * @param topicFilter topic filter to subscribe to
     * @param listener    listener attached to the subscription
     * @return this client
     */
    public MqttClient subQos2(String topicFilter, IMqttClientMessageListener listener) {
        return this.subscribe(topicFilter, MqttQoS.EXACTLY_ONCE, listener);
    }

    /**
     * Subscribes to a single topic filter (QoS-first argument order).
     *
     * @param mqttQoS     requested QoS
     * @param topicFilter topic filter to subscribe to
     * @param listener    listener attached to the subscription
     * @return this client
     */
    public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, IMqttClientMessageListener listener) {
        return this.subscribe(topicFilter, mqttQoS, listener, null);
    }

    /**
     * Subscribes to a single topic filter without extra properties.
     *
     * @param topicFilter topic filter to subscribe to
     * @param mqttQoS     requested QoS
     * @param listener    listener attached to the subscription
     * @return this client
     */
    public MqttClient subscribe(String topicFilter, MqttQoS mqttQoS, IMqttClientMessageListener listener) {
        return this.subscribe(topicFilter, mqttQoS, listener, null);
    }

    /**
     * Subscribes to a single topic filter.
     *
     * @param topicFilter topic filter to subscribe to
     * @param mqttQoS     requested QoS
     * @param listener    listener attached to the subscription
     * @param properties  properties placed on the SUBSCRIBE message, may be {@code null}
     * @return this client
     */
    public MqttClient subscribe(String topicFilter, MqttQoS mqttQoS, IMqttClientMessageListener listener, MqttProperties properties) {
        MqttClientSubscription subscription = new MqttClientSubscription(mqttQoS, topicFilter, listener);
        return this.subscribe(Collections.singletonList(subscription), properties);
    }

    /**
     * Subscribes to several topic filters sharing one QoS and listener.
     *
     * @param topicFilters topic filters to subscribe to, must not be {@code null}
     * @param mqttQoS      requested QoS
     * @param listener     listener attached to every subscription
     * @return this client
     */
    public MqttClient subscribe(String[] topicFilters, MqttQoS mqttQoS, IMqttClientMessageListener listener) {
        return this.subscribe(topicFilters, mqttQoS, listener, null);
    }

    /**
     * Subscribes to several topic filters sharing one QoS and listener.
     *
     * @param topicFilters topic filters to subscribe to, must not be {@code null}
     * @param mqttQoS      requested QoS
     * @param listener     listener attached to every subscription
     * @param properties   properties placed on the SUBSCRIBE message, may be {@code null}
     * @return this client
     * @throws NullPointerException if {@code topicFilters} is {@code null}
     */
    public MqttClient subscribe(String[] topicFilters, MqttQoS mqttQoS, IMqttClientMessageListener listener, MqttProperties properties) {
        Objects.requireNonNull(topicFilters, "MQTT subscribe topicFilters is null.");
        List<MqttClientSubscription> subscriptionList = new ArrayList<>(topicFilters.length);
        for (String topicFilter : topicFilters) {
            subscriptionList.add(new MqttClientSubscription(mqttQoS, topicFilter, listener));
        }
        return this.subscribe(subscriptionList, properties);
    }

    /**
     * Subscribes to the given subscriptions without extra properties.
     *
     * @param subscriptionList subscriptions to register
     * @return this client
     */
    public MqttClient subscribe(List<MqttClientSubscription> subscriptionList) {
        return this.subscribe(subscriptionList, null);
    }

    /**
     * Subscribes to every entry of {@code subscriptionList} that the session does
     * not already report as subscribed.
     *
     * <p>Each topic filter is validated first. When the client is connected a
     * SUBSCRIBE message is sent and tracked as pending with a retransmit timer;
     * otherwise the subscriptions are only stored in the session.
     *
     * @param subscriptionList subscriptions to register
     * @param properties       properties placed on the SUBSCRIBE message, may be {@code null}
     * @return this client
     */
    public MqttClient subscribe(List<MqttClientSubscription> subscriptionList, MqttProperties properties) {
        List<MqttClientSubscription> needSubscriptionList = new ArrayList<>();
        for (MqttClientSubscription subscription : subscriptionList) {
            TopicUtil.validateTopicFilter(subscription.getTopicFilter());
            if (!this.clientSession.isSubscribed(subscription)) {
                needSubscriptionList.add(subscription);
            }
        }
        if (needSubscriptionList.isEmpty()) {
            return this;
        }
        int messageId = this.messageIdGenerator.getId();
        MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
            .addSubscriptions(needSubscriptionList.stream()
                .map(MqttClientSubscription::toTopicSubscription)
                .collect(Collectors.toList()))
            .messageId(messageId)
            .properties(properties)
            .build();
        if (this.isConnected()) {
            Boolean result = Tio.send(this.getContext(), message);
            logger.info("MQTT subscriptionList:{} messageId:{} subscribing result:{}", needSubscriptionList, messageId, result);
            MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(needSubscriptionList, message);
            pendingSubscription.startRetransmitTimer(this.ackService, msg -> Tio.send(this.getContext(), message));
            this.clientSession.addPaddingSubscribe(messageId, pendingSubscription);
        } else {
            this.clientSession.addSubscriptionList(needSubscriptionList);
        }
        return this;
    }

    /**
     * Unsubscribes from the given topic filters.
     *
     * @param topicFilters topic filters to unsubscribe from
     * @return this client
     */
    public MqttClient unSubscribe(String ... topicFilters) {
        return this.unSubscribe(Arrays.asList(topicFilters));
    }

    /**
     * Unsubscribes from the given topic filters.
     *
     * <p>Validates the filters, removes matching pending and established
     * subscriptions from the session, sends an UNSUBSCRIBE message and tracks it
     * as pending with a retransmission timer.
     *
     * @param topicFilters topic filters to unsubscribe from
     * @return this client
     */
    public MqttClient unSubscribe(List<String> topicFilters) {
        TopicUtil.validateTopicFilter(topicFilters);
        this.clientSession.removePaddingSubscribes(topicFilters);
        this.clientSession.removeSubscriptions(topicFilters);
        int messageId = this.messageIdGenerator.getId();
        MqttUnsubscribeMessage message = MqttMessageBuilders.unsubscribe()
            .addTopicFilters(topicFilters)
            .messageId(messageId)
            .build();
        MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilters, message);
        Boolean result = Tio.send(this.getContext(), message);
        logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilters, messageId, result);
        this.clientSession.addPaddingUnSubscribe(messageId, pendingUnSubscription);
        pendingUnSubscription.startRetransmissionTimer(this.ackService, msg -> Tio.send(this.getContext(), msg));
        return this;
    }

    /**
     * Publishes with QoS {@link MqttQoS#AT_MOST_ONCE}, not retained.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, ByteBuffer payload) {
        return this.publish(topic, payload, MqttQoS.AT_MOST_ONCE);
    }

    /**
     * Publishes with QoS {@link MqttQoS#AT_MOST_ONCE}, not retained.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, byte[] payload) {
        return this.publish(topic, payload, MqttQoS.AT_MOST_ONCE);
    }

    /**
     * Publishes with the given QoS, not retained.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @param qos     QoS of the publication
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, ByteBuffer payload, MqttQoS qos) {
        return this.publish(topic, payload, qos, false);
    }

    /**
     * Publishes with the given QoS, not retained.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @param qos     QoS of the publication
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, byte[] payload, MqttQoS qos) {
        return this.publish(topic, payload, qos, false);
    }

    /**
     * Publishes with QoS {@link MqttQoS#AT_MOST_ONCE} and the given retain flag.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @param retain  retain flag of the publication
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, ByteBuffer payload, boolean retain) {
        return this.publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
    }

    /**
     * Publishes with QoS {@link MqttQoS#AT_MOST_ONCE} and the given retain flag.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @param retain  retain flag of the publication
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, byte[] payload, boolean retain) {
        return this.publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
    }

    /**
     * Publishes a byte-array payload; the array is wrapped, not copied.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @param qos     QoS of the publication
     * @param retain  retain flag of the publication
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, byte[] payload, MqttQoS qos, boolean retain) {
        ByteBuffer buffer = payload == null ? null : ByteBuffer.wrap(payload);
        return this.publish(topic, buffer, qos, retain);
    }

    /**
     * Publishes with the given QoS and retain flag.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @param qos     QoS of the publication
     * @param retain  retain flag of the publication
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
        return this.publish(topic, payload, qos, publishBuilder -> publishBuilder.retained(retain));
    }

    /**
     * Publishes with the given QoS, retain flag and properties.
     *
     * @param topic      topic name
     * @param payload    message payload, may be {@code null}
     * @param qos        QoS of the publication
     * @param retain     retain flag of the publication
     * @param properties properties placed on the PUBLISH message
     * @return result of handing the message to t-io
     */
    public boolean publish(String topic, ByteBuffer payload, MqttQoS qos, boolean retain, MqttProperties properties) {
        return this.publish(topic, payload, qos, publishBuilder -> publishBuilder.retained(retain).properties(properties));
    }

    /**
     * Publishes a message, letting the caller customise the publish builder.
     *
     * <p>The topic name is validated first. A message id is generated only for
     * QoS 1 and QoS 2 ({@code -1} is used otherwise). A {@code null} payload is
     * replaced by an empty buffer. Topic, payload, message id and QoS are applied
     * after {@code builder} runs, so they override whatever the callback set for
     * those fields. QoS 1/2 messages are registered as pending in the session and
     * get a retransmission timer.
     *
     * @param topic   topic name
     * @param payload message payload, may be {@code null}
     * @param qos     QoS of the publication
     * @param builder callback customising the publish builder
     * @return {@code true} only if t-io reported the send as successful
     */
    public boolean publish(String topic, ByteBuffer payload, MqttQoS qos, Consumer<MqttMessageBuilders.PublishBuilder> builder) {
        TopicUtil.validateTopicName(topic);
        boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
        int messageId = isHighLevelQoS ? this.messageIdGenerator.getId() : -1;
        if (payload == null) {
            payload = ByteBuffer.allocate(0);
        }
        MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish();
        builder.accept(publishBuilder);
        publishBuilder.topicName(topic).payload(payload).messageId(messageId).qos(qos);
        MqttPublishMessage message = publishBuilder.build();
        // Tio.send yields a boxed Boolean; compare against TRUE so a null result cannot throw on unboxing.
        boolean result = Boolean.TRUE.equals(Tio.send(this.getContext(), message));
        logger.debug("MQTT Topic:{} qos:{} retain:{} publish result:{}", topic, qos, publishBuilder.isRetained(), result);
        if (isHighLevelQoS) {
            MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
            this.clientSession.addPendingPublish(messageId, pendingPublish);
            pendingPublish.startPublishRetransmissionTimer(this.ackService, msg -> Tio.send(this.getContext(), msg));
        }
        return result;
    }

    /**
     * Starts the ack service and connects to the configured ip/port.
     *
     * @param sync {@code true} to use the blocking connect, {@code false} for the asynchronous one
     * @return this client
     * @throws IllegalStateException if the connect call throws
     */
    MqttClient start(boolean sync) {
        this.ackService.start();
        Node node = new Node(this.config.getIp(), this.config.getPort());
        try {
            if (sync) {
                this.tioClient.connect(node, this.config.getTimeout());
            } else {
                this.tioClient.asynConnect(node, this.config.getTimeout());
            }
            return this;
        } catch (Exception e) {
            throw new IllegalStateException("Mica mqtt client async start fail.", e);
        }
    }

    /**
     * Reconnects the current channel, if one exists. A channel flagged as removed
     * has that flag cleared first. Failures are logged, not thrown.
     */
    public void reconnect() {
        ClientChannelContext channelContext = this.getContext();
        if (channelContext == null) {
            return;
        }
        try {
            if (channelContext.isRemoved) {
                channelContext.setRemoved(false);
            }
            this.tioClient.reconnect(channelContext, this.config.getTimeout());
        } catch (Exception e) {
            logger.error("mqtt client reconnect error", e);
        }
    }

    /**
     * Sends a DISCONNECT message and, if that succeeded, closes the channel.
     *
     * @return {@code false} when there is no channel or the DISCONNECT could not
     *         be sent, {@code true} otherwise
     */
    public boolean disconnect() {
        ClientChannelContext channelContext = this.getContext();
        if (channelContext == null) {
            return false;
        }
        // Tio.bSend yields a boxed Boolean; compare against TRUE so a null result cannot throw on unboxing.
        boolean result = Boolean.TRUE.equals(Tio.bSend(channelContext, MqttMessage.DISCONNECT));
        if (result) {
            Tio.close(channelContext, null, "MqttClient disconnect.", true);
        }
        return result;
    }

    /**
     * Stops the ack service, disconnects, stops the t-io client and cleans the
     * client session.
     *
     * @return result reported by {@link TioClient#stop()}
     */
    public boolean stop() {
        this.ackService.stop();
        this.disconnect();
        boolean result = this.tioClient.stop();
        logger.info("MqttClient stop result:{}", result);
        this.clientSession.clean();
        return result;
    }

    /**
     * @return the underlying t-io client
     */
    public TioClient getTioClient() {
        return this.tioClient;
    }

    /**
     * @return the creator (configuration) this client was built from
     */
    public MqttClientCreator getClientCreator() {
        return this.config;
    }

    /**
     * @return the t-io client configuration
     */
    public TioClientConfig getClientTioConfig() {
        return this.clientTioConfig;
    }

    /**
     * Returns the channel context of this client, resolving it on first use from
     * the set of connected channels and caching it afterwards.
     *
     * @return the channel context, or {@code null} if no connected channel exists yet
     */
    public ClientChannelContext getContext() {
        // Single volatile read on the fast path.
        ClientChannelContext current = this.context;
        if (current != null) {
            return current;
        }
        synchronized (this) {
            if (this.context == null) {
                SetWithLock<ChannelContext> connectedSet = Tio.getConnecteds(this.clientTioConfig);
                Set<ChannelContext> contextSet = connectedSet.getObj();
                if (contextSet != null && !contextSet.isEmpty()) {
                    this.context = (ClientChannelContext) contextSet.iterator().next();
                }
            }
            return this.context;
        }
    }

    /**
     * @return {@code true} if a channel context exists and is not flagged closed
     */
    public boolean isConnected() {
        ClientChannelContext channelContext = this.getContext();
        return channelContext != null && !channelContext.isClosed;
    }

    /**
     * @return the negation of {@link #isConnected()}
     */
    public boolean isDisconnected() {
        return !this.isConnected();
    }

    /**
     * Starts the heartbeat thread unless the configured keep-alive is not positive.
     *
     * <p>Until the t-io config is stopped, the thread wakes up every third of the
     * keep-alive interval and, under the read lock of the connected set, sends a
     * heartbeat packet on each open channel whose last sent packet is at least one
     * keep-alive interval old. An interrupt terminates the thread.
     */
    private void startHeartbeatTask() {
        long heartbeatTimeout = TimeUnit.SECONDS.toMillis(this.config.getKeepAliveSecs());
        if (heartbeatTimeout <= 0L) {
            // Message (Chinese): the user disabled mica-mqtt's scheduled heartbeat and must implement it themselves.
            logger.warn("\u7528\u6237\u53d6\u6d88\u4e86 mica-mqtt \u7684\u5fc3\u8df3\u5b9a\u65f6\u53d1\u9001\u529f\u80fd\uff0c\u8bf7\u7528\u6237\u81ea\u5df1\u53bb\u5b8c\u6210\u5fc3\u8df3\u673a\u5236");
            return;
        }
        ClientGroupStat clientGroupStat = (ClientGroupStat) this.clientTioConfig.groupStat;
        TioClientHandler clientHandler = this.clientTioConfig.getTioClientHandler();
        String id = this.clientTioConfig.getId();
        new Thread(() -> {
            while (!this.clientTioConfig.isStopped()) {
                SetWithLock<ChannelContext> setWithLock = this.clientTioConfig.connecteds;
                ReentrantReadWriteLock.ReadLock readLock = setWithLock.readLock();
                readLock.lock();
                try {
                    Set<ChannelContext> set = setWithLock.getObj();
                    long currTime = System.currentTimeMillis();
                    for (ChannelContext entry : set) {
                        ClientChannelContext channelContext = (ClientChannelContext) entry;
                        if (channelContext.isClosed || channelContext.isRemoved) {
                            continue;
                        }
                        long interval = currTime - channelContext.stat.latestTimeOfSentPacket;
                        if (interval < heartbeatTimeout) {
                            continue;
                        }
                        Packet packet = clientHandler.heartbeatPacket(channelContext);
                        if (packet == null) {
                            continue;
                        }
                        Boolean result = Tio.send(channelContext, packet);
                        if (logger.isInfoEnabled()) {
                            // Message (Chinese): "sent heartbeat packet".
                            logger.info("{} \u53d1\u9001\u5fc3\u8df3\u5305 result:{}", channelContext, result);
                        }
                    }
                    if (this.clientTioConfig.debug && logger.isInfoEnabled()) {
                        if (this.clientTioConfig.statOn) {
                            logger.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.get(), clientGroupStat.receivedPackets.get(), clientGroupStat.receivedBytes.get(), clientGroupStat.handledPackets.get(), clientGroupStat.sentPackets.get(), clientGroupStat.sentBytes.get());
                        } else {
                            logger.info("[{}]: curr:{}, closed:{}", id, set.size(), clientGroupStat.closed.get());
                        }
                    }
                } catch (Throwable e) {
                    logger.error("", e);
                } finally {
                    readLock.unlock();
                }
                try {
                    Thread.sleep(heartbeatTimeout / 3L);
                } catch (InterruptedException e) {
                    // Restore the flag and leave the loop: continuing would make every
                    // following sleep() fail immediately and turn this into a busy loop.
                    Thread.currentThread().interrupt();
                    logger.error(e.getMessage(), e);
                    return;
                }
            }
        }, "mqtt-heartbeat" + id).start();
    }
}

