/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.jetlinks.core.codec.Codec;
import org.jetlinks.core.codec.Codecs;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.supports.cluster.AbstractDeviceOperationBroker;
import org.jetlinks.supports.cluster.redis.DeviceCheckRequest;
import org.jetlinks.supports.cluster.redis.DeviceCheckResponse;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;

public class EventBusDeviceOperationBroker
extends AbstractDeviceOperationBroker
implements Disposable {
    private static final Logger log = LoggerFactory.getLogger(EventBusDeviceOperationBroker.class);
    private static final Codec<Message> messageCodec = Codecs.lookup(Message.class);
    private final String serverId;
    private final EventBus eventBus;
    private final Disposable.Composite disposable = Disposables.composite();
    private final Map<String, FluxProcessor<DeviceCheckResponse, DeviceCheckResponse>> checkRequests = new ConcurrentHashMap<String, FluxProcessor<DeviceCheckResponse, DeviceCheckResponse>>();
    private Function<Publisher<String>, Flux<DeviceStateInfo>> localStateChecker;
    private final Map<String, RepayableDeviceMessage<?>> awaits = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(5L)).removalListener(notify -> {
        if (notify.getCause() == RemovalCause.EXPIRED) {
            try {
                log.debug("discard await reply message[{}] message,{}", notify.getKey(), notify.getValue());
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        }
    }).build().asMap();

    public EventBusDeviceOperationBroker(String serverId, EventBus eventBus) {
        this.serverId = serverId;
        this.eventBus = eventBus;
    }

    public void dispose() {
        this.disposable.dispose();
    }

    public boolean isDisposed() {
        return this.disposable.isDisposed();
    }

    private void doSubscribeReply() {
        Subscription subscription = Subscription.of((String)"device-message-broker", (String[])new String[]{"/_sys/msg-broker-reply/" + this.serverId}, (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.broker});
        this.disposable.add(this.eventBus.subscribe(subscription, messageCodec).filter(DeviceMessageReply.class::isInstance).cast(DeviceMessageReply.class).subscribe(this::handleReply));
    }

    public void start() {
        this.doSubscribeReply();
        Subscription subscription = Subscription.of((String)"device-state-checker", (String[])new String[]{"/_sys/device-state-check-res/" + this.serverId}, (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.broker});
        this.disposable.add(this.eventBus.subscribe(subscription, DeviceCheckResponse.class).subscribe(response -> Optional.ofNullable(this.checkRequests.remove(response.getRequestId())).ifPresent(processor -> {
            processor.onNext(response);
            processor.onComplete();
        })));
    }

    @Override
    public Flux<DeviceStateInfo> getDeviceState(String deviceGatewayServerId, Collection<String> deviceIdList) {
        return Flux.defer(() -> {
            if (this.serverId.equals(deviceGatewayServerId) && this.localStateChecker != null) {
                return (Publisher)this.localStateChecker.apply((Publisher<String>)Flux.fromIterable((Iterable)deviceIdList));
            }
            long startWith = System.currentTimeMillis();
            String uid = UUID.randomUUID().toString();
            DeviceCheckRequest request = new DeviceCheckRequest(this.serverId, uid, new ArrayList<String>(deviceIdList));
            EmitterProcessor processor = EmitterProcessor.create((boolean)true);
            this.checkRequests.put(uid, (FluxProcessor<DeviceCheckResponse, DeviceCheckResponse>)processor);
            return this.eventBus.publish("/_sys/device-state-check/".concat(deviceGatewayServerId), (Object)request).thenMany((Publisher)processor.flatMap(deviceCheckResponse -> Flux.fromIterable(deviceCheckResponse.getStateInfoList()))).timeout(Duration.ofSeconds(5L), (Publisher)Flux.empty()).doFinally(s -> {
                log.trace("check device state complete take {}ms", (Object)(System.currentTimeMillis() - startWith));
                this.checkRequests.remove(uid);
            });
        });
    }

    @Override
    public Disposable handleGetDeviceState(String serverId, Function<Publisher<String>, Flux<DeviceStateInfo>> stateMapper) {
        Subscription subscription = Subscription.of((String)"device-state-checker", (String[])new String[]{"/_sys/device-state-check/" + serverId}, (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.broker});
        this.localStateChecker = stateMapper;
        return this.eventBus.subscribe(subscription, DeviceCheckRequest.class).subscribe(request -> ((Flux)stateMapper.apply((Publisher<String>)Flux.fromIterable(request.getDeviceId()))).collectList().map(resp -> new DeviceCheckResponse((List<DeviceStateInfo>)resp, request.getRequestId())).flatMap(res -> this.eventBus.publish("/_sys/device-state-check-res/" + request.getFrom(), res)).subscribe());
    }

    @Override
    protected Mono<Void> doReply(DeviceMessageReply reply) {
        String serverId = Optional.ofNullable(this.awaits.remove(this.getAwaitReplyKey((DeviceMessage)reply))).flatMap(req -> req.getHeader(Headers.sendFrom)).orElse("*");
        return this.eventBus.publish("/_sys/msg-broker-reply/" + serverId, messageCodec, (Object)reply).doOnNext(i -> {
            if (i <= 0L && !"*".equals(serverId)) {
                log.warn("no handler [{}] for reply message : {}", (Object)serverId, (Object)reply);
            }
        }).then();
    }

    @Override
    public Mono<Integer> send(String deviceGatewayServerId, Publisher<? extends Message> message) {
        return this.eventBus.publish("/_sys/msg-broker/" + deviceGatewayServerId, messageCodec, (Publisher)Flux.from(message).doOnNext(msg -> msg.addHeader(Headers.sendFrom, (Object)this.serverId))).map(Long::intValue);
    }

    @Override
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
        return this.eventBus.publish("/_sys/msg-broker-broadcast", messageCodec, message).map(Long::intValue);
    }

    @Override
    public Flux<Message> handleSendToDeviceMessage(String serverId) {
        Subscription subscription = Subscription.of((String)"device-message-broker", (String[])new String[]{"/_sys/msg-broker/" + serverId}, (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker});
        return this.eventBus.subscribe(subscription, messageCodec).doOnNext(message -> {
            boolean isSameServer;
            if (message instanceof RepayableDeviceMessage && !message.getHeader(Headers.sendAndForget).orElse(false).booleanValue() && !(isSameServer = message.getHeader(Headers.sendFrom).map(sendFrom -> sendFrom.equals(serverId)).orElse(false).booleanValue())) {
                this.awaits.put(this.getAwaitReplyKey((DeviceMessage)((RepayableDeviceMessage)message)), (RepayableDeviceMessage)message);
            }
        });
    }
}

