/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mica.mqtt.core.server;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.dromara.mica.mqtt.codec.MqttMessageBuilders;
import org.dromara.mica.mqtt.codec.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.server.MqttServerCreator;
import org.dromara.mica.mqtt.core.server.enums.MessageType;
import org.dromara.mica.mqtt.core.server.http.core.MqttWebServer;
import org.dromara.mica.mqtt.core.server.model.ClientInfo;
import org.dromara.mica.mqtt.core.server.model.Message;
import org.dromara.mica.mqtt.core.server.model.Subscribe;
import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager;
import org.dromara.mica.mqtt.core.server.store.IMqttMessageStore;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.core.stat.vo.StatVo;
import org.tio.server.TioServer;
import org.tio.server.TioServerConfig;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.page.Page;
import org.tio.utils.page.PageUtils;
import org.tio.utils.timer.TimerTask;
import org.tio.utils.timer.TimerTaskService;

/**
 * Facade over the MQTT TCP server (and the optional HTTP/websocket server) that exposes
 * publishing, client lookup, scheduling and lifecycle operations.
 *
 * <p>Instances are obtained through {@link #create()} and the returned {@link MqttServerCreator};
 * the constructor is package-private.
 */
public final class MqttServer {
    private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
    private final TioServer tioServer;
    /** Optional HTTP/websocket server; may be {@code null}, in which case it is never started or stopped. */
    private final MqttWebServer webServer;
    private final MqttServerCreator serverCreator;
    private final IMqttSessionManager sessionManager;
    private final IMqttMessageStore messageStore;
    private final TimerTaskService taskService;

    /**
     * Wires the server together; the session manager and message store are read from the creator.
     *
     * @param tioServer     underlying TCP server
     * @param webServer     HTTP/websocket server, or {@code null} when not used
     * @param serverCreator configuration this server was built from
     * @param taskService   timer service handed to pending publishes for retransmission
     */
    MqttServer(TioServer tioServer, MqttWebServer webServer, MqttServerCreator serverCreator, TimerTaskService taskService) {
        this.tioServer = tioServer;
        this.webServer = webServer;
        this.serverCreator = serverCreator;
        this.sessionManager = serverCreator.getSessionManager();
        this.messageStore = serverCreator.getMessageStore();
        this.taskService = taskService;
    }

    /**
     * Creates a new, unconfigured server creator.
     *
     * @return a fresh {@link MqttServerCreator}
     */
    public static MqttServerCreator create() {
        return new MqttServerCreator();
    }

    /**
     * @return the underlying TCP server
     */
    public TioServer getTioServer() {
        return this.tioServer;
    }

    /**
     * @return the HTTP/websocket server, or {@code null} if none was supplied
     */
    public MqttWebServer getWebServer() {
        return this.webServer;
    }

    /**
     * @return the configuration of the underlying TCP server
     */
    public TioServerConfig getServerConfig() {
        return this.tioServer.getServerConfig();
    }

    /**
     * @return the creator this server was built from
     */
    public MqttServerCreator getServerCreator() {
        return this.serverCreator;
    }

    /**
     * Publishes to one client with QoS 0 and without the retain flag.
     *
     * @see #publish(String, String, byte[], MqttQoS, boolean)
     */
    public boolean publish(String clientId, String topic, byte[] payload) {
        return this.publish(clientId, topic, payload, MqttQoS.QOS0);
    }

