/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.scheduler.RemoteScheduler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * {@link SchedulerRegistry} that tracks schedulers registered in this process and
 * {@link RemoteScheduler} proxies for schedulers announced by other nodes over the
 * {@link EventBus}.
 *
 * <p>Life cycle: {@link #setup()} starts listening for keepalive / leave announcements and
 * starts a periodic task (every {@code keepaliveInterval}) that re-announces the local
 * schedulers and evicts remote schedulers reporting themselves as not alive.
 * {@link #cleanup()} announces the departure of the local schedulers and releases every
 * subscription and remote proxy created by {@code setup()}.
 */
public class ClusterSchedulerRegistry implements SchedulerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ClusterSchedulerRegistry.class);

    /** Subscriber id used for every event-bus subscription made by this registry. */
    private static final String SUBSCRIBER_ID = "rule-engine.register";
    /** Topic on which scheduler ids are announced (on setup, on register and periodically). */
    private static final String KEEPALIVE_TOPIC = "/rule-engine/cluster-scheduler/keepalive";
    /** Topic on which scheduler ids are announced when their node cleans up. */
    private static final String LEAVE_TOPIC = "/rule-engine/cluster-scheduler/leave";

    private final Set<Scheduler> localSchedulers = new ConcurrentSkipListSet<>(Comparator.comparing(Scheduler::getId));
    private final Map<String, RemoteScheduler> remoteSchedulers = new ConcurrentHashMap<>();
    // autoCancel=false: the processors stay usable after their subscribers are disposed,
    // which allows setup() to be called again after cleanup().
    private final EmitterProcessor<Scheduler> joinProcessor = EmitterProcessor.create(Integer.MAX_VALUE, false);
    private final EmitterProcessor<Scheduler> leaveProcessor = EmitterProcessor.create(Integer.MAX_VALUE, false);
    private final FluxSink<Scheduler> joinSink = this.joinProcessor.sink();
    private final FluxSink<Scheduler> leaveSink = this.leaveProcessor.sink();
    /** Subscriptions created by {@link #setup()}; non-empty means the registry is set up. */
    private final List<Disposable> disposables = new CopyOnWriteArrayList<>();
    private final EventBus eventBus;
    private final RpcServiceFactory serviceFactory;
    private Duration keepaliveInterval = Duration.ofSeconds(10L);

    /**
     * @param eventBus       bus used to publish and receive keepalive / leave announcements
     * @param serviceFactory factory handed to every {@link RemoteScheduler} created for a remote id
     */
    public ClusterSchedulerRegistry(EventBus eventBus, RpcServiceFactory serviceFactory) {
        this.eventBus = eventBus;
        this.serviceFactory = serviceFactory;
    }

    /**
     * Starts cluster membership handling. Does nothing when the registry is already set up.
     *
     * <p>Blocks the calling thread until the initial announcement of the local schedulers
     * has completed.
     */
    public void setup() {
        if (!this.disposables.isEmpty()) {
            return;
        }
        // Track the internal join/leave handlers so that cleanup() detaches them; otherwise
        // every cleanup()/setup() cycle would stack one more pair of handlers.
        this.disposables.add(this.joinProcessor.subscribe(this::onRemoteJoin));
        this.disposables.add(this.leaveProcessor.subscribe(this::onRemoteLeave));

        // Keepalive from an id we do not know yet: create a proxy for it.
        this.disposables.add(this.eventBus
                .subscribe(brokerSubscription(KEEPALIVE_TOPIC), String.class)
                .filter(id -> !this.remoteSchedulers.containsKey(id))
                .doOnNext(this::connectRemote)
                .subscribe());

        // Explicit leave announcement for a known id: drop the proxy.
        this.disposables.add(this.eventBus
                .subscribe(brokerSubscription(LEAVE_TOPIC), String.class)
                .filter(this.remoteSchedulers::containsKey)
                .flatMap(id -> Mono.justOrEmpty(this.remoteSchedulers.remove(id)))
                .doOnNext(this.leaveSink::next)
                .subscribe());

        // Periodically re-announce ourselves, then evict remote schedulers that are not alive.
        this.disposables.add(Flux
                .interval(this.keepaliveInterval)
                .subscribe(tick -> this.publishLocal()
                        .then(this.evictDeadSchedulers())
                        .subscribe(
                                ignored -> { },
                                err -> log.warn("cluster scheduler keepalive round failed", err))));

        this.publishLocal().block();
    }

    /** Builds a broker-only subscription to {@code topic} under this registry's subscriber id. */
    private static Subscription brokerSubscription(String topic) {
        return Subscription.of(SUBSCRIBER_ID, topic, new Subscription.Feature[]{Subscription.Feature.broker});
    }

    /** Stores a newly joined remote scheduler, disposing the proxy it replaces (if any). */
    private void onRemoteJoin(Scheduler scheduler) {
        // Only RemoteScheduler instances are ever pushed into joinSink (see connectRemote).
        RemoteScheduler previous = this.remoteSchedulers.put(scheduler.getId(), (RemoteScheduler) scheduler);
        if (previous != null) {
            previous.dispose();
        }
        log.debug("remote scheduler join:{}", scheduler.getId());
    }

    /** Forgets a remote scheduler and disposes it together with any proxy still mapped to its id. */
    private void onRemoteLeave(Scheduler scheduler) {
        log.debug("remote scheduler leave:{}", scheduler.getId());
        Scheduler mapped = this.remoteSchedulers.remove(scheduler.getId());
        if (mapped != null && mapped != scheduler) {
            mapped.dispose();
        }
        scheduler.dispose();
    }

    /**
     * Creates and initialises a proxy for the remote scheduler {@code id}, emits it as joined
     * and re-announces the local schedulers.
     */
    private void connectRemote(String id) {
        RemoteScheduler scheduler;
        try {
            scheduler = new RemoteScheduler(id, this.serviceFactory);
            scheduler.init();
        } catch (RuntimeException e) {
            // A failure for one remote node must not terminate the keepalive subscription.
            // The id is not stored, so the next keepalive from that node triggers a retry.
            log.warn("failed to initialise remote scheduler:{}", id, e);
            return;
        }
        this.joinSink.next(scheduler);
        this.publishLocalInBackground();
    }

    /** Emits a leave signal for every remote scheduler that reports (or fails to report) liveness. */
    private Mono<Void> evictDeadSchedulers() {
        return Flux.fromIterable(this.remoteSchedulers.values())
                // An error while probing is treated the same as "not alive".
                .filterWhen(scheduler -> scheduler.isNoAlive().onErrorResume(err -> Mono.just(true)))
                .doOnNext(this.leaveSink::next)
                .then();
    }

    /** Publishes the ids of all local schedulers on the keepalive topic. */
    private Mono<Void> publishLocal() {
        Publisher<String> localIds = Flux.fromIterable(this.localSchedulers).map(Scheduler::getId);
        return this.eventBus.publish(KEEPALIVE_TOPIC, localIds).then();
    }

    /** Fire-and-forget variant of {@link #publishLocal()} that logs failures instead of dropping them. */
    private void publishLocalInBackground() {
        this.publishLocal().subscribe(
                ignored -> { },
                err -> log.warn("failed to publish local schedulers", err));
    }

    /**
     * Announces the departure of the local schedulers (without waiting for the publish to
     * complete), disposes all remote proxies and releases every subscription made by
     * {@link #setup()}. Local schedulers stay registered.
     */
    public void cleanup() {
        Publisher<String> localIds = Flux.fromIterable(this.localSchedulers).map(Scheduler::getId);
        this.eventBus.publish(LEAVE_TOPIC, localIds).subscribe(
                ignored -> { },
                err -> log.warn("failed to publish scheduler leave", err));
        this.remoteSchedulers.values().forEach(Disposable::dispose);
        this.disposables.forEach(Disposable::dispose);
        this.disposables.clear();
        this.remoteSchedulers.clear();
    }

    /**
     * @return all local schedulers followed by all currently known remote schedulers
     */
    @Override
    public Flux<Scheduler> getSchedulers() {
        return Flux.concat(
                Flux.fromIterable(this.localSchedulers),
                Flux.fromIterable(this.remoteSchedulers.values()));
    }

    /**
     * @return stream of remote schedulers as they join
     */
    @Override
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.joinProcessor;
    }

    /**
     * @return stream of remote schedulers as they leave or are evicted
     */
    @Override
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.leaveProcessor;
    }

    /**
     * Registers a local scheduler. When the registry is already set up, the local schedulers
     * are re-announced immediately in the background.
     */
    @Override
    public void register(Scheduler scheduler) {
        this.localSchedulers.add(scheduler);
        if (!this.disposables.isEmpty()) {
            this.publishLocalInBackground();
        }
    }

    /**
     * @return a snapshot copy of the local schedulers
     */
    @Override
    public List<Scheduler> getLocalSchedulers() {
        return new ArrayList<>(this.localSchedulers);
    }

    /**
     * Sets the period of the keepalive task. The value is read when {@link #setup()} creates
     * the task, so changing it afterwards has no effect until the next {@code setup()}.
     */
    public void setKeepaliveInterval(Duration keepaliveInterval) {
        this.keepaliveInterval = keepaliveInterval;
    }
}

