/*
 * Decompiled with CFR 0.152.
 */
package tech.smartboot.mqtt.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.smartboot.socket.Protocol;
import org.smartboot.socket.extension.processor.AbstractMessageProcessor;
import org.smartboot.socket.timer.HashedWheelTimer;
import org.smartboot.socket.timer.Timer;
import org.smartboot.socket.timer.TimerTask;
import org.smartboot.socket.transport.AioQuickClient;
import tech.smartboot.mqtt.client.MqttClientProcessor;
import tech.smartboot.mqtt.client.Options;
import tech.smartboot.mqtt.client.PublishBuilder;
import tech.smartboot.mqtt.client.Subscribe;
import tech.smartboot.mqtt.common.AbstractSession;
import tech.smartboot.mqtt.common.AsyncTask;
import tech.smartboot.mqtt.common.DefaultMqttWriter;
import tech.smartboot.mqtt.common.InflightQueue;
import tech.smartboot.mqtt.common.MqttProtocol;
import tech.smartboot.mqtt.common.TopicToken;
import tech.smartboot.mqtt.common.enums.MqttConnectReturnCode;
import tech.smartboot.mqtt.common.enums.MqttDisConnectReturnCode;
import tech.smartboot.mqtt.common.enums.MqttQoS;
import tech.smartboot.mqtt.common.enums.MqttVersion;
import tech.smartboot.mqtt.common.message.MessageBuilder;
import tech.smartboot.mqtt.common.message.MqttConnAckMessage;
import tech.smartboot.mqtt.common.message.MqttConnectMessage;
import tech.smartboot.mqtt.common.message.MqttDisconnectMessage;
import tech.smartboot.mqtt.common.message.MqttMessage;
import tech.smartboot.mqtt.common.message.MqttMessageBuilders;
import tech.smartboot.mqtt.common.message.MqttPingReqMessage;
import tech.smartboot.mqtt.common.message.MqttPublishMessage;
import tech.smartboot.mqtt.common.message.MqttSubAckMessage;
import tech.smartboot.mqtt.common.message.MqttSubscribeMessage;
import tech.smartboot.mqtt.common.message.MqttTopicSubscription;
import tech.smartboot.mqtt.common.message.MqttUnsubAckMessage;
import tech.smartboot.mqtt.common.message.payload.MqttConnectPayload;
import tech.smartboot.mqtt.common.message.variable.MqttConnAckVariableHeader;
import tech.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader;
import tech.smartboot.mqtt.common.message.variable.MqttDisconnectVariableHeader;
import tech.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import tech.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader;
import tech.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import tech.smartboot.mqtt.common.message.variable.properties.DisConnectProperties;
import tech.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import tech.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import tech.smartboot.mqtt.common.message.variable.properties.SubscribeProperties;
import tech.smartboot.mqtt.common.util.MqttUtil;
import tech.smartboot.mqtt.common.util.ValidateUtils;

/**
 * MQTT client session built on top of smart-socket's {@link AioQuickClient}.
 *
 * <p>Operations requested before the connection has been acknowledged (subscribe, unsubscribe,
 * publish) are parked in {@link #registeredTasks} and replayed once a successful CONNACK has
 * been received.
 */
public class MqttClient extends AbstractSession {
    /** No-op publish callback used by the overloads that do not take a consumer. */
    private static final Consumer<Integer> IGNORE = integer -> {};

    /** Timer shared by all clients for connect timeout, keep-alive and reconnect scheduling. */
    private static final HashedWheelTimer TIMER = new HashedWheelTimer(r -> new Thread(r, "client-timer"), 50L, 1024);

    /** Message processor shared by all clients. */
    private static final AbstractMessageProcessor<MqttMessage> messageProcessor = new MqttClientProcessor();

    /** Timestamp (epoch millis) of the last message written through {@link #write(MqttMessage, boolean)}. */
    private long latestSendMessageTime;

    private final Options options = new Options();

    /** Operations waiting for the connection (or for room in the in-flight queue). */
    private final ConcurrentLinkedQueue<Runnable> registeredTasks = new ConcurrentLinkedQueue<>();

