/*
 * Decompiled with CFR 0.152.
 */
package com.feingto.iot.server.handler.mqtt;

import com.feingto.iot.common.Constants;
import com.feingto.iot.common.model.mqtt.SendMessage;
import com.feingto.iot.common.model.mqtt.SubscribeMessage;
import com.feingto.iot.common.service.mqtt.MessageResponse;
import com.feingto.iot.server.cache.RetainedCache;
import com.feingto.iot.server.cache.SubscribeCache;
import com.feingto.iot.server.handler.BaseMessageHandler;
import com.feingto.iot.server.service.PushService;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.ignite.IgniteCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Message handler registered for {@link MqttMessageType#SUBSCRIBE}.
 *
 * <p>For each incoming subscribe packet it stores the requested subscriptions
 * through {@link SubscribeCache}, answers with a SUBACK via
 * {@link MessageResponse#suback}, and finally hands any retained messages found
 * in {@link RetainedCache} to {@link PushService#internalSend}.
 */
@Component
public class SubscribeHandler
extends BaseMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(SubscribeHandler.class);

    /** Ignite cache injected as bean {@code igniteSubscribe}; passed to {@link SubscribeCache#getInstance}. */
    @Resource(name="igniteSubscribe")
    private IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteSubscribe;

    /** Ignite cache injected as bean {@code igniteRetained}; passed to {@link RetainedCache#getInstance}. */
    @Resource(name="igniteRetained")
    private IgniteCache<String, SendMessage> igniteRetained;

    /** Service that receives the retained messages found for the client's subscriptions. */
    @Resource
    private PushService pushService;

    /** Registers this handler with the base class for the SUBSCRIBE message type. */
    public SubscribeHandler() {
        super(MqttMessageType.SUBSCRIBE);
    }

    /**
     * Processes one subscribe packet: store subscriptions, acknowledge, then
     * push retained messages.
     *
     * @param channel channel the packet arrived on; the client id is read from
     *                its {@code Constants.KEY_CLIENT_ID} attribute
     * @param object  the decoded packet; cast to {@link MqttSubscribeMessage}
     * @throws ClassCastException if {@code object} is not an {@link MqttSubscribeMessage}
     */
    public void handle(Channel channel, Object object) {
        String clientId = (String)channel.attr(Constants.KEY_CLIENT_ID).get();
        MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage)object;

        this.storeSubscriptions(clientId, subscribeMessage);
        this.acknowledge(channel, subscribeMessage);
        this.publishRetained(clientId);
    }

    /** Puts one {@link SubscribeMessage} per requested topic into the subscribe cache, keyed by topic name. */
    private void storeSubscriptions(String clientId, MqttSubscribeMessage subscribeMessage) {
        subscribeMessage.payload().topicSubscriptions().forEach(requested -> {
            SubscribeMessage entry = new SubscribeMessage()
                    .clientId(clientId)
                    .topicName(requested.topicName())
                    .mqttQoS(requested.qualityOfService().value());
            SubscribeCache.getInstance(this.igniteSubscribe).put(requested.topicName(), entry);
        });
    }

    /** Sends a SUBACK carrying the QoS value of every requested subscription and the packet's message id. */
    private void acknowledge(Channel channel, MqttSubscribeMessage subscribeMessage) {
        Iterable<Integer> requestedQos = subscribeMessage.payload().topicSubscriptions().stream()
                .map(requested -> requested.qualityOfService().value())
                .collect(Collectors.toList());
        MessageResponse.suback(channel, requestedQos, subscribeMessage.variableHeader().messageId());
    }

    /**
     * Looks up the retained cache by topic name for every subscription that
     * {@code findByClientId} returns and forwards each hit to the push service.
     *
     * <p>NOTE(review): this walks all subscriptions returned for the client, not
     * only the topics of the packet being handled, and the recipients of
     * {@code internalSend} are not visible here - confirm this matches the
     * intended retained-message delivery.
     */
    private void publishRetained(String clientId) {
        SubscribeCache.getInstance(this.igniteSubscribe).findByClientId(clientId).forEach(subscribed ->
                Optional.ofNullable(RetainedCache.getInstance(this.igniteRetained).get(subscribed.topicName()))
                        .ifPresent(retained -> {
                            log.debug(">>> publish retained message to {}", clientId);
                            this.pushService.internalSend(retained);
                        }));
    }
}

