/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.cache.Caches;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.ReplyFailureHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public abstract class AbstractDeviceOperationBroker
implements DeviceOperationBroker,
MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(AbstractDeviceOperationBroker.class);
    private final Map<String, Sinks.Many<DeviceMessageReply>> replyProcessor = Caches.newCache();
    private final Map<String, AtomicInteger> fragmentCounter = new ConcurrentHashMap<String, AtomicInteger>();
    private ReplyFailureHandler replyFailureHandler = (error, message) -> log.warn("unhandled reply message:{}", (Object)message, (Object)error);

    public abstract Flux<DeviceStateInfo> getDeviceState(String var1, Collection<String> var2);

    public abstract Disposable handleGetDeviceState(String var1, Function<Publisher<String>, Flux<DeviceStateInfo>> var2);

    public Flux<DeviceMessageReply> handleReply(String deviceId, String messageId, Duration timeout) {
        long startWith = System.currentTimeMillis();
        String id = this.getAwaitReplyKey(deviceId, messageId);
        return ((Flux)this.replyProcessor.computeIfAbsent(id, ignore -> Sinks.many().multicast().onBackpressureBuffer()).asFlux().as(flux -> {
            if (timeout.isZero() || timeout.isNegative()) {
                return flux;
            }
            return flux.timeout(timeout, (Publisher)Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT)));
        })).doFinally(signal -> {
            log.trace("reply device message {} {} take {}ms", new Object[]{deviceId, messageId, System.currentTimeMillis() - startWith});
            this.replyProcessor.remove(id);
            this.fragmentCounter.remove(id);
        });
    }

    public abstract Mono<Integer> send(String var1, Publisher<? extends Message> var2);

    public abstract Mono<Integer> send(Publisher<? extends BroadcastMessage> var1);

    public abstract Flux<Message> handleSendToDeviceMessage(String var1);

    protected abstract Mono<Void> doReply(DeviceMessageReply var1);

    protected String getAwaitReplyKey(DeviceMessage message) {
        return this.getAwaitReplyKey(message.getDeviceId(), message.getMessageId());
    }

    protected String getAwaitReplyKey(String deviceId, String messageId) {
        return deviceId + ":" + messageId;
    }

    public Mono<Boolean> reply(DeviceMessageReply message) {
        Message childDeviceMessage;
        if (StringUtils.isEmpty((Object)message.getMessageId())) {
            log.warn("reply message messageId is empty: {}", (Object)message);
            return Reactors.ALWAYS_FALSE;
        }
        Mono<Boolean> then = Reactors.ALWAYS_TRUE;
        if (message instanceof ChildDeviceMessageReply && (childDeviceMessage = ((ChildDeviceMessageReply)message).getChildDeviceMessage()) instanceof DeviceMessageReply) {
            then = this.reply((DeviceMessageReply)childDeviceMessage);
        }
        return Mono.defer(() -> {
            String msgId = message.getHeader(Headers.fragmentBodyMessageId).orElse(message.getMessageId());
            if (message.getHeader(Headers.async).orElse(false).booleanValue() || this.replyProcessor.containsKey(this.getAwaitReplyKey(message.getDeviceId(), msgId))) {
                this.handleReply(message);
                return Reactors.ALWAYS_TRUE;
            }
            return this.doReply(message).thenReturn((Object)true);
        }).then((Mono)then);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleReply(DeviceMessageReply message) {
        try {
            String messageId = this.getAwaitReplyKey((DeviceMessage)message);
            String partMsgId = message.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (partMsgId != null) {
                log.trace("handle fragment device[{}] message {}", (Object)message.getDeviceId(), (Object)message);
                partMsgId = this.getAwaitReplyKey(message.getDeviceId(), partMsgId);
                Sinks.Many<DeviceMessageReply> processor = this.replyProcessor.getOrDefault(partMsgId, this.replyProcessor.get(messageId));
                if (processor == null || processor.currentSubscriberCount() == 0) {
                    this.replyProcessor.remove(partMsgId);
                    return;
                }
                int partTotal = message.getHeader(Headers.fragmentNumber).orElse(1);
                AtomicInteger counter = this.fragmentCounter.computeIfAbsent(partMsgId, r -> new AtomicInteger(partTotal));
                try {
                    processor.emitNext((Object)message, Reactors.emitFailureHandler());
                }
                finally {
                    if (counter.decrementAndGet() <= 0 || message.getHeader(Headers.fragmentLast).orElse(false).booleanValue()) {
                        try {
                            processor.tryEmitComplete();
                        }
                        finally {
                            this.replyProcessor.remove(partMsgId);
                            this.fragmentCounter.remove(partMsgId);
                        }
                    }
                }
                return;
            }
            Sinks.Many<DeviceMessageReply> processor = this.replyProcessor.get(messageId);
            if (processor != null) {
                processor.emitNext((Object)message, Reactors.emitFailureHandler());
                processor.emitComplete(Reactors.emitFailureHandler());
            } else {
                this.replyProcessor.remove(messageId);
            }
        }
        catch (Throwable e) {
            this.replyFailureHandler.handle(e, message);
        }
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}