    /** Active subscriptions keyed by topic filter. */
    private final Map<String, Subscribe> subscribes = new ConcurrentHashMap<String, Subscribe>();

    /** Cache: published topic name -> subscriptions that should receive it. Cleared on every (un)subscribe. */
    private final Map<String, List<Subscribe>> mapping = new ConcurrentHashMap<String, List<Subscribe>>();

    /** Tokens of the subscribed topic filters that contain wildcards. */
    private final List<TopicToken> wildcardsToken = new LinkedList<TopicToken>();

    private AioQuickClient client;

    // NOTE(review): this flag is never reset to false in this class, so after a reconnect the
    // CONNACK timeout check in connect() sees the stale value — confirm whether that is intended.
    private boolean connected = false;

    /** Callback invoked with every CONNACK received. */
    private Consumer<MqttConnAckMessage> connectConsumer;

    /**
     * Number of PINGREQ messages sent by the keep-alive task; the session is closed when the
     * task finds it at 3 or more. Package-private — presumably reset elsewhere in the package
     * when a PINGRESP arrives (not visible in this file; TODO confirm).
     */
    int pingTimeout;

    /** Pending CONNACK timeout task, or {@code null} when none is scheduled. */
    private TimerTask connectTimer;

    /**
     * Marker for queued tasks that do not trigger {@link #consumeTask()} themselves when they
     * finish, so the drain loop must continue with the next task right after running them.
     */
    private interface NonChainingTask extends Runnable {
    }

    /**
     * Creates a client for the given URI with default options.
     *
     * @param uri broker address in the form {@code mqtt://host:port} or {@code mqtts://host:port}
     */
    public MqttClient(String uri) {
        this(uri, opt -> {});
    }

    /**
     * Creates a client for {@code mqtt://host:port} with default options.
     */
    public MqttClient(String host, int port) {
        this(host, port, opt -> {});
    }

    /**
     * Creates a client for {@code mqtt://host:port}.
     *
     * @param opt callback that customizes the client options
     */
    public MqttClient(String host, int port, Consumer<Options> opt) {
        this("mqtt://" + host + ":" + port, opt);
    }

    /**
     * Creates a client for the given URI.
     *
     * @param uri broker address in the form {@code mqtt://host:port} or {@code mqtts://host:port}
     * @param opt callback that customizes the client options after host and port have been set
     * @throws IllegalStateException if the scheme is neither {@code mqtt} nor {@code mqtts}, or
     *                               the URI does not have the {@code scheme://host:port} shape
     */
    public MqttClient(String uri, Consumer<Options> opt) {
        String[] array = uri.split(":");
        String scheme = array[0];
        if (!"mqtt".equals(scheme) && !"mqtts".equals(scheme)) {
            throw new IllegalStateException("invalid URI Scheme, uri: " + uri);
        }
        // Fail with a clear message instead of an index-out-of-bounds error or a truncated host.
        if (array.length < 3 || !array[1].startsWith("//")) {
            throw new IllegalStateException("invalid URI, expected <scheme>://<host>:<port>, uri: " + uri);
        }
        this.options.setHost(array[1].substring(2));
        this.options.setPort(MqttUtil.toInt(array[2]));
        opt.accept(this.options);
        this.clientId = this.options.getClientId();
        this.setMqttVersion(this.options.getMqttVersion());
    }

    /**
     * Connects to the broker, ignoring the CONNACK message.
     */
    public void connect() {
        this.connect(connAckMessage -> {});
    }

