/*
 * Decompiled with CFR 0.152.
 */
package com.feingto.iot.server.handler.mqtt;

import com.feingto.iot.common.Constants;
import com.feingto.iot.common.model.mqtt.SendMessage;
import com.feingto.iot.common.model.mqtt.SessionStore;
import com.feingto.iot.common.model.mqtt.SubscribeMessage;
import com.feingto.iot.common.service.IAuth;
import com.feingto.iot.common.service.mqtt.MessageRequest;
import com.feingto.iot.common.service.mqtt.MessageResponse;
import com.feingto.iot.server.cache.MessageCache;
import com.feingto.iot.server.cache.SessionCache;
import com.feingto.iot.server.cache.SubscribeCache;
import com.feingto.iot.server.handler.BaseMessageHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import org.apache.ignite.IgniteCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Handles MQTT CONNECT packets.
 *
 * <p>For each CONNECT the handler validates the client identifier, checks the
 * supplied credentials, takes over any session already registered for the same
 * client id, registers the new session (including an optional will message and
 * keep-alive idle detection), replies with CONNACK and, for persistent
 * sessions, re-sends the cached PUBLISH / PUBREL messages.
 */
@Component
public class ConnectHandler extends BaseMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(ConnectHandler.class);

    /**
     * Multiplier applied to the client's keep-alive interval to obtain the
     * all-idle timeout passed to the {@link IdleStateHandler}.
     */
    private static final float KEEP_ALIVE_GRACE_FACTOR = 1.5f;

    @Resource(name="igniteSubscribe")
    private IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteSubscribe;
    @Resource(name="igniteMessage")
    private IgniteCache<String, ConcurrentHashMap<Integer, SendMessage>> igniteMessage;
    @Resource(name="authService")
    private IAuth authService;

    /** Registers this handler for {@link MqttMessageType#CONNECT}. */
    public ConnectHandler() {
        super(MqttMessageType.CONNECT);
    }

    /**
     * Processes a CONNECT packet received on {@code channel}.
     *
     * @param channel the channel the packet arrived on
     * @param object  the decoded packet; cast to {@link MqttConnectMessage}
     * @throws ClassCastException if {@code object} is not an {@link MqttConnectMessage}
     */
    public void handle(Channel channel, Object object) {
        MqttConnectMessage msg = (MqttConnectMessage) object;
        MqttConnectPayload payload = msg.payload();
        MqttConnectVariableHeader variableHeader = msg.variableHeader();
        String clientId = payload.clientIdentifier();

        if (StringUtils.isEmpty(clientId)) {
            rejectAndClose(channel, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            return;
        }

        // NOTE(review): credentials are only verified when BOTH the user-name and
        // password flags are set; a CONNECT without them is accepted. Presumably
        // anonymous access is intended -- confirm against the deployment policy.
        if (variableHeader.hasUserName()
                && variableHeader.hasPassword()
                && !this.authService.authorized(payload.userName(), payload.passwordInBytes())) {
            rejectAndClose(channel, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }

        boolean cleanSession = variableHeader.isCleanSession();
        takeOverPreviousSession(clientId, cleanSession);

        SessionStore session = new SessionStore().cleanSession(cleanSession);
        if (variableHeader.isWillFlag()) {
            log.debug(">>> save will message of the {}", clientId);
            session.willMessage(buildWillMessage(variableHeader, payload));
        }

        int keepAliveSeconds = variableHeader.keepAliveTimeSeconds();
        if (keepAliveSeconds > 0) {
            // Only the all-idle timeout is set; reader and writer timeouts stay disabled (0).
            int idleSeconds = Math.round((float) keepAliveSeconds * KEEP_ALIVE_GRACE_FACTOR);
            channel.pipeline().addFirst(new IdleStateHandler(0, 0, idleSeconds));
        }

        channel.attr(Constants.KEY_CLIENT_ID).set(clientId);
        SessionCache.getInstance().put(clientId, session.channel(channel));
        MessageResponse.connack(channel, MqttConnectReturnCode.CONNECTION_ACCEPTED);

        if (!cleanSession) {
            resendCachedMessages(channel, clientId);
        }
    }

    /** Sends a CONNACK carrying the given refusal code and closes the channel. */
    private void rejectAndClose(Channel channel, MqttConnectReturnCode returnCode) {
        MessageResponse.connack(channel, returnCode);
        channel.close();
    }

    /**
     * Discards stale state for {@code clientId} and closes the channel of any
     * session that is still registered for it.
     *
     * <p>Stored subscriptions and messages are dropped when either the new
     * connection asks for a clean session or the previously registered session
     * was a clean one. Checking only the previous session's flag would let a
     * client that reconnects with Clean Session = 1 keep state from an earlier
     * persistent session.
     */
    private void takeOverPreviousSession(String clientId, boolean cleanSession) {
        SessionStore previous = SessionCache.getInstance().get(clientId);
        boolean previousWasClean = previous != null && previous.cleanSession();

        if (cleanSession || previousWasClean) {
            log.debug(">>> clean the previous session of the {}", clientId);
            if (previous != null) {
                SessionCache.getInstance().remove(clientId);
            }
            SubscribeCache.getInstance(this.igniteSubscribe).remove(clientId);
            MessageCache.getInstance(this.igniteMessage).remove(clientId);
        }
        if (previous != null) {
            previous.channel().close();
        }
    }

    /** Builds the PUBLISH message stored as the session's will from the CONNECT will fields. */
    private MqttPublishMessage buildWillMessage(MqttConnectVariableHeader variableHeader, MqttConnectPayload payload) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(
                MqttMessageType.PUBLISH,
                false,
                MqttQoS.valueOf(variableHeader.willQos()),
                variableHeader.isWillRetain(),
                0);
        MqttPublishVariableHeader publishHeader = new MqttPublishVariableHeader(payload.willTopic(), 0);
        return new MqttPublishMessage(
                fixedHeader,
                publishHeader,
                Unpooled.buffer().writeBytes(payload.willMessageInBytes()));
    }

    /** Re-sends every cached PUBLISH or PUBREL message of {@code clientId} on {@code channel}. */
    private void resendCachedMessages(Channel channel, String clientId) {
        MessageCache.getInstance(this.igniteMessage).findBylientId(clientId).stream()
                .filter(cached -> MqttMessageType.PUBLISH.equals(cached.type())
                        || MqttMessageType.PUBREL.equals(cached.type()))
                .forEach(cached -> MessageRequest.publish(channel, cached));
    }
}

