/*
 * Decompiled with CFR 0.152.
 */
package net.dreamlu.iot.mqtt.core.server;

import java.io.IOException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttHttpHelper;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.DefaultAioListener;
import org.tio.core.intf.Packet;
import org.tio.utils.hutool.StrUtil;

/**
 * Channel lifecycle listener for the MQTT server.
 *
 * <p>On channel close it publishes the client's will message (for abnormal
 * disconnects), removes the client's session and notifies the
 * {@link IMqttConnectStatusListener} on the supplied executor. Channels flagged
 * as HTTP requests skip the MQTT cleanup and are closed after a packet is sent.
 *
 * <p>NOTE(review): the public {@code on*} methods presumably override callbacks
 * declared by {@link DefaultAioListener}; the superclass is not visible here, so
 * {@code @Override} has not been added — confirm and annotate.
 */
public class MqttServerAioListener extends DefaultAioListener {
    private static final Logger logger = LoggerFactory.getLogger(MqttServerAioListener.class);

    /** Context attribute whose presence marks the channel as an HTTP request. */
    private static final String ATTR_IS_HTTP = "is_http";
    /** Context attribute whose presence marks a normal (client initiated) disconnect. */
    private static final String ATTR_DISCONNECTED = "disconnected";
    /** Context attribute holding the user name associated with the channel. */
    private static final String ATTR_USERNAME = "username";

    private final IMqttMessageStore messageStore;
    private final IMqttSessionManager sessionManager;
    private final IMqttMessageDispatcher messageDispatcher;
    private final IMqttConnectStatusListener connectStatusListener;
    private final ThreadPoolExecutor executor;

    /**
     * Creates the listener from the server configuration.
     *
     * @param serverCreator source of the message store, session manager,
     *                      message dispatcher and connect status listener
     * @param executor      pool on which offline notifications are run
     */
    public MqttServerAioListener(MqttServerCreator serverCreator, ThreadPoolExecutor executor) {
        this.messageStore = serverCreator.getMessageStore();
        this.sessionManager = serverCreator.getSessionManager();
        this.messageDispatcher = serverCreator.getMessageDispatcher();
        this.connectStatusListener = serverCreator.getConnectStatusListener();
        this.executor = executor;
    }

    /**
     * Logs a heartbeat timeout for the channel.
     *
     * @param context               channel that timed out
     * @param interval              value passed in by the framework, logged as "interval"
     * @param heartbeatTimeoutCount value passed in by the framework, logged as "count"
     * @return always {@code false}; presumably this lets the framework close the
     *         connection — TODO confirm against the t-io listener contract
     */
    public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) {
        String clientId = context.getBsId();
        logger.info("Mqtt HeartbeatTimeout clientId:{} interval:{} count:{}", clientId, interval, heartbeatTimeoutCount);
        return false;
    }

    /**
     * Cleans up MQTT state for a channel that is about to be closed.
     *
     * <p>HTTP channels only have their marker attribute removed. For MQTT
     * channels the close is logged, then (when a client id is bound) the will
     * message is published if the disconnect was not a normal one, the session
     * is removed and an offline notification is scheduled.
     *
     * @param context   channel being closed
     * @param throwable cause of the close, may be {@code null}
     * @param remark    close remark, logged and forwarded to the offline notification
     * @param isRemove  flag passed in by the framework, only logged here
     */
    public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
        boolean isHttpRequest = context.get(ATTR_IS_HTTP) != null;
        if (isHttpRequest) {
            context.remove(ATTR_IS_HTTP);
            return;
        }
        String clientId = context.getBsId();
        // The "disconnected" attribute is absent when the close was not a normal disconnect.
        boolean isNotNormalDisconnect = context.get(ATTR_DISCONNECTED) == null;
        if (isNotNormalDisconnect || throwable != null) {
            if (throwable instanceof IOException) {
                // For I/O failures only the message is logged, not the stack trace.
                logger.error("Mqtt server close clientId:{}, remark:{} isRemove:{} error:{}", clientId, remark, isRemove, throwable.getMessage());
            } else {
                // The trailing throwable (possibly null) has no placeholder of its own.
                logger.error("Mqtt server close clientId:{}, remark:{} isRemove:{}", clientId, remark, isRemove, throwable);
            }
        } else {
            logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove);
        }
        // Without a client id there is no will message or session to clean up.
        if (StrUtil.isBlank(clientId)) {
            return;
        }
        if (isNotNormalDisconnect) {
            this.sendWillMessage(clientId);
        }
        this.cleanSession(clientId);
        context.remove(ATTR_DISCONNECTED);
        // Read the user name before removing it so it can still be reported.
        String username = (String) context.get(ATTR_USERNAME);
        context.remove(ATTR_USERNAME);
        this.notify(context, clientId, username, remark);
    }

    /**
     * Publishes and then clears the stored will message of a client, if any.
     * Failures are logged and never propagated.
     *
     * @param clientId client whose will message should be sent
     */
    private void sendWillMessage(String clientId) {
        try {
            Message willMessage = this.messageStore.getWillMessage(clientId);
            if (willMessage == null) {
                return;
            }
            boolean result = this.messageDispatcher.send(willMessage);
            logger.debug("Mqtt server clientId:{} send willMessage result:{}.", clientId, result);
            this.messageStore.clearWillMessage(clientId);
        } catch (Throwable throwable) {
            logger.error("Mqtt server clientId:{} send willMessage error.", clientId, throwable);
        }
    }

    /**
     * Removes the client's session from the session manager.
     * Failures are logged and never propagated.
     *
     * @param clientId client whose session should be removed
     */
    private void cleanSession(String clientId) {
        try {
            this.sessionManager.remove(clientId);
        } catch (Throwable throwable) {
            logger.error("Mqtt server clientId:{} session clean error.", clientId, throwable);
        }
    }

    /**
     * Schedules the offline notification on the executor.
     *
     * <p>Both a failure inside the listener and a rejection by the executor
     * (for example when the pool has been shut down) are logged instead of
     * being propagated into the close handling.
     *
     * @param context  channel that went offline
     * @param clientId client id bound to the channel
     * @param username user name read from the channel, may be {@code null}
     * @param remark   close remark
     */
    private void notify(ChannelContext context, String clientId, String username, String remark) {
        try {
            this.executor.execute(() -> {
                try {
                    this.connectStatusListener.offline(context, clientId, username, remark);
                } catch (Throwable throwable) {
                    logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
                }
            });
        } catch (RejectedExecutionException rejected) {
            logger.error("Mqtt server clientId:{} offline notify rejected by executor.", clientId, rejected);
        }
    }

    /**
     * Closes HTTP channels once a packet has been handed to the send path;
     * MQTT channels are left untouched.
     *
     * @param context       channel the packet was sent on
     * @param packet        packet that was sent
     * @param isSentSuccess send result reported by the framework, not used here
     */
    public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) {
        boolean isHttpRequest = context.get(ATTR_IS_HTTP) != null;
        if (isHttpRequest) {
            MqttHttpHelper.close(context, packet);
        }
    }
}

