/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster;

import com.google.common.cache.CacheBuilder;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.ReplyFailureHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

public abstract class AbstractDeviceOperationBroker
implements DeviceOperationBroker,
MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(AbstractDeviceOperationBroker.class);

    /** Reply sinks currently awaited, keyed by (deviceId, messageId). */
    protected final Map<AwaitKey, Awaiting> replyProcessor = new ConcurrentHashMap<>();

    /**
     * Remaining-fragment counters for fragmented replies. Entries expire five minutes after being written.
     * The explicit type witness on {@code build()} is required: without it the cache is inferred as
     * {@code Cache<Object, Object>} and its map view is not assignable to this field.
     */
    private final Map<AwaitKey, AtomicInteger> fragmentCounter = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(5L)).<AwaitKey, AtomicInteger>build().asMap();

    /** Message ids of sent messages whose replies are flagged as carrying no message id, ordered by {@link PendingKey}. */
    private final NavigableMap<PendingKey, String> pendingNoMessageId = new ConcurrentSkipListMap<>();

    /** Receives any {@link Throwable} raised while dispatching a reply; the default only logs it. */
    private ReplyFailureHandler replyFailureHandler = (error, message) -> log.info("unhandled reply message:{}", message, error);

    /**
     * Queries the state of a set of devices.
     *
     * NOTE(review): parameter names were lost in decompilation. Presumably {@code var1} identifies the
     * server/gateway to query and {@code var2} is the collection of device ids — confirm against
     * {@code DeviceOperationBroker}.
     *
     * @return a stream of state information
     */
    public abstract Flux<DeviceStateInfo> getDeviceState(String var1, Collection<String> var2);

    /**
     * Registers a handler that answers device-state queries.
     *
     * NOTE(review): presumably {@code var1} is the server id the handler serves and {@code var2} maps a
     * stream of device ids to their state — confirm against {@code DeviceOperationBroker}.
     *
     * @return a handle that can be disposed; presumably this unregisters the handler — confirm
     */
    public abstract Disposable handleGetDeviceState(String var1, Function<Publisher<String>, Flux<DeviceStateInfo>> var2);

    /**
     * Starts awaiting the reply to a message that has been (or is about to be) sent to a device.
     *
     * <p>When the message is a {@link RepayableDeviceMessage} whose {@code replyNoMessageId} header is set,
     * its message id is additionally recorded under a {@link PendingKey} (device id, message timestamp,
     * expected reply type) so that a reply arriving without a message id can be matched later. That record
     * is removed again when the returned flux terminates.
     *
     * @param message the message whose reply is awaited
     * @param timeout maximum time to wait; a zero or negative duration disables the timeout
     * @return the stream of replies for the message
     */
    public Flux<DeviceMessageReply> handleReply(DeviceMessage message, Duration timeout) {
        boolean replyWithoutId = message instanceof RepayableDeviceMessage
                && (Boolean) message.getHeaderOrDefault(Headers.replyNoMessageId);
        if (!replyWithoutId) {
            return this.handleReply(message.getDeviceId(), message.getMessageId(), timeout);
        }

        MessageType expectedReplyType = ((RepayableDeviceMessage) message).getReplyType();
        String messageId = message.getMessageId();
        PendingKey pendingKey = new PendingKey(message.getDeviceId(), message.getTimestamp(), expectedReplyType);
        this.pendingNoMessageId.put(pendingKey, message.getMessageId());

        // Only drop the pending record if it still belongs to this message.
        Runnable forgetPending = () -> this.pendingNoMessageId.remove(pendingKey, messageId);
        return this.handleReply0(message.getDeviceId(), message.getMessageId(), timeout, forgetPending);
    }

    /**
     * Asks every registered {@link Awaiting} to evict itself if it has expired.
     */
    protected void checkExpires() {
        for (Awaiting pending : this.replyProcessor.values()) {
            pending.checkExpires();
        }
    }

    /**
     * Registers a new {@link Awaiting} for the given device/message id and returns its reply stream.
     *
     * <p>The new entry replaces any entry already stored under the same key. If an entry was replaced,
     * the map is re-read; the loop only repeats when that re-read finds the key missing.
     *
     * @param deviceId  id of the device the reply is expected from
     * @param messageId id of the message being replied to
     * @param timeout   maximum time to wait; a zero or negative duration disables the timeout
     * @param after     callback run when the stream terminates or the entry expires
     * @return the reply stream; fails with a {@code TIME_OUT} {@link DeviceOperationException} on timeout
     */
    public Flux<DeviceMessageReply> handleReply0(String deviceId, String messageId, Duration timeout, Runnable after) {
        long createdAt = System.currentTimeMillis();
        AwaitKey key = this.getAwaitReplyKey(deviceId, messageId);

        Awaiting awaiting;
        do {
            awaiting = new Awaiting(createdAt, key, after);
            if (this.replyProcessor.put(key, awaiting) == null) {
                // Nothing was registered before: our fresh entry is the one to use.
                break;
            }
            // An older entry was replaced; use whatever is registered now.
            awaiting = this.replyProcessor.get(key);
        } while (awaiting == null);

        Flux<DeviceMessageReply> reply = awaiting.asFlux();
        boolean timeoutEnabled = !(timeout.isZero() || timeout.isNegative());
        if (timeoutEnabled) {
            reply = reply.timeout(timeout, Mono.error(() -> new DeviceOperationException.NoStackTrace(ErrorCode.TIME_OUT)));
        }
        // Awaiting is itself the doFinally consumer and performs the cleanup.
        return reply.doFinally(awaiting);
    }

    /**
     * Starts awaiting the reply identified by device id and message id, with no extra cleanup callback.
     *
     * @param deviceId  id of the device the reply is expected from
     * @param messageId id of the message being replied to
     * @param timeout   maximum time to wait; a zero or negative duration disables the timeout
     * @return the stream of replies
     */
    public Flux<DeviceMessageReply> handleReply(String deviceId, String messageId, Duration timeout) {
        Runnable noCleanup = () -> {};
        return this.handleReply0(deviceId, messageId, timeout, noCleanup);
    }

    /**
     * Sends a stream of messages.
     *
     * NOTE(review): parameter names were lost in decompilation. Presumably {@code var1} is the id of the
     * target server and the result is the number of receivers — confirm against {@code DeviceOperationBroker}.
     */
    public abstract Mono<Integer> send(String var1, Publisher<? extends Message> var2);

    /**
     * Sends a stream of broadcast messages.
     *
     * NOTE(review): the meaning of the returned count is not visible here — confirm against
     * {@code DeviceOperationBroker}.
     */
    public abstract Mono<Integer> send(Publisher<? extends BroadcastMessage> var1);

    /**
     * Returns the stream of messages to be delivered to devices.
     *
     * NOTE(review): presumably {@code var1} is the id of the server that consumes the messages — confirm.
     */
    public abstract Flux<Message> handleSendToDeviceMessage(String var1);

    /**
     * Implementation-specific forwarding of a reply. {@link #reply(DeviceMessageReply)} calls this when
     * the reply is not flagged async and no matching local await entry exists.
     */
    protected abstract Mono<Void> doReply(DeviceMessageReply var1);

    /**
     * Builds the await key for a message from its device id and message id.
     *
     * @param message the message to derive the key from
     * @return the key under which a reply to this message is awaited
     */
    protected AwaitKey getAwaitReplyKey(DeviceMessage message) {
        String deviceId = message.getDeviceId();
        String messageId = message.getMessageId();
        // Delegate so that subclasses overriding the two-argument overload affect both.
        return this.getAwaitReplyKey(deviceId, messageId);
    }

    /**
     * Builds the await key for a device id / message id pair.
     *
     * @param deviceId  id of the device
     * @param messageId id of the message
     * @return a new {@link AwaitKey}
     */
    protected AwaitKey getAwaitReplyKey(String deviceId, String messageId) {
        return new AwaitKey(deviceId, messageId);
    }

    /**
     * Tries to assign a message id to a reply that arrived without one.
     *
     * <p>Looks up the pending record with the smallest timestamp for the same device and message type
     * whose timestamp lies in {@code (0, reply timestamp]}. If one exists, its message id is written into
     * the reply and the record is removed.
     *
     * @param message the reply lacking a message id; its message id is set on success
     * @return {@code true} if a pending record was matched, {@code false} otherwise
     */
    protected boolean handleNoMessageIdReply(DeviceMessageReply message) {
        PendingKey lowerBound = new PendingKey(message.getDeviceId(), 0L, message.getMessageType());
        PendingKey upperBound = new PendingKey(message.getDeviceId(), message.getTimestamp(), message.getMessageType());
        Map.Entry<PendingKey, String> oldest = this.pendingNoMessageId
                .subMap(lowerBound, false, upperBound, true)
                .firstEntry();

        if (oldest == null) {
            return false;
        }
        if (!Objects.equals(oldest.getKey().deviceId, message.getDeviceId())) {
            return false;
        }
        if (!Objects.equals(oldest.getKey().messageType, message.getMessageType())) {
            return false;
        }

        message.messageId(oldest.getValue());
        this.pendingNoMessageId.remove(oldest.getKey(), oldest.getValue());
        return true;
    }

    /**
     * Dispatches a reply received from a device.
     *
     * <p>A reply without a message id is first matched against the pending no-message-id records; if that
     * fails the reply is rejected. A {@link ChildDeviceMessageReply} wrapping another reply causes the
     * wrapped reply to be dispatched as well, and its result becomes the result of this call.
     *
     * <p>On subscription the reply is handled locally when it is flagged async or a matching await entry
     * exists; otherwise it is passed to {@link #doReply(DeviceMessageReply)}.
     *
     * @param message the reply to dispatch
     * @return {@code false} when the reply has no usable message id, otherwise the outcome described above
     */
    public Mono<Boolean> reply(DeviceMessageReply message) {
        boolean hasMessageId = StringUtils.hasText(message.getMessageId());
        if (!hasMessageId && !this.handleNoMessageIdReply(message)) {
            log.warn("reply message messageId is empty: {}", (Object) message);
            return Reactors.ALWAYS_FALSE;
        }

        Mono<Boolean> childResult = Reactors.ALWAYS_TRUE;
        if (message instanceof ChildDeviceMessageReply) {
            Message wrapped = ((ChildDeviceMessageReply) message).getChildDeviceMessage();
            if (wrapped instanceof DeviceMessageReply) {
                childResult = this.reply((DeviceMessageReply) wrapped);
            }
        }

        Mono<Boolean> dispatch = Mono.defer(() -> {
            // Fragments are awaited under the id of the message they belong to.
            String awaitedId = message.getHeader(Headers.fragmentBodyMessageId).orElse(message.getMessageId());
            boolean async = message.getHeader(Headers.async).orElse(false);
            if (async || this.replyProcessor.containsKey(this.getAwaitReplyKey(message.getDeviceId(), awaitedId))) {
                this.handleReply(message);
                return Reactors.ALWAYS_TRUE;
            }
            return this.doReply(message).thenReturn(true);
        });
        return dispatch.then(childResult);
    }

    /**
     * Delivers a reply to the locally registered {@link Awaiting}, if any.
     *
     * <p>Fragmented replies (those carrying the {@code fragmentBodyMessageId} header) are emitted one by
     * one; the sink is completed once the expected number of fragments has been seen or a fragment is
     * flagged as the last one. Non-fragmented replies are emitted and the sink is completed immediately.
     * Any {@link Throwable} raised on the way is passed to the configured {@link ReplyFailureHandler}.
     *
     * <p>NOTE(review): the decompiler (CFR) reported "Removed try catching itself - possible behaviour
     * change" for this method, so the exception handling may differ from the original source.
     *
     * @param message the reply to deliver
     */
    protected void handleReply(DeviceMessageReply message) {
        try {
            AwaitKey key = this.getAwaitReplyKey((DeviceMessage)message);
            String partMsgId = message.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (partMsgId != null) {
                log.trace("handle fragment device[{}] message {}", (Object)message.getDeviceId(), (Object)message);
                AwaitKey _partMsgId = this.getAwaitReplyKey(message.getDeviceId(), partMsgId);
                // Prefer the entry awaited under the fragment's parent id, fall back to the reply's own id.
                Awaiting processor = this.replyProcessor.getOrDefault(_partMsgId, this.replyProcessor.get(key));
                if (processor == null || processor.currentSubscriberCount() == 0) {
                    // Nobody is listening: drop the fragment and forget the entry (the counter is left untouched here).
                    this.replyProcessor.remove(_partMsgId);
                    return;
                }
                // The expected fragment count defaults to 1 when the header is absent.
                int partTotal = message.getHeader(Headers.fragmentNumber).orElse(1);
                // The counter is seeded by the first fragment seen for this id.
                AtomicInteger counter = this.fragmentCounter.computeIfAbsent(_partMsgId, r -> new AtomicInteger(partTotal));
                try {
                    processor.emitNext(message);
                }
                finally {
                    // Count the fragment even if emitting failed; finish on the last expected or flagged-last fragment.
                    if (counter.decrementAndGet() <= 0 || message.getHeader(Headers.fragmentLast).orElse(false).booleanValue()) {
                        try {
                            processor.tryComplete();
                        }
                        finally {
                            this.replyProcessor.remove(_partMsgId);
                            this.fragmentCounter.remove(_partMsgId);
                        }
                    }
                }
                return;
            }
            // Non-fragmented reply: emit once and complete; silently ignored when nothing is awaiting it.
            Awaiting processor = this.replyProcessor.get(key);
            if (processor != null) {
                processor.emitNext(message);
                processor.complete();
            }
        }
        catch (Throwable e) {
            this.replyFailureHandler.handle(e, message);
        }
    }

    /**
     * Replaces the handler that receives failures raised while delivering a reply.
     *
     * @param replyFailureHandler the handler to use from now on; no null check is performed
     */
    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }

    /**
     * Sort key for messages whose replies are expected to arrive without a message id.
     * Keys order by device id, then message type, then timestamp.
     */
    protected static class PendingKey
    implements Comparable<PendingKey> {
        static Comparator<PendingKey> comparator = Comparator
                .comparing(PendingKey::getDeviceId)
                .thenComparing(PendingKey::getMessageType)
                .thenComparing(PendingKey::getTimestamp);

        private String deviceId;
        private long timestamp;
        private MessageType messageType;

        public PendingKey(String deviceId, long timestamp, MessageType messageType) {
            this.deviceId = deviceId;
            this.timestamp = timestamp;
            this.messageType = messageType;
        }

        public String getDeviceId() {
            return this.deviceId;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public MessageType getMessageType() {
            return this.messageType;
        }

        @Override
        public int compareTo(@Nonnull PendingKey o) {
            return comparator.compare(this, o);
        }

        /** Hook that lets subclasses restrict which objects they may be equal to. */
        protected boolean canEqual(Object other) {
            return other instanceof PendingKey;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof PendingKey)) {
                return false;
            }
            PendingKey that = (PendingKey) o;
            return that.canEqual(this)
                    && this.getTimestamp() == that.getTimestamp()
                    && Objects.equals(this.getDeviceId(), that.getDeviceId())
                    && Objects.equals(this.getMessageType(), that.getMessageType());
        }

        @Override
        public int hashCode() {
            // 59-based polynomial over timestamp, device id and message type; null contributes 43.
            int hash = 59 + Long.hashCode(this.getTimestamp());
            String device = this.getDeviceId();
            hash = 59 * hash + (device == null ? 43 : device.hashCode());
            MessageType type = this.getMessageType();
            hash = 59 * hash + (type == null ? 43 : type.hashCode());
            return hash;
        }

        @Override
        public String toString() {
            return "AbstractDeviceOperationBroker.PendingKey(deviceId=" + this.getDeviceId() + ", timestamp=" + this.getTimestamp() + ", messageType=" + this.getMessageType() + ")";
        }
    }

    /**
     * Map key identifying an awaited reply by device id and message id.
     * The hash code is computed lazily and cached; a computed value of 0 is stored as
     * {@link Integer#MIN_VALUE} so that 0 can mean "not yet computed".
     */
    protected static class AwaitKey {
        private transient int $hashCodeCache;
        private String deviceId;
        private String messageId;

        public AwaitKey(String deviceId, String messageId) {
            this.deviceId = deviceId;
            this.messageId = messageId;
        }

        /** Hook that lets subclasses restrict which objects they may be equal to. */
        protected boolean canEqual(Object other) {
            return other instanceof AwaitKey;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof AwaitKey)) {
                return false;
            }
            AwaitKey that = (AwaitKey) o;
            return that.canEqual(this)
                    && Objects.equals(this.deviceId, that.deviceId)
                    && Objects.equals(this.messageId, that.messageId);
        }

        @Override
        public int hashCode() {
            int cached = this.$hashCodeCache;
            if (cached != 0) {
                return cached;
            }
            int deviceHash = this.deviceId == null ? 43 : this.deviceId.hashCode();
            int messageHash = this.messageId == null ? 43 : this.messageId.hashCode();
            int hash = (59 + deviceHash) * 59 + messageHash;
            if (hash == 0) {
                // 0 is reserved as the "not cached" marker.
                hash = Integer.MIN_VALUE;
            }
            this.$hashCodeCache = hash;
            return hash;
        }
    }

    /**
     * One awaited reply: a multicast sink that replies are emitted into, plus the bookkeeping needed to
     * unregister it from the enclosing broker. Instances double as the {@code doFinally} consumer of the
     * reply stream, so cleanup runs whenever that stream terminates.
     */
    protected class Awaiting
    implements Consumer<SignalType> {
        /** Creation time in epoch milliseconds, used for expiry and trace timing. */
        long timestamp;
        /** Key this entry is registered under in {@code replyProcessor}. */
        AwaitKey key;
        /** Optional extra cleanup run before the entry is unregistered; may be {@code null}. */
        Runnable callback;
        final Sinks.Many<DeviceMessageReply> processor = Sinks.many().multicast().onBackpressureBuffer(Queues.XS_BUFFER_SIZE);

        public Awaiting(long timestamp, AwaitKey key, Runnable callback) {
            this.timestamp = timestamp;
            this.key = key;
            this.callback = callback;
        }

        /**
         * @return {@code true} when nobody is subscribed and the entry is older than 10 seconds
         */
        boolean isExpires() {
            return this.currentSubscriberCount() == 0 && System.currentTimeMillis() - this.timestamp > 10000L;
        }

        Flux<DeviceMessageReply> asFlux() {
            return this.processor.asFlux();
        }

        int currentSubscriberCount() {
            return this.processor.currentSubscriberCount();
        }

        /** Emits a reply, delegating emission failures to {@code Reactors.emitFailureHandler()}. */
        void emitNext(DeviceMessageReply message) {
            // No cast to Object here: the sink is typed to DeviceMessageReply and would not accept one.
            this.processor.emitNext(message, Reactors.emitFailureHandler());
        }

        /** Completes the sink, ignoring the emit result. */
        void tryComplete() {
            this.processor.tryEmitComplete();
        }

        /** Completes the sink, delegating emission failures to {@code Reactors.emitFailureHandler()}. */
        void complete() {
            this.processor.emitComplete(Reactors.emitFailureHandler());
        }

        /** Unregisters this entry if it has expired. */
        void checkExpires() {
            if (this.isExpires()) {
                log.info("awaiting device {} message {} reply expires", (Object)this.key.deviceId, (Object)this.key.messageId);
                this.doFinally();
            }
        }

        /**
         * Runs the optional callback and unregisters this entry from the broker.
         * The maps are cleaned up even when the callback throws, so a failing callback cannot leak the
         * entry; the exception still propagates to the caller.
         */
        void doFinally() {
            try {
                if (null != this.callback) {
                    this.callback.run();
                }
            }
            finally {
                // Only remove the mapping if it still points at this instance.
                AbstractDeviceOperationBroker.this.replyProcessor.remove(this.key, this);
                AbstractDeviceOperationBroker.this.fragmentCounter.remove(this.key);
            }
        }

        /** {@code doFinally} hook of the reply stream: traces the elapsed time and cleans up. */
        @Override
        public void accept(SignalType signalType) {
            if (log.isTraceEnabled()) {
                log.trace("device message {} {} take {}ms", new Object[]{this.key.deviceId, this.key.messageId, System.currentTimeMillis() - this.timestamp});
            }
            this.doFinally();
        }
    }
}