    /**
     * Opens the transport, sends CONNECT, and schedules the CONNACK timeout and (when the
     * keep-alive interval is positive) the keep-alive task.
     *
     * <p>An {@link IOException} raised while connecting is reported on {@code System.err} and
     * followed by {@link #release()}, which schedules a reconnect when automatic reconnect is on.
     *
     * @param consumer callback invoked with the CONNACK message
     */
    public void connect(Consumer<MqttConnAckMessage> consumer) {
        this.connectConsumer = consumer;
        MqttUtil.updateConfig((Object)this.options, "mqtt.client");
        this.client = new AioQuickClient(this.options.getHost(), this.options.getPort(), (Protocol)new MqttProtocol(this.options.getMaxPacketSize()), messageProcessor);
        try {
            this.client.setReadBufferSize(this.options.getBufferSize()).setWriteBuffer(this.options.getBufferSize(), 8).connectTimeout(this.options.getConnectionTimeout());
            this.session = this.options.group() != null ? this.client.start(this.options.group()) : this.client.start();
            this.session.setAttachment((Object)this);
            this.mqttWriter = new DefaultMqttWriter(this.session.writeBuffer());

            ConnectProperties properties = null;
            if (this.getMqttVersion() == MqttVersion.MQTT_5) {
                properties = new ConnectProperties();
            }
            MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(this.getMqttVersion(), MqttUtil.isNotBlank((CharSequence)this.options.getUserName()), this.options.getPassword() != null, this.options.getWillMessage(), this.options.isCleanSession(), this.options.getKeepAliveInterval(), properties);
            MqttConnectPayload payload = new MqttConnectPayload(this.clientId, this.options.getWillMessage(), this.options.getUserName(), this.options.getPassword());
            MqttConnectMessage connectMessage = new MqttConnectMessage(variableHeader, payload);

            // Close the session if no CONNACK has been accepted within the configured timeout.
            this.connectTimer = TIMER.schedule((Runnable)new AsyncTask(){

                public void execute() {
                    if (!MqttClient.this.connected) {
                        System.out.println("connect timeout");
                        MqttClient.this.session.close();
                    }
                }
            }, (long)this.options.getConnectAckTimeout(), TimeUnit.SECONDS);
            this.write((MqttMessage)connectMessage);

            final long keepAliveInterval = TimeUnit.SECONDS.toMillis(this.options.getKeepAliveInterval());
            if (keepAliveInterval > 0L) {
                // Session this keep-alive loop belongs to; a later connect() replaces this.session
                // and starts its own loop, at which point this one must stop rescheduling itself.
                final Object keepAliveSession = this.session;
                TIMER.schedule((Runnable)new AsyncTask(){

                    public void execute() {
                        if (MqttClient.this.session != keepAliveSession) {
                            return;
                        }
                        if (MqttClient.this.pingTimeout >= 3) {
                            MqttClient.this.pingTimeout = 0;
                            MqttClient.this.session.close();
                            return;
                        }
                        // delay < 0 means a message was sent less than one interval ago.
                        long delay = System.currentTimeMillis() - MqttClient.this.latestSendMessageTime - keepAliveInterval;
                        if (delay > -10L) {
                            MqttPingReqMessage pingReqMessage = new MqttPingReqMessage();
                            MqttClient.this.write((MqttMessage)pingReqMessage);
                            ++MqttClient.this.pingTimeout;
                            TIMER.schedule((Runnable)((Object)this), keepAliveInterval, TimeUnit.MILLISECONDS);
                        } else {
                            // Recent traffic: check again once a full interval has elapsed since it.
                            TIMER.schedule((Runnable)((Object)this), -delay, TimeUnit.MILLISECONDS);
                        }
                    }
                }, keepAliveInterval, TimeUnit.MILLISECONDS);
            }
        }
        catch (IOException e) {
            System.err.println("mqtt client:" + this.clientId + " connect fail: " + e);
            this.release();
        }
    }

    /**
     * Writes the message through the parent implementation and records the send time used by
     * the keep-alive task.
     */
    public void write(MqttMessage mqttMessage, boolean autoFlush) {
        super.write(mqttMessage, autoFlush);
        this.latestSendMessageTime = System.currentTimeMillis();
    }

    /**
     * Runs the next queued task. Subscribe and unsubscribe tasks call this method again from
     * their completion callbacks; tasks marked {@link NonChainingTask} do not, so draining
     * continues in this loop after them. Otherwise everything queued behind such a task would
     * never be executed.
     */
    private void consumeTask() {
        Runnable task;
        while ((task = this.registeredTasks.poll()) != null) {
            task.run();
            if (!(task instanceof NonChainingTask)) {
                return;
            }
        }
    }