    /**
     * Publishes to one client with the given QoS and without the retain flag.
     *
     * @see #publish(String, String, byte[], MqttQoS, boolean)
     */
    public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos) {
        return this.publish(clientId, topic, payload, qos, false);
    }

    /**
     * Publishes to one client with QoS 0 and the given retain flag.
     *
     * @see #publish(String, String, byte[], MqttQoS, boolean)
     */
    public boolean publish(String clientId, String topic, byte[] payload, boolean retain) {
        return this.publish(clientId, topic, payload, MqttQoS.QOS0, retain);
    }

    /**
     * Publishes a message to a single client.
     *
     * <p>The topic name is validated first. When {@code retain} is set, the retained message is
     * stored before the client is looked up, so it is stored even if delivery then fails. The
     * effective QoS is the lower of the requested QoS and the QoS of the client's subscription.
     *
     * @param clientId target client id
     * @param topic    topic name to publish on
     * @param payload  message payload
     * @param qos      requested QoS
     * @param retain   whether to store the message as retained and set the retain flag
     * @return {@code false} when the client has no open channel or is not subscribed to the
     *         topic; otherwise the result of sending the message
     */
    public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos, boolean retain) {
        TopicUtil.validateTopicName(topic);
        if (retain) {
            this.saveRetainMessage(topic, qos, payload);
        }
        ChannelContext context = this.getChannelContext(clientId);
        if (context == null || context.isClosed()) {
            logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId);
            return false;
        }
        Integer subMqttQoS = this.sessionManager.searchSubscribe(topic, clientId);
        if (subMqttQoS == null) {
            logger.warn("Mqtt Topic:{} publish but clientId:{} not subscribed.", topic, clientId);
            return false;
        }
        return this.publish(context, clientId, topic, payload, downgradeQoS(qos, subMqttQoS), retain);
    }

    /**
     * Builds the publish message and sends it on the given channel.
     *
     * <p>For QoS 1 and QoS 2 a message id is taken from the session manager and a pending publish
     * is registered, with its retransmission timer started, before the message is sent. QoS 0
     * messages use the message id {@code -1}.
     *
     * @return the result of {@code Tio.send}
     */
    private boolean publish(ChannelContext context, String clientId, String topic, byte[] payload, MqttQoS qos, boolean retain) {
        boolean isHighLevelQoS = MqttQoS.QOS1 == qos || MqttQoS.QOS2 == qos;
        int messageId = isHighLevelQoS ? this.sessionManager.getMessageId(clientId) : -1;
        MqttPublishMessage message = MqttMessageBuilders.publish()
            .topicName(topic)
            .payload(payload)
            .qos(qos)
            .retained(retain)
            .messageId(messageId)
            .build();
        if (isHighLevelQoS) {
            MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
            this.sessionManager.addPendingPublish(clientId, messageId, pendingPublish);
            pendingPublish.startPublishRetransmissionTimer(this.taskService, context);
        }
        boolean result = Tio.send(context, message);
        logger.debug("MQTT Topic:{} qos:{} retain:{} publish clientId:{} result:{}", topic, qos, retain, clientId, result);
        return result;
    }

    /**
     * Returns the QoS to deliver with: the requested one, capped at the subscription's QoS.
     *
     * @param qos           QoS requested by the publisher
     * @param subscribedQoS numeric QoS of the subscription
     * @return {@code qos} unless it is higher than {@code subscribedQoS}, in which case the
     *         subscription's QoS
     */
    private static MqttQoS downgradeQoS(MqttQoS qos, int subscribedQoS) {
        return qos.value() > subscribedQoS ? MqttQoS.valueOf(subscribedQoS) : qos;
    }

    /**
     * Publishes to all subscribers with QoS 0 and without the retain flag.
     *
     * @see #publishAll(String, byte[], MqttQoS, boolean)
     */
    public boolean publishAll(String topic, byte[] payload) {
        return this.publishAll(topic, payload, MqttQoS.QOS0);
    }

    /**
     * Publishes to all subscribers with the given QoS and without the retain flag.
     *
     * @see #publishAll(String, byte[], MqttQoS, boolean)
     */
    public boolean publishAll(String topic, byte[] payload, MqttQoS qos) {
        return this.publishAll(topic, payload, qos, false);
    }

    /**
     * Publishes to all subscribers with QoS 0 and the given retain flag.
     *
     * @see #publishAll(String, byte[], MqttQoS, boolean)
     */
    public boolean publishAll(String topic, byte[] payload, boolean retain) {
        return this.publishAll(topic, payload, MqttQoS.QOS0, retain);
    }

    /**
     * Publishes a message to every client subscribed to the topic.
     *
     * <p>The topic name is validated first and, when {@code retain} is set, the retained message
     * is stored. Subscribers without an open channel are skipped with a warning. Each delivery
     * uses the lower of the requested QoS and that subscriber's QoS, and is always sent with the
     * retain flag cleared.
     *
     * @param topic   topic name to publish on
     * @param payload message payload
     * @param qos     requested QoS
     * @param retain  whether to store the message as retained
     * @return {@code false} only when no subscription matches the topic; {@code true} otherwise,
     *         regardless of the individual send results
     */
    public boolean publishAll(String topic, byte[] payload, MqttQoS qos, boolean retain) {
        TopicUtil.validateTopicName(topic);
        if (retain) {
            this.saveRetainMessage(topic, qos, payload);
        }
        List<Subscribe> subscribeList = this.sessionManager.searchSubscribe(topic);
        if (subscribeList.isEmpty()) {
            logger.debug("Mqtt Topic:{} publishAll but subscribe client list is empty.", topic);
            return false;
        }
        for (Subscribe subscribe : subscribeList) {
            String clientId = subscribe.getClientId();
            ChannelContext context = this.getChannelContext(clientId);
            if (context == null || context.isClosed()) {
                logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", topic, clientId);
                continue;
            }
            MqttQoS mqttQoS = downgradeQoS(qos, subscribe.getMqttQoS());
            // The retain flag is deliberately not forwarded here; the message was already stored above.
            this.publish(context, clientId, topic, payload, mqttQoS, false);
        }
        return true;
    }

    /**
     * Dispatches a {@link Message}: to all subscribers when its client id is blank, otherwise to
     * that single client.
     *
     * @param topic   topic name to publish on
     * @param message source of client id, QoS, payload and retain flag
     * @return the result of the delegated {@code publishAll} or {@code publish} call
     */
    public boolean sendToClient(String topic, Message message) {
        String clientId = message.getClientId();
        MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos());
        if (StrUtil.isBlank(clientId)) {
            return this.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain());
        }
        return this.publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain());
    }

    /**
     * Stores a downstream retained message for the topic, stamped with the current time and this
     * server's node name.
     */
    private void saveRetainMessage(String topic, MqttQoS mqttQoS, byte[] payload) {
        Message retainMessage = new Message();
        retainMessage.setTopic(topic);
        retainMessage.setQos(mqttQoS.value());
        retainMessage.setPayload(payload);
        retainMessage.setMessageType(MessageType.DOWN_STREAM);
        retainMessage.setRetain(true);
        retainMessage.setDup(false);
        retainMessage.setTimestamp(System.currentTimeMillis());
        retainMessage.setNode(this.serverCreator.getNodeName());
        this.messageStore.addRetainMessage(topic, retainMessage);
    }

    /**
     * Looks up a client by id.
     *
     * @param clientId client id
     * @return the client's info, or {@code null} when no channel is bound to that id
     */
    public ClientInfo getClientInfo(String clientId) {
        ChannelContext context = this.getChannelContext(clientId);
        if (context == null) {
            return null;
        }
        return ClientInfo.form(this.serverCreator, context, ClientInfo::new);
    }

    /**
     * Builds the client info for an already resolved channel.
     *
     * @param context channel of the client
     * @return the client's info
     */
    public ClientInfo getClientInfo(ChannelContext context) {
        return ClientInfo.form(this.serverCreator, context, ClientInfo::new);
    }

    /**
     * @return info for every channel of this server that has a non-blank business id
     */
    public List<ClientInfo> getClients() {
        return MqttServer.getClients(this.serverCreator, this.getServerConfig());
    }

    /**
     * Lists info for every channel of the given config that has a non-blank business id.
     *
     * @param serverCreator creator passed through to {@code ClientInfo.form}
     * @param tioConfig     config whose channels are listed
     * @return the client infos
     */
    public static List<ClientInfo> getClients(MqttServerCreator serverCreator, TioConfig tioConfig) {
        return Tio.getAll(tioConfig).stream()
            .filter(MqttServer::isMqtt)
            .map(context -> ClientInfo.form(serverCreator, context, ClientInfo::new))
            .collect(Collectors.toList());
    }

    /**
     * Paged variant of {@link #getClients()}.
     *
     * @param pageIndex page index; must not be {@code null}
     * @param pageSize  page size; must not be {@code null}
     * @return one page of client infos
     */
    public Page<ClientInfo> getClients(Integer pageIndex, Integer pageSize) {
        return MqttServer.getClients(this.serverCreator, this.getServerConfig(), pageIndex, pageSize);
    }

    /**
     * Paged variant of {@link #getClients(MqttServerCreator, TioConfig)}.
     *
     * @param serverCreator creator passed through to {@code ClientInfo.form}
     * @param tioConfig     config whose channels are listed
     * @param pageIndex     page index; must not be {@code null} (it is unboxed)
     * @param pageSize      page size; must not be {@code null} (it is unboxed)
     * @return one page of client infos
     */
    public static Page<ClientInfo> getClients(MqttServerCreator serverCreator, TioConfig tioConfig, Integer pageIndex, Integer pageSize) {
        Set<ChannelContext> contextSet = Tio.getAll(tioConfig).stream()
            .filter(MqttServer::isMqtt)
            .collect(Collectors.toSet());
        // Unbox explicitly so the int-based paging call is selected.
        int index = pageIndex;
        int size = pageSize;
        return PageUtils.fromSet(contextSet, index, size, context -> ClientInfo.form(serverCreator, context, ClientInfo::new));
    }

    /**
     * @return the statistics object of the underlying TCP server config
     */
    public StatVo getStat() {
        return this.tioServer.getServerConfig().getStat();
    }

    /**
     * @param clientId client id
     * @return the subscriptions the session manager reports for that client
     */
    public List<Subscribe> getSubscriptions(String clientId) {
        return this.serverCreator.getSessionManager().getSubscriptions(clientId);
    }

    /** A channel counts as an MQTT client once it carries a non-blank business id. */
    private static boolean isMqtt(ChannelContext context) {
        return StrUtil.isNotBlank(context.getBsId());
    }

    /**
     * Delegates to {@code TioServer.schedule(command, delay)}.
     *
     * @return the timer task created by the server
     */
    public TimerTask schedule(Runnable command, long delay) {
        return this.tioServer.schedule(command, delay);
    }

    /**
     * Delegates to {@code TioServer.schedule(command, delay, executor)}.
     *
     * @return the timer task created by the server
     */
    public TimerTask schedule(Runnable command, long delay, Executor executor) {
        return this.tioServer.schedule(command, delay, executor);
    }

    /**
     * Delegates to {@code TioServer.scheduleOnce} with a {@code null} executor.
     *
     * @return the timer task created by the server
     */
    public TimerTask scheduleOnce(Runnable command, long delay) {
        return this.tioServer.scheduleOnce(command, delay, null);
    }

    /**
     * Delegates to {@code TioServer.scheduleOnce(command, delay, executor)}.
     *
     * @return the timer task created by the server
     */
    public TimerTask scheduleOnce(Runnable command, long delay, Executor executor) {
        return this.tioServer.scheduleOnce(command, delay, executor);
    }

    /**
     * @param clientId client id
     * @return the channel bound to that id, or {@code null} when there is none
     */
    public ChannelContext getChannelContext(String clientId) {
        return Tio.getByBsId(this.getServerConfig(), clientId);
    }

    /**
     * Removes the connection of the given client; does nothing when no channel is bound to the id.
     *
     * @param clientId client id
     */
    public void close(String clientId) {
        ChannelContext context = this.getChannelContext(clientId);
        if (context == null) {
            return;
        }
        Tio.remove(context, "Mqtt server close this connects.");
    }

    /**
     * Starts the TCP server and then, if present, the HTTP/websocket server.
     *
     * @return always {@code true}
     * @throws IllegalStateException wrapping the {@link IOException} if either server fails to start
     */
    public boolean start() {
        try {
            this.tioServer.start(this.serverCreator.getIp(), this.serverCreator.getPort());
        } catch (IOException e) {
            String message = String.format("Mica mqtt tcp server port %d start fail.", this.serverCreator.getPort());
            throw new IllegalStateException(message, e);
        }
        if (this.webServer != null) {
            try {
                this.webServer.start();
            } catch (IOException e) {
                String message = String.format("Mica mqtt http/websocket server port %d start fail.", this.serverCreator.getWebPort());
                throw new IllegalStateException(message, e);
            }
        }
        return true;
    }

    /**
     * Stops the TCP server, the HTTP/websocket server (if present) and the MQTT executor, then
     * cleans the session manager.
     *
     * <p>The executor is given up to 10 minutes to terminate. Session cleanup is always attempted
     * and its failures are only logged.
     *
     * @return {@code true} only if every stop step that ran reported success
     */
    public boolean stop() {
        boolean result = this.tioServer.stop();
        logger.info("Mqtt tcp server stop result:{}", result);
        if (this.webServer != null) {
            // Log the web server's own outcome rather than the accumulated result.
            boolean webResult = this.webServer.stop();
            logger.info("Mqtt websocket server stop result:{}", webResult);
            result &= webResult;
        }
        ExecutorService mqttExecutor = this.serverCreator.getMqttExecutor();
        // Guard against a missing executor so that session cleanup below is still reached.
        if (mqttExecutor != null) {
            try {
                mqttExecutor.shutdown();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            try {
                result &= mqttExecutor.awaitTermination(10L, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error(e.getMessage(), e);
            }
        }
        try {
            this.sessionManager.clean();
        } catch (Throwable e) {
            logger.error("MqttServer stop session clean error.", e);
        }
        return result;
    }
}

