/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.scheduler.RemoteScheduler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * {@link SchedulerRegistry} that tracks the schedulers registered on this node and
 * discovers schedulers of other nodes through messages exchanged on the {@link EventBus}.
 *
 * <p>Once {@link #setup()} has been called, the ids of all local schedulers are published
 * to the keepalive topic on every {@code keepaliveInterval} tick. Ids received on the
 * keepalive topic that are neither local nor already tracked are wrapped in a
 * {@link RemoteScheduler}, initialised and announced through {@link #handleSchedulerJoin()}.
 * Remote schedulers are dropped (and announced through {@link #handleSchedulerLeave()})
 * when their id arrives on the leave topic or when their {@code isNoAlive()} check
 * reports {@code true} or fails.
 *
 * <p>Both scheduler sets are ordered and de-duplicated by {@link Scheduler#getId()}, so two
 * scheduler instances with the same id are treated as the same entry.
 */
public class ClusterSchedulerRegistry
implements SchedulerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ClusterSchedulerRegistry.class);

    /** Subscriber id used for both event bus subscriptions. */
    private static final String SUBSCRIBER_ID = "rule-engine.register";
    /** Topic on which scheduler ids are periodically announced. */
    private static final String KEEPALIVE_TOPIC = "/rule-engine/cluster-scheduler/keepalive";
    /** Topic on which scheduler ids are announced when a registry is cleaned up. */
    private static final String LEAVE_TOPIC = "/rule-engine/cluster-scheduler/leave";

    private final Set<Scheduler> localSchedulers = new ConcurrentSkipListSet<>(Comparator.comparing(Scheduler::getId));
    private final Set<RemoteScheduler> remoteSchedulers = new ConcurrentSkipListSet<>(Comparator.comparing(Scheduler::getId));
    // autoCancel=false: the processors must stay usable after a subscriber cancels.
    private final EmitterProcessor<Scheduler> joinProcessor = EmitterProcessor.create(false);
    private final EmitterProcessor<Scheduler> leaveProcessor = EmitterProcessor.create(false);
    private final FluxSink<Scheduler> joinSink = this.joinProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    private final FluxSink<Scheduler> leaveSink = this.leaveProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    /** Subscriptions created by {@link #setup()}; a non-empty list doubles as the "started" flag. */
    private final List<Disposable> disposables = new CopyOnWriteArrayList<>();
    private final EventBus eventBus;
    private final RpcServiceFactory serviceFactory;
    private Duration keepaliveInterval = Duration.ofSeconds(10L);

    /**
     * @param eventBus       bus used to publish and receive keepalive / leave messages
     * @param serviceFactory factory handed to every {@link RemoteScheduler} created by this registry
     */
    public ClusterSchedulerRegistry(EventBus eventBus, RpcServiceFactory serviceFactory) {
        this.eventBus = eventBus;
        this.serviceFactory = serviceFactory;
    }

    /**
     * Starts cluster discovery: subscribes to the keepalive and leave topics, starts the
     * periodic heartbeat and publishes the local scheduler ids once, blocking until that
     * first publish completes. Calling it again while already started is a no-op.
     */
    public void setup() {
        if (!this.disposables.isEmpty()) {
            return;
        }
        // Track the debug loggers as well, so that setup()/cleanup() cycles do not stack
        // up duplicate subscribers on the processors.
        this.disposables.add(this.joinProcessor
                .subscribe(scheduler -> log.debug("remote scheduler join:{}", scheduler.getId())));
        this.disposables.add(this.leaveProcessor
                .subscribe(scheduler -> log.debug("remote scheduler leaved:{}", scheduler.getId())));

        this.disposables.add(this.eventBus
                .subscribe(Subscription.of(SUBSCRIBER_ID, KEEPALIVE_TOPIC, Subscription.Feature.broker), String.class)
                .map(id -> new RemoteScheduler(id, this.serviceFactory))
                .filter(candidate -> !this.localSchedulers.contains(candidate)
                        && !this.remoteSchedulers.contains(candidate))
                .subscribe(this::acceptRemoteJoin, error -> log.error(error.getMessage(), error)));

        this.disposables.add(this.eventBus
                .subscribe(Subscription.of(SUBSCRIBER_ID, LEAVE_TOPIC, Subscription.Feature.broker), String.class)
                .map(id -> new RemoteScheduler(id, this.serviceFactory))
                .filter(candidate -> !this.localSchedulers.contains(candidate))
                .subscribe(this::acceptRemoteLeave, error -> log.error(error.getMessage(), error)));

        this.disposables.add(Flux.interval(this.keepaliveInterval)
                .subscribe(tick -> this.heartbeat()
                        .subscribe(ignore -> { }, error -> log.warn("scheduler keepalive failed", error))));

        this.publishLocal().block();
    }

    /**
     * Handles a keepalive from a scheduler that is not known yet: initialises it, emits the
     * join event, re-announces the local schedulers and starts tracking it.
     *
     * <p>Failures are logged instead of propagated: an exception escaping from here would
     * terminate the keepalive subscription and stop discovery for good. The scheduler is
     * not tracked in that case, so the next keepalive for the same id triggers a retry.
     */
    private void acceptRemoteJoin(RemoteScheduler remote) {
        try {
            remote.init();
            this.joinSink.next(remote);
            this.announceLocal();
            this.remoteSchedulers.add(remote);
        } catch (RuntimeException e) {
            log.error("failed to register remote scheduler:{}", remote.getId(), e);
        }
    }

    /**
     * Handles a leave message for a non-local scheduler id: emits the leave event and stops
     * tracking the scheduler.
     *
     * @param leaving placeholder built from the received id; it has never been initialised,
     *                so the tracked instance with the same id is the one that gets disposed
     */
    private void acceptRemoteLeave(RemoteScheduler leaving) {
        try {
            this.leaveSink.next(leaving);
            String leavingId = leaving.getId();
            this.remoteSchedulers.removeIf(tracked -> {
                if (!leavingId.equals(tracked.getId())) {
                    return false;
                }
                // Release the instance that was actually init()-ed on join.
                tracked.dispose();
                return true;
            });
        } catch (RuntimeException e) {
            log.error("failed to remove remote scheduler:{}", leaving.getId(), e);
        }
    }

    /**
     * One heartbeat round: publishes the local scheduler ids, then evicts every remote
     * scheduler whose {@code isNoAlive()} check yields {@code true}. A failing check is
     * treated like {@code true}.
     */
    private Mono<Void> heartbeat() {
        return this.publishLocal()
                .then(Flux.fromIterable(this.remoteSchedulers)
                        .filterWhen(remote -> remote.isNoAlive().onErrorResume(err -> Mono.just(true)))
                        .doOnNext(this::evictRemote)
                        .then());
    }

    /** Disposes a remote scheduler, stops tracking it and emits the leave event. */
    private void evictRemote(RemoteScheduler dead) {
        dead.dispose();
        this.remoteSchedulers.remove(dead);
        this.leaveSink.next(dead);
    }

    /** Publishes the ids of all local schedulers to the keepalive topic. */
    private Mono<Void> publishLocal() {
        return this.eventBus
                .publish(KEEPALIVE_TOPIC, Flux.fromIterable(this.localSchedulers).map(Scheduler::getId))
                .then();
    }

    /** Fire-and-forget variant of {@link #publishLocal()} that logs failures. */
    private void announceLocal() {
        this.publishLocal()
                .subscribe(ignore -> { }, error -> log.warn("publish local schedulers failed", error));
    }

    /**
     * Stops cluster discovery: publishes the local scheduler ids to the leave topic
     * (without waiting for completion), cancels every subscription created by
     * {@link #setup()} and disposes and forgets all remote schedulers. Afterwards
     * {@link #setup()} may be called again.
     */
    public void cleanup() {
        this.eventBus
                .publish(LEAVE_TOPIC, Flux.fromIterable(this.localSchedulers).map(Scheduler::getId))
                .subscribe(ignore -> { }, error -> log.warn("publish scheduler leave failed", error));
        // Cancel the subscriptions first so no remote scheduler is added while cleaning up.
        this.disposables.forEach(Disposable::dispose);
        this.disposables.clear();
        this.remoteSchedulers.forEach(Disposable::dispose);
        // Forget the disposed instances; otherwise getSchedulers() keeps returning them and
        // a later setup() would consider them already known and never re-initialise them.
        this.remoteSchedulers.clear();
    }

    /**
     * @return all currently known schedulers, local ones first, then remote ones
     */
    @Override
    public Flux<Scheduler> getSchedulers() {
        return Flux.just(this.localSchedulers, this.remoteSchedulers).flatMapIterable(Function.identity());
    }

    /**
     * @return stream of remote schedulers discovered by this registry
     */
    @Override
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.joinProcessor;
    }

    /**
     * @return stream of remote schedulers that left or were evicted
     */
    @Override
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.leaveProcessor;
    }

    /**
     * Registers a local scheduler. If the registry has already been started, the local
     * scheduler ids are re-published immediately rather than on the next heartbeat.
     *
     * @param scheduler local scheduler to add; an entry with the same id is kept as-is
     */
    @Override
    public void register(Scheduler scheduler) {
        this.localSchedulers.add(scheduler);
        if (!this.disposables.isEmpty()) {
            this.announceLocal();
        }
    }

    /**
     * @return a snapshot copy of the local schedulers
     */
    @Override
    public List<Scheduler> getLocalSchedulers() {
        return new ArrayList<>(this.localSchedulers);
    }

    /**
     * Sets the heartbeat period. The value is read when {@link #setup()} creates the
     * heartbeat, so changing it on a started registry has no effect until the next
     * {@link #cleanup()} / {@link #setup()} cycle.
     *
     * @param keepaliveInterval period between two heartbeat rounds
     */
    public void setKeepaliveInterval(Duration keepaliveInterval) {
        this.keepaliveInterval = keepaliveInterval;
    }
}