    /** Drops the will message from the options; called when automatic reconnect is off. */
    private void gcConfigure() {
        this.options.setWillMessage(null);
    }

    /**
     * Unsubscribes from a single topic filter.
     */
    public void unsubscribe(String topic) {
        this.unsubscribe(new String[]{topic});
    }

    /**
     * Unsubscribes from the given topic filters, immediately when connected, otherwise once the
     * queued tasks are replayed.
     *
     * @return this client
     */
    public MqttClient unsubscribe(String[] topics) {
        if (this.connected) {
            this.unsubscribe0(topics);
        } else {
            this.registeredTasks.offer(() -> this.unsubscribe0(topics));
        }
        return this;
    }

    /**
     * Sends UNSUBSCRIBE for those of {@code topics} that are currently subscribed and removes
     * them from the local state when the acknowledgement arrives. If the in-flight queue does
     * not accept the message, the request is queued again.
     */
    private void unsubscribe0(String[] topics) {
        HashSet<String> unsubscribedTopics = new HashSet<String>(topics.length);
        for (String candidate : topics) {
            if (this.subscribes.containsKey(candidate)) {
                unsubscribedTopics.add(candidate);
            }
        }
        if (unsubscribedTopics.isEmpty()) {
            System.err.println("empty unsubscribe topics detected!");
            return;
        }

        MqttMessageBuilders.UnsubscribeBuilder unsubscribeBuilder = MqttMessageBuilders.unsubscribe();
        for (String topicFilter : unsubscribedTopics) {
            unsubscribeBuilder.addTopicFilter(topicFilter);
        }
        if (this.getMqttVersion() == MqttVersion.MQTT_5) {
            ReasonProperties properties = new ReasonProperties();
            unsubscribeBuilder.properties(properties);
        }

        CompletableFuture future = this.getInflightQueue().offer((MessageBuilder)unsubscribeBuilder);
        if (future == null) {
            this.registeredTasks.offer(() -> this.unsubscribe0(topics));
            return;
        }
        future.whenComplete((message, throwable) -> {
            ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type.");
            for (String unsubscribedTopic : unsubscribedTopics) {
                this.subscribes.remove(unsubscribedTopic);
                this.wildcardsToken.removeIf(topicToken -> unsubscribedTopic.equals(topicToken.getTopicFilter()));
            }
            // The topic -> subscription cache may reference removed subscriptions.
            this.mapping.clear();
            this.consumeTask();
        });
        this.flush();
    }

    /**
     * Subscribes to a single topic filter.
     *
     * @param consumer callback invoked for each message delivered to this subscription
     * @return this client
     */
    public MqttClient subscribe(String topic, MqttQoS qos, BiConsumer<MqttClient, MqttPublishMessage> consumer) {
        return this.subscribe(new String[]{topic}, new MqttQoS[]{qos}, consumer);
    }

    /**
     * Subscribes to a single topic filter.
     *
     * @param consumer       callback invoked for each message delivered to this subscription
     * @param subAckConsumer callback invoked with the resulting QoS once the SUBACK arrives
     * @return this client
     */
    public MqttClient subscribe(String topic, MqttQoS qos, BiConsumer<MqttClient, MqttPublishMessage> consumer, BiConsumer<MqttClient, MqttQoS> subAckConsumer) {
        return this.subscribe(new String[]{topic}, new MqttQoS[]{qos}, consumer, subAckConsumer);
    }

    /**
     * Subscribes to several topic filters; {@code qos[i]} is requested for {@code topics[i]}.
     *
     * @return this client
     */
    public MqttClient subscribe(String[] topics, MqttQoS[] qos, BiConsumer<MqttClient, MqttPublishMessage> consumer) {
        this.subscribe(topics, qos, consumer, (MqttClient mqttClient, MqttQoS mqttQoS) -> {});
        return this;
    }

