/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.server;

import java.beans.ConstructorProperties;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.CommonDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageEncodeContext;
import org.jetlinks.core.message.codec.ToDeviceMessageContext;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultSendToDeviceMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultSendToDeviceMessageHandler.class);
    private final String serverId;
    private final DeviceSessionManager sessionManager;
    private final MessageHandler handler;
    private final DeviceRegistry registry;
    private final DecodedClientMessageHandler decodedClientMessageHandler;

    public void startup() {
        this.handler.handleSendToDeviceMessage(this.serverId).subscribe(message -> {
            if (message instanceof DeviceMessage) {
                this.handleDeviceMessage((DeviceMessage)message);
            }
        });
        this.handler.handleGetDeviceState(this.serverId, deviceId -> Flux.from((Publisher)deviceId).map(id -> new DeviceStateInfo(id, this.sessionManager.sessionIsAlive(id) ? (byte)1 : (byte)-1)));
    }

    protected void handleDeviceMessage(DeviceMessage message) {
        String deviceId = message.getDeviceId();
        DeviceSession session = this.sessionManager.getSession(deviceId);
        if (session != null) {
            this.doSend(message, session);
        } else {
            this.registry.getDevice(deviceId).flatMap(deviceOperator -> deviceOperator.getSelfConfig((ConfigKey)DeviceConfigKey.parentGatewayId).flatMap(arg_0 -> ((DeviceRegistry)this.registry).getDevice(arg_0))).flatMap(operator -> {
                ChildDeviceMessage children = new ChildDeviceMessage();
                children.setDeviceId(operator.getDeviceId());
                children.setMessageId(message.getMessageId());
                children.setTimestamp(message.getTimestamp());
                children.setChildDeviceId(deviceId);
                children.setChildDeviceMessage((Message)message);
                children.setHeaders(message.getHeaders());
                ChildrenDeviceSession childrenDeviceSession = this.sessionManager.getSession(deviceId, operator.getDeviceId());
                if (null != childrenDeviceSession) {
                    this.doSend((DeviceMessage)children, (DeviceSession)childrenDeviceSession);
                    return Mono.just((Object)true);
                }
                DeviceSession childrenSession = this.sessionManager.getSession(operator.getDeviceId());
                if (null != childrenSession) {
                    this.doSend((DeviceMessage)children, childrenSession);
                    return Mono.just((Object)true);
                }
                return this.doReply(this.createReply(deviceId, message).error(ErrorCode.CLIENT_OFFLINE));
            }).switchIfEmpty(Mono.defer(() -> {
                log.warn("device[{}] not connected,send message fail", (Object)message.getDeviceId());
                return this.doReply(this.createReply(deviceId, message).error(ErrorCode.CLIENT_OFFLINE));
            })).subscribe();
        }
    }

    protected DeviceMessageReply createReply(String deviceId, DeviceMessage message) {
        Object reply = message instanceof RepayableDeviceMessage ? ((RepayableDeviceMessage)message).newReply() : new CommonDeviceMessageReply();
        reply.messageId(message.getMessageId()).deviceId(deviceId);
        return reply;
    }

    protected void doSend(final DeviceMessage message, final DeviceSession session) {
        String deviceId = message.getDeviceId();
        DeviceMessageReply reply = this.createReply(deviceId, message);
        final AtomicBoolean alreadyReply = new AtomicBoolean(false);
        session.getOperator().getProtocol().flatMap(protocolSupport -> protocolSupport.getMessageCodec(session.getTransport())).flatMapMany(codec -> codec.encode((MessageEncodeContext)new ToDeviceMessageContext(){

            public Mono<Boolean> sendToDevice(@Nonnull EncodedMessage message2) {
                return session.send(message2);
            }

            public Mono<Void> disconnect() {
                return Mono.fromRunnable(() -> {
                    session.close();
                    DefaultSendToDeviceMessageHandler.this.sessionManager.unregister(session.getId());
                });
            }

            @Nonnull
            public DeviceSession getSession() {
                return session;
            }

            @Nonnull
            public Message getMessage() {
                return message;
            }

            public DeviceOperator getDevice() {
                return session.getOperator();
            }

            @Nonnull
            public Mono<Void> reply(@Nonnull Publisher<? extends DeviceMessage> replyMessage) {
                alreadyReply.set(true);
                return Flux.from(replyMessage).flatMap(msg -> DefaultSendToDeviceMessageHandler.this.decodedClientMessageHandler.handleMessage(session.getOperator(), (Message)msg)).then();
            }
        })).flatMap(arg_0 -> ((DeviceSession)session).send(arg_0)).reduce((r1, r2) -> r1 != false && r2 != false).flatMap(success -> {
            if (alreadyReply.get()) {
                return Mono.empty();
            }
            if (message.getHeader(Headers.async).orElse(false).booleanValue()) {
                return this.doReply(reply.message(ErrorCode.REQUEST_HANDLING.getText()).code(ErrorCode.REQUEST_HANDLING.name()).success());
            }
            return Mono.just((Object)true);
        }).switchIfEmpty(Mono.defer(() -> {
            if (message instanceof DisconnectDeviceMessage) {
                session.close();
                this.sessionManager.unregister(session.getId());
                return alreadyReply.get() ? Mono.empty() : this.doReply(this.createReply(deviceId, message).success());
            }
            return alreadyReply.get() ? Mono.empty() : this.doReply(this.createReply(deviceId, message).error(ErrorCode.UNSUPPORTED_MESSAGE));
        })).onErrorResume(error -> {
            alreadyReply.set(true);
            if (error instanceof DeviceOperationException) {
                DeviceOperationException err = (DeviceOperationException)error;
                return this.doReply(reply.error(err.getCode())).onErrorContinue((e, res) -> log.error(e.getMessage(), e));
            }
            log.error(error.getMessage(), error);
            return this.doReply(reply.error(error)).onErrorContinue((e, res) -> log.error(e.getMessage(), e));
        }).subscribe();
    }

    private Mono<Boolean> doReply(DeviceMessageReply reply) {
        return ((Mono)this.handler.reply(reply).as(mo -> {
            if (log.isDebugEnabled()) {
                return mo.doFinally(s -> log.debug("reply message {} ,[{}]", s, (Object)reply));
            }
            return mo;
        })).doOnError(error -> log.error("reply message error", error));
    }

    @ConstructorProperties(value={"serverId", "sessionManager", "handler", "registry", "decodedClientMessageHandler"})
    public DefaultSendToDeviceMessageHandler(String serverId, DeviceSessionManager sessionManager, MessageHandler handler, DeviceRegistry registry, DecodedClientMessageHandler decodedClientMessageHandler) {
        this.serverId = serverId;
        this.sessionManager = sessionManager;
        this.handler = handler;
        this.registry = registry;
        this.decodedClientMessageHandler = decodedClientMessageHandler;
    }
}

