/*
 * Decompiled with CFR 0.152.
 */
package com.feingto.iot.server.handler.mqtt;

import com.feingto.cloud.kit.IdGenerator;
import com.feingto.iot.common.Constants;
import com.feingto.iot.common.model.mqtt.ImSpan;
import com.feingto.iot.common.model.mqtt.SendMessage;
import com.feingto.iot.common.model.mqtt.SubscribeMessage;
import com.feingto.iot.common.service.mqtt.MessageResponse;
import com.feingto.iot.server.cache.MessageCache;
import com.feingto.iot.server.cache.RetainedCache;
import com.feingto.iot.server.cache.SubscribeCache;
import com.feingto.iot.server.handler.BaseMessageHandler;
import com.feingto.iot.server.handler.mqtt.PublishHandler;
import com.feingto.iot.server.serialize.JSON;
import com.feingto.iot.server.service.PushService;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import org.apache.ignite.IgniteCache;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class PublishHandler
extends BaseMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(PublishHandler.class);
    @Resource(name="igniteSubscribe")
    private IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteSubscribe;
    @Resource(name="igniteMessage")
    private IgniteCache<String, ConcurrentHashMap<Integer, SendMessage>> igniteMessage;
    @Resource(name="igniteRetained")
    private IgniteCache<String, SendMessage> igniteRetained;
    @Resource
    private PushService pushService;
    @Resource
    private TransportClient client;
    @Resource
    private String spanIndexName;
    @Resource
    private IdGenerator idGenerator;

    public PublishHandler() {
        super(MqttMessageType.PUBLISH);
    }

    public void handle(Channel channel, Object object) {
        MqttPublishMessage msg = (MqttPublishMessage)object;
        MqttFixedHeader fixedHeader = msg.fixedHeader();
        MqttPublishVariableHeader variableHeader = msg.variableHeader();
        SendMessage sendMessage = SendMessage.newInstance((MqttPublishMessage)msg).from((String)channel.attr(Constants.KEY_CLIENT_ID).get());
        if (sendMessage.retain()) {
            log.debug(">>> save retain message on {}", (Object)sendMessage.topic());
            RetainedCache.getInstance((IgniteCache)this.igniteRetained).put(sendMessage.topic(), sendMessage);
        }
        sendMessage.retain(false);
        this.store(sendMessage);
        switch (1.$SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[fixedHeader.qosLevel().ordinal()]) {
            case 1: {
                this.pushService.internalSend(sendMessage);
                break;
            }
            case 2: {
                this.storeMessage(sendMessage, MqttMessageType.PUBLISH);
                this.pushService.internalSend(sendMessage);
                MessageResponse.puback((Channel)channel, (MqttMessageType)MqttMessageType.PUBACK, (MqttQoS)MqttQoS.AT_MOST_ONCE, (int)variableHeader.packetId());
                break;
            }
            case 3: {
                this.storeMessage(sendMessage, MqttMessageType.PUBREL);
                MessageResponse.puback((Channel)channel, (MqttMessageType)MqttMessageType.PUBREC, (MqttQoS)MqttQoS.AT_LEAST_ONCE, (int)variableHeader.packetId());
                break;
            }
        }
    }

    private void store(SendMessage message) {
        SubscribeCache.getInstance((IgniteCache)this.igniteSubscribe).findByTopic(message.topic()).forEach(subscribe -> {
            log.debug(">>> store on cloud with topic {}", (Object)message.topic());
            this.storeOnEs(subscribe.clientId(), message);
        });
    }

    private void storeOnEs(String toClientId, SendMessage message) {
        if (toClientId.equals(message.from())) {
            return;
        }
        this.client.prepareIndex(this.spanIndexName, "mqtt").setSource(JSON.getInstance().obj2json((Object)new ImSpan().id(this.idGenerator.nextId()).from(message.from()).to(toClientId).topic(message.topic()).payload(new String(message.payload(), StandardCharsets.UTF_8))), XContentType.JSON).get();
    }

    private void storeMessage(SendMessage message, MqttMessageType type) {
        SubscribeCache.getInstance((IgniteCache)this.igniteSubscribe).findByTopic(message.topic()).forEach(subscribe -> MessageCache.getInstance((IgniteCache)this.igniteMessage).put(subscribe.clientId(), message.type(type)));
    }
}

