/*
 * Decompiled with CFR 0.152.
 */
package net.dreamlu.iot.mqtt.core.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import net.dreamlu.iot.mqtt.core.util.timer.AckService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.server.TioServer;
import org.tio.server.TioServerConfig;
import org.tio.utils.hutool.StrUtil;

public final class MqttServer {
    private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
    private final TioServer tioServer;
    private final MqttWebServer webServer;
    private final MqttServerCreator serverCreator;
    private final IMqttSessionManager sessionManager;
    private final IMqttMessageStore messageStore;
    private final AckService ackService;

    MqttServer(TioServer tioServer, MqttWebServer webServer, MqttServerCreator serverCreator, AckService ackService) {
        this.tioServer = tioServer;
        this.webServer = webServer;
        this.serverCreator = serverCreator;
        this.sessionManager = serverCreator.getSessionManager();
        this.messageStore = serverCreator.getMessageStore();
        this.ackService = ackService;
    }

    public static MqttServerCreator create() {
        return new MqttServerCreator();
    }

    public TioServer getTioServer() {
        return this.tioServer;
    }

    public MqttWebServer getWebServer() {
        return this.webServer;
    }

    public TioServerConfig getServerConfig() {
        return this.tioServer.getTioServerConfig();
    }

    public MqttServerCreator getServerCreator() {
        return this.serverCreator;
    }

    public boolean publish(String clientId, String topic, ByteBuffer payload) {
        return this.publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE);
    }

    public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos) {
        return this.publish(clientId, topic, payload, qos, false);
    }

    public boolean publish(String clientId, String topic, ByteBuffer payload, boolean retain) {
        return this.publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE, retain);
    }

    public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
        TopicUtil.validateTopicName(topic);
        ChannelContext context = Tio.getByBsId((TioConfig)this.getServerConfig(), (String)clientId);
        if (context == null || context.isClosed) {
            logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", (Object)topic, (Object)clientId);
            return false;
        }
        Integer subMqttQoS = this.sessionManager.searchSubscribe(topic, clientId);
        if (subMqttQoS == null) {
            logger.warn("Mqtt Topic:{} publish but clientId:{} not subscribed.", (Object)topic, (Object)clientId);
            return false;
        }
        MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf((int)subMqttQoS) : qos;
        return this.publish(context, clientId, topic, payload, mqttQoS, retain);
    }

    private boolean publish(ChannelContext context, String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
        int messageId;
        boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
        int n = messageId = isHighLevelQoS ? this.sessionManager.getMessageId(clientId) : -1;
        if (payload == null) {
            payload = ByteBuffer.allocate(0);
        } else {
            payload.rewind();
        }
        if (retain) {
            this.saveRetainMessage(topic, qos, payload);
        }
        MqttPublishMessage message = MqttMessageBuilders.publish().topicName(topic).payload(payload).qos(qos).retained(retain).messageId(messageId).build();
        boolean result = Tio.send((ChannelContext)context, (Packet)message);
        logger.debug("MQTT Topic:{} qos:{} retain:{} publish clientId:{} result:{}", new Object[]{topic, qos, retain, clientId, result});
        if (isHighLevelQoS) {
            MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
            this.sessionManager.addPendingPublish(clientId, messageId, pendingPublish);
            pendingPublish.startPublishRetransmissionTimer(this.ackService, msg -> Tio.send((ChannelContext)context, (Packet)msg));
        }
        return result;
    }

    public boolean publishAll(String topic, byte[] payload) {
        return this.publishAll(topic, payload, MqttQoS.AT_MOST_ONCE);
    }

    public boolean publishAll(String topic, ByteBuffer payload) {
        return this.publishAll(topic, payload, MqttQoS.AT_MOST_ONCE);
    }

    public boolean publishAll(String topic, byte[] payload, MqttQoS qos) {
        return this.publishAll(topic, payload, qos, false);
    }

    public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos) {
        return this.publishAll(topic, payload, qos, false);
    }

    public boolean publishAll(String topic, byte[] payload, boolean retain) {
        return this.publishAll(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
    }

    public boolean publishAll(String topic, ByteBuffer payload, boolean retain) {
        return this.publishAll(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
    }

    public boolean publishAll(String topic, byte[] payload, MqttQoS qos, boolean retain) {
        return this.publishAll(topic, payload == null ? null : ByteBuffer.wrap(payload), qos, retain);
    }

    public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
        TopicUtil.validateTopicName(topic);
        List<Subscribe> subscribeList = this.sessionManager.searchSubscribe(topic);
        if (subscribeList.isEmpty()) {
            logger.debug("Mqtt Topic:{} publishAll but subscribe client list is empty.", (Object)topic);
            return false;
        }
        if (payload == null) {
            payload = ByteBuffer.allocate(0);
        }
        if (retain) {
            this.saveRetainMessage(topic, qos, payload);
        }
        for (Subscribe subscribe : subscribeList) {
            String clientId = subscribe.getClientId();
            ChannelContext context = Tio.getByBsId((TioConfig)this.getServerConfig(), (String)clientId);
            if (context == null || context.isClosed) {
                logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", (Object)topic, (Object)clientId);
                continue;
            }
            int subMqttQoS = subscribe.getMqttQoS();
            MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf((int)subMqttQoS) : qos;
            this.publish(context, clientId, topic, payload, mqttQoS, false);
        }
        return true;
    }

    public boolean sendToClient(String topic, Message message) {
        String clientId = message.getClientId();
        MqttQoS mqttQoS = MqttQoS.valueOf((int)message.getQos());
        if (StrUtil.isBlank((CharSequence)clientId)) {
            return this.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain());
        }
        return this.publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain());
    }

    private void saveRetainMessage(String topic, MqttQoS mqttQoS, ByteBuffer payload) {
        Message retainMessage = new Message();
        retainMessage.setTopic(topic);
        retainMessage.setQos(mqttQoS.value());
        retainMessage.setPayload(payload);
        retainMessage.setMessageType(MessageType.DOWN_STREAM);
        retainMessage.setRetain(true);
        retainMessage.setDup(false);
        retainMessage.setTimestamp(System.currentTimeMillis());
        retainMessage.setNode(this.serverCreator.getNodeName());
        this.messageStore.addRetainMessage(topic, retainMessage);
    }

    public ChannelContext getChannelContext(String clientId) {
        return Tio.getByBsId((TioConfig)this.getServerConfig(), (String)clientId);
    }

    public void close(String clientId) {
        Tio.remove((ChannelContext)this.getChannelContext(clientId), (String)"Mqtt server close this connects.");
    }

    public boolean start() {
        this.ackService.start();
        try {
            this.tioServer.start(this.serverCreator.getIp(), this.serverCreator.getPort());
        }
        catch (IOException e) {
            throw new IllegalStateException("Mica mqtt tcp server start fail.", e);
        }
        if (this.webServer != null) {
            try {
                this.webServer.start();
            }
            catch (IOException e) {
                throw new IllegalStateException("Mica mqtt http/websocket server start fail.", e);
            }
        }
        return true;
    }

    public boolean stop() {
        this.ackService.stop();
        boolean result = this.tioServer.stop();
        logger.info("Mqtt tcp server stop result:{}", (Object)result);
        if (this.webServer != null) {
            logger.info("Mqtt websocket server stop result:{}", (Object)(result &= this.webServer.stop()));
        }
        try {
            this.sessionManager.clean();
        }
        catch (Throwable e) {
            logger.error("MqttServer stop session clean error.", e);
        }
        return result;
    }
}