    /**
     * Subscribes to several topic filters, immediately when connected, otherwise once the
     * queued tasks are replayed.
     *
     * @param subAckConsumer callback invoked once per topic filter when the SUBACK arrives
     * @return this client
     */
    public MqttClient subscribe(String[] topics, MqttQoS[] qos, BiConsumer<MqttClient, MqttPublishMessage> consumer, BiConsumer<MqttClient, MqttQoS> subAckConsumer) {
        if (this.connected) {
            this.subscribe0(topics, qos, consumer, subAckConsumer);
        } else {
            this.registeredTasks.offer(() -> this.subscribe0(topics, qos, consumer, subAckConsumer));
        }
        return this;
    }

    /**
     * Sends SUBSCRIBE and, when the SUBACK arrives, records every accepted subscription.
     *
     * <p>A filter counts as rejected when the broker's code for it has the high bit (0x80) set —
     * the failure marker of SUBACK return/reason codes per the MQTT specification — or when
     * {@link MqttQoS#FAILURE} was requested. Rejected filters are not registered and are
     * reported as {@code FAILURE} to the topic listener and to {@code subAckConsumer}.
     */
    private void subscribe0(String[] topic, MqttQoS[] qos, BiConsumer<MqttClient, MqttPublishMessage> consumer, BiConsumer<MqttClient, MqttQoS> subAckConsumer) {
        MqttMessageBuilders.SubscribeBuilder subscribeBuilder = MqttMessageBuilders.subscribe();
        for (int i = 0; i < topic.length; ++i) {
            subscribeBuilder.addSubscription(qos[i], topic[i]);
        }
        if (this.getMqttVersion() == MqttVersion.MQTT_5) {
            subscribeBuilder.subscribeProperties(new SubscribeProperties());
        }
        MqttSubscribeMessage subscribeMessage = subscribeBuilder.build();
        CompletableFuture future = this.getInflightQueue().offer((MessageBuilder)subscribeBuilder);
        if (future == null) {
            this.registeredTasks.offer(() -> this.subscribe0(topic, qos, consumer, subAckConsumer));
            return;
        }
        future.whenComplete((message, throwable) -> {
            List<?> qosValues = ((MqttSubAckMessage)message).getPayload().grantedQoSLevels();
            ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response");
            int i = 0;
            for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) {
                MqttQoS requested = subscription.getQualityOfService();
                int granted = (Integer)qosValues.get(i++);
                // Checking only the requested QoS (or min(requested, granted)) would hide a
                // broker-side rejection, because the failure code is larger than any valid QoS.
                boolean failed = requested == MqttQoS.FAILURE || (granted & 0x80) != 0;
                MqttQoS minQos = failed ? MqttQoS.FAILURE : MqttQoS.valueOf(Math.min(requested.value(), granted));
                this.options.getTopicListener().subscribe(subscription.getTopicFilter(), minQos);
                if (!failed) {
                    this.subscribes.put(subscription.getTopicFilter(), new Subscribe(minQos, consumer));
                    TopicToken topicToken = new TopicToken(subscription.getTopicFilter());
                    if (topicToken.isWildcards()) {
                        this.wildcardsToken.add(topicToken);
                    }
                } else {
                    System.err.println("subscribe topic:" + subscription.getTopicFilter() + " fail");
                }
                this.mapping.clear();
                subAckConsumer.accept(this, minQos);
            }
            this.consumeTask();
        });
        this.flush();
    }

    /**
     * Handles the CONNACK: cancels the connect timeout, and on acceptance installs a fresh
     * in-flight queue, re-issues the known subscriptions, marks the client connected and starts
     * replaying queued tasks. The connect callback is invoked in every case.
     */
    void receiveConnAckMessage(MqttConnAckMessage connAckMessage) {
        TimerTask pendingTimeout = this.connectTimer;
        if (pendingTimeout != null) {
            pendingTimeout.cancel();
            this.connectTimer = null;
        }
        if (!this.options.isAutomaticReconnect()) {
            this.gcConfigure();
        }
        if (((MqttConnAckVariableHeader)connAckMessage.getVariableHeader()).connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
            this.setInflightQueue(new InflightQueue((AbstractSession)this, this.options.getMaxInflight(), (Timer)TIMER));
            this.subscribes.forEach((k, v) -> this.subscribe(k, v.getQoS(), v.getConsumer()));
            this.connected = true;
            this.consumeTask();
        }
        Consumer<MqttConnAckMessage> callback = this.connectConsumer;
        if (callback != null) {
            callback.accept(connAckMessage);
        }
    }

    /** Publishes without the retain flag and flushes immediately. */
    public void publish(String topic, MqttQoS qos, byte[] payload) {
        this.publish(topic, qos, payload, false, true);
    }

    /** Publishes and flushes immediately. */
    public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain) {
        this.publish(topic, qos, payload, retain, true);
    }

    /** Publishes without a completion callback. */
    public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain, boolean autoFlush) {
        this.publish(topic, qos, payload, retain, IGNORE, autoFlush);
    }

    /** Publishes without the retain flag and flushes immediately. */
    public void publish(String topic, MqttQoS qos, byte[] payload, Consumer<Integer> consumer) {
        this.publish(topic, qos, payload, false, consumer, true);
    }

    /** Publishes and flushes immediately. */
    public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain, Consumer<Integer> consumer) {
        this.publish(topic, qos, payload, retain, consumer, true);
    }

    /**
     * Publishes a message, immediately when connected, otherwise once the queued tasks are
     * replayed.
     *
     * @param consumer  receives {@code 0} for {@code AT_MOST_ONCE} messages once written, or the
     *                  packet id of the completion message for higher QoS levels
     * @param autoFlush whether to flush right after writing
     */
    public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain, Consumer<Integer> consumer, boolean autoFlush) {
        PublishBuilder publishBuilder = PublishBuilder.builder().topicName(topic).qos(qos).payload(payload).retained(retain);
        if (this.getMqttVersion() == MqttVersion.MQTT_5) {
            publishBuilder.publishProperties(new PublishProperties());
        }
        if (this.connected) {
            this.publish(publishBuilder, consumer, autoFlush);
        } else {
            // Publishing does not trigger consumeTask() on its own, hence the marker type.
            this.registeredTasks.offer((NonChainingTask)() -> this.publish(publishBuilder, consumer, autoFlush));
        }
    }

    /**
     * Writes an {@code AT_MOST_ONCE} message directly; any other QoS goes through the in-flight
     * queue and the consumer is notified when its future completes.
     */
    private void publish(PublishBuilder publishBuilder, Consumer<Integer> consumer, boolean autoFlush) {
        if (publishBuilder.qos() == MqttQoS.AT_MOST_ONCE) {
            this.write((MqttMessage)publishBuilder.build(), autoFlush);
            consumer.accept(0);
            return;
        }
        CompletableFuture future = this.inflightQueue.put((MessageBuilder)publishBuilder);
        future.whenComplete((message, throwable) -> consumer.accept(((MqttPacketIdVariableHeader)message.getVariableHeader()).getPacketId()));
        if (autoFlush) {
            this.flush();
        }
    }

    private List<TopicToken> getWildcardsToken() {
        return this.wildcardsToken;
    }

    /**
     * Dispatches a received PUBLISH to a matching subscription.
     *
     * <p>Candidates are the exact-match subscription (or, failing that, the first matching
     * wildcard subscription) plus all matching {@code $share/} subscriptions. With several
     * candidates only one receives the message, and the list is rotated so the next message goes
     * to the next candidate.
     */
    public void accepted(MqttPublishMessage mqttPublishMessage) {
        MqttPublishVariableHeader header = (MqttPublishVariableHeader)mqttPublishMessage.getVariableHeader();
        String topicName = header.getTopicName();
        List<Subscribe> cacheSubscribes = this.mapping.get(topicName);
        if (cacheSubscribes == null) {
            cacheSubscribes = new LinkedList<Subscribe>();
            Subscribe subscribe = this.subscribes.get(topicName);
            if (subscribe == null) {
                subscribe = this.matchWildcardsSubscribe(topicName);
            }
            if (subscribe != null) {
                cacheSubscribes.add(subscribe);
            }
            cacheSubscribes.addAll(this.matchShareSubscribe(topicName));
            if (!cacheSubscribes.isEmpty()) {
                this.mapping.put(topicName, cacheSubscribes);
            }
        }
        if (cacheSubscribes.isEmpty()) {
            return;
        }
        if (cacheSubscribes.size() == 1) {
            cacheSubscribes.get(0).getConsumer().accept(this, mqttPublishMessage);
            return;
        }
        // Take the head, deliver, then move it to the tail.
        Subscribe head = cacheSubscribes.remove(0);
        head.getConsumer().accept(this, mqttPublishMessage);
        cacheSubscribes.add(head);
    }

    /**
     * Collects the {@code $share/<group>/<filter>} subscriptions whose {@code <filter>} part
     * matches the given topic name.
     */
    private List<Subscribe> matchShareSubscribe(String topicName) {
        List<Subscribe> matchedSubscribes = new ArrayList<Subscribe>();
        // Built on first use and shared across filters, as matchWildcardsSubscribe does.
        TopicToken inputToken = null;
        for (Map.Entry<String, Subscribe> entry : this.subscribes.entrySet()) {
            String topicFilter = entry.getKey();
            if (!topicFilter.startsWith("$share/")) {
                continue;
            }
            String[] parts = topicFilter.split("/", 3);
            if (parts.length < 3) {
                continue;
            }
            if (inputToken == null) {
                inputToken = new TopicToken(topicName);
            }
            if (MqttUtil.match(inputToken, new TopicToken(parts[2]))) {
                matchedSubscribes.add(entry.getValue());
            }
        }
        return matchedSubscribes;
    }

    /**
     * Returns the subscription of the first wildcard filter matching the topic name, or
     * {@code null} when none matches.
     */
    private Subscribe matchWildcardsSubscribe(String topicName) {
        TopicToken publicTopicToken = new TopicToken(topicName);
        for (TopicToken candidate : this.getWildcardsToken()) {
            if (MqttUtil.match(publicTopicToken, candidate)) {
                return this.subscribes.get(candidate.getTopicFilter());
            }
        }
        return null;
    }

    /**
     * Sends DISCONNECT (with normal-disconnect reason and properties for MQTT 5), turns off
     * automatic reconnect and releases the connection. Does nothing if already disconnected.
     */
    public void disconnect() {
        if (this.disconnect) {
            return;
        }
        try {
            if (this.getMqttVersion() == MqttVersion.MQTT_5) {
                MqttDisconnectVariableHeader variableHeader = new MqttDisconnectVariableHeader(MqttDisConnectReturnCode.NORMAL_DISCONNECT, new DisConnectProperties());
                MqttDisconnectMessage message = new MqttDisconnectMessage(variableHeader);
                this.write((MqttMessage)message);
            } else {
                this.write((MqttMessage)new MqttDisconnectMessage());
            }
        }
        finally {
            // Clean up even if writing the DISCONNECT message fails.
            this.options.setAutomaticReconnect(false);
            this.disconnect = true;
            this.mqttWriter = null;
            this.setInflightQueue(null);
            this.release();
        }
    }

    /**
     * Shuts down the underlying client and, when automatic reconnect is enabled, schedules a new
     * connect attempt after the configured delay, using the reconnect consumer if one is
     * configured and the last connect consumer otherwise.
     */
    void release() {
        if (this.client != null) {
            this.client.shutdown();
            this.client = null;
        }
        if (this.options.isAutomaticReconnect()) {
            System.err.println("mqtt client:" + this.clientId + " is disconnect, try to reconnect...");
            TIMER.schedule(() -> this.connect(this.options.reconnectConsumer() == null ? this.connectConsumer : this.options.reconnectConsumer()), (long)this.options.getMaxReconnectDelay(), TimeUnit.MILLISECONDS);
        }
    }
}

