/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.server;

import java.util.function.Function;
import javax.annotation.Nonnull;
import lombok.Generated;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.DeviceOfflineMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

@Deprecated
public class DefaultDecodedClientMessageHandler
implements DecodedClientMessageHandler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultDecodedClientMessageHandler.class);
    private final MessageHandler messageHandler;
    private final FluxProcessor<Message, Message> processor;
    private final FluxSink<Message> sink;
    private final DeviceSessionManager sessionManager;

    public DefaultDecodedClientMessageHandler(MessageHandler handler, DeviceSessionManager sessionManager) {
        this(handler, sessionManager, (FluxProcessor<Message, Message>)EmitterProcessor.create((boolean)false));
    }

    public DefaultDecodedClientMessageHandler(MessageHandler handler, DeviceSessionManager sessionManager, FluxProcessor<Message, Message> processor) {
        this.messageHandler = handler;
        this.processor = processor;
        this.sessionManager = sessionManager;
        this.sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    protected Mono<Boolean> handleChildrenDeviceMessage(DeviceOperator device, String childrenId, Message message) {
        if (message instanceof DeviceMessageReply) {
            return this.doReply((DeviceMessageReply)message);
        }
        if (message instanceof DeviceOnlineMessage) {
            return this.sessionManager.registerChildren(device.getDeviceId(), childrenId).thenReturn((Object)true).defaultIfEmpty((Object)false);
        }
        if (message instanceof DeviceOfflineMessage) {
            return this.sessionManager.unRegisterChildren(device.getDeviceId(), childrenId).thenReturn((Object)true).defaultIfEmpty((Object)false);
        }
        return Mono.just((Object)true);
    }

    protected Mono<Boolean> handleChildrenDeviceMessageReply(DeviceOperator session, ChildDeviceMessage reply) {
        return this.handleChildrenDeviceMessage(session, reply.getChildDeviceId(), reply.getChildDeviceMessage());
    }

    protected Mono<Boolean> handleChildrenDeviceMessageReply(DeviceOperator session, ChildDeviceMessageReply reply) {
        return this.handleChildrenDeviceMessage(session, reply.getChildDeviceId(), reply.getChildDeviceMessage());
    }

    public void shutdown() {
    }

    public Flux<Message> subscribe() {
        return this.processor.map(Function.identity()).doOnError(err -> log.error(err.getMessage(), err));
    }

    @Override
    public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
        return Mono.defer(() -> {
            if (device != null) {
                if (message instanceof ChildDeviceMessageReply) {
                    return this.handleChildrenDeviceMessageReply(device, (ChildDeviceMessageReply)message);
                }
                if (message instanceof ChildDeviceMessage) {
                    return this.handleChildrenDeviceMessageReply(device, (ChildDeviceMessage)message);
                }
            }
            if (message instanceof DeviceMessageReply) {
                return this.doReply((DeviceMessageReply)message);
            }
            return Mono.just((Object)true);
        }).defaultIfEmpty((Object)false).doFinally(s -> {
            if (this.processor.hasDownstreams()) {
                this.sink.next((Object)message);
            }
        }).onErrorContinue((err, res) -> log.error("handle device[{}] message [{}] error", new Object[]{device.getDeviceId(), message, err}));
    }

    private Mono<Boolean> doReply(DeviceMessageReply reply) {
        if (log.isDebugEnabled()) {
            log.debug("reply message {}", (Object)reply.getMessageId());
        }
        return this.messageHandler.reply(reply).doOnSuccess(success -> {
            if (log.isDebugEnabled()) {
                log.debug("reply message {} complete", (Object)reply.getMessageId());
            }
        }).thenReturn((Object)true).doOnError(error -> log.error("reply message error", error));
    }
}

