/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.Generated;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.scheduler.ClusterRemoteScheduler;
import org.jetlinks.rule.engine.cluster.scheduler.SchedulerRpcService;
import org.jetlinks.rule.engine.cluster.scheduler.SchedulerRpcServiceImpl;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * {@link SchedulerRegistry} that tracks schedulers registered in this process ("locals") and
 * schedulers exposed by other nodes through the {@link RpcManager} ("remotes").
 *
 * <p>A local scheduler is published as an RPC service whose id is either the plain scheduler id
 * (no namespace configured) or {@code schedulerId + "@" + namespace}. Remote services are only
 * accepted when their service id matches this registry's namespace configuration; see
 * {@link #resolveSchedulerId(String)}.
 */
public class ClusterRpcSchedulerRegistry implements SchedulerRegistry {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ClusterRpcSchedulerRegistry.class);

    /** Separator between the scheduler id and the namespace inside an RPC service id. */
    static final String NAMESPACE_SPLIT = "@";

    private final RpcManager rpcManager;
    /** Schedulers registered through {@link #register(Scheduler)}, keyed by scheduler id. */
    private final Map<String, Scheduler> locals = new NonBlockingHashMap<>();
    /** Proxies for schedulers discovered through the RPC manager, keyed by scheduler id. */
    private final Map<String, Scheduler> remotes = new NonBlockingHashMap<>();
    private final Sinks.Many<Scheduler> joinListener = Reactors.createMany();
    private final Sinks.Many<Scheduler> leaveListener = Reactors.createMany();
    /** Namespace of this registry, or {@code null} when no namespace is used. */
    private final String namespace;

    /**
     * Creates a registry without a namespace.
     *
     * @param rpcManager RPC manager used to discover and publish scheduler services
     */
    public ClusterRpcSchedulerRegistry(RpcManager rpcManager) {
        this(null, rpcManager);
    }

    /**
     * Creates a registry bound to the given namespace and immediately starts discovering
     * remote scheduler services.
     *
     * @param namespace  namespace to match against remote service ids, or {@code null} for none
     * @param rpcManager RPC manager used to discover and publish scheduler services
     */
    public ClusterRpcSchedulerRegistry(String namespace, RpcManager rpcManager) {
        this.rpcManager = rpcManager;
        this.namespace = namespace;
        this.init();
    }

    /**
     * Registers the scheduler services that are already known to the RPC manager and subscribes
     * to subsequent service events.
     */
    void init() {
        this.rpcManager
            .getServices(SchedulerRpcService.class)
            .subscribe(
                rpc -> this.registerService(rpc.id(), rpc.service()),
                err -> log.error("failed to load rule scheduler services", err));

        this.rpcManager
            .listen(SchedulerRpcService.class)
            // Mono.defer turns an exception thrown synchronously by handleEvent into an error
            // signal of the inner Mono, so a single bad event cannot terminate the listener.
            .flatMap(event -> Mono
                .defer(() -> this.handleEvent(event))
                .onErrorResume(err -> {
                    log.warn("failed to handle rule scheduler service event {}", event.getServiceId(), err);
                    return Mono.empty();
                }))
            .subscribe(
                ignored -> { },
                err -> log.error("rule scheduler service event listener terminated", err));
    }

    /**
     * Maps an RPC service id to the scheduler id it represents for this registry.
     *
     * <p>Without a namespace only ids that do not split on {@value #NAMESPACE_SPLIT} are accepted
     * and the id is used unchanged. With a namespace, the id must split into at least two parts
     * and the second part must equal the namespace; the first part is the scheduler id.
     *
     * @param serviceId RPC service id
     * @return the scheduler id, or {@code null} when the service does not belong to this registry
     */
    private String resolveSchedulerId(String serviceId) {
        String[] parts = serviceId.split(NAMESPACE_SPLIT);
        if (this.namespace == null) {
            return parts.length == 1 ? serviceId : null;
        }
        return parts.length > 1 && this.namespace.equals(parts[1]) ? parts[0] : null;
    }

    /**
     * Creates a remote scheduler proxy for the given service and stores it in {@link #remotes}.
     *
     * @param id      RPC service id
     * @param service RPC service backing the remote scheduler
     * @return the new scheduler when no scheduler was previously stored under the same id;
     *         {@code null} when the service does not belong to this registry or when it replaced
     *         an existing entry
     */
    private Scheduler registerService(String id, SchedulerRpcService service) {
        String schedulerId = this.resolveSchedulerId(id);
        if (schedulerId == null) {
            return null;
        }
        ClusterRemoteScheduler scheduler = new ClusterRemoteScheduler(schedulerId, service);
        boolean firstSeen = this.remotes.put(schedulerId, scheduler) == null;
        // Logged for every accepted service, including the case where an entry was replaced.
        log.info("rule scheduler {} joined", scheduler.getId());
        return firstSeen ? scheduler : null;
    }

    /**
     * Applies a service event to the remote scheduler table and notifies join/leave listeners.
     *
     * @param event service event published by the RPC manager
     * @return a Mono that completes once the event has been processed
     */
    private Mono<Void> handleEvent(ServiceEvent event) {
        if (event.getType() == ServiceEvent.Type.added) {
            return this.rpcManager
                .getService(event.getServerNodeId(), event.getServiceId(), SchedulerRpcService.class)
                .doOnNext(rpc -> {
                    Scheduler scheduler = this.registerService(event.getServiceId(), rpc);
                    if (scheduler != null && this.joinListener.currentSubscriberCount() > 0) {
                        this.joinListener.emitNext(scheduler, Reactors.emitFailureHandler());
                    }
                })
                .then();
        }
        if (event.getType() == ServiceEvent.Type.removed) {
            // Use the same matching rule as registration so that services outside this
            // registry's namespace can neither fail the handler nor evict a scheduler
            // that merely shares the same id.
            String schedulerId = this.resolveSchedulerId(event.getServiceId());
            if (schedulerId == null) {
                return Mono.empty();
            }
            log.info("rule scheduler {} leaved", schedulerId);
            Scheduler scheduler = this.remotes.remove(schedulerId);
            if (scheduler != null && this.leaveListener.currentSubscriberCount() > 0) {
                this.leaveListener.emitNext(scheduler, Reactors.emitFailureHandler());
            }
        }
        return Mono.empty();
    }

    /**
     * @return a new list containing the schedulers registered locally at the time of the call
     */
    @Override
    public List<Scheduler> getLocalSchedulers() {
        return new ArrayList<>(this.locals.values());
    }

    /**
     * @return local schedulers followed by the currently known remote schedulers
     */
    @Override
    public Flux<Scheduler> getSchedulers() {
        return Flux.concat(
            Flux.fromIterable(this.getLocalSchedulers()),
            Flux.fromIterable(this.remotes.values()));
    }

    /**
     * @return stream of remote schedulers that were newly added by a service event
     */
    @Override
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.joinListener.asFlux();
    }

    /**
     * @return stream of remote schedulers that were removed by a service event
     */
    @Override
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.leaveListener.asFlux();
    }

    /**
     * Builds the RPC service id under which a local scheduler is published.
     *
     * @param schedulerId id of the local scheduler
     * @return the scheduler id, suffixed with {@value #NAMESPACE_SPLIT} and the namespace when
     *         a namespace is configured
     */
    private String createServiceId(String schedulerId) {
        if (this.namespace == null) {
            return schedulerId;
        }
        return schedulerId + NAMESPACE_SPLIT + this.namespace;
    }

    /**
     * Registers a local scheduler and publishes it through the RPC manager.
     *
     * @param scheduler scheduler to register
     * @throws IllegalStateException when a local scheduler with the same id is already registered
     */
    @Override
    public void register(Scheduler scheduler) {
        String schedulerId = scheduler.getId();
        // putIfAbsent makes the duplicate check and the insert a single atomic step.
        if (this.locals.putIfAbsent(schedulerId, scheduler) != null) {
            throw new IllegalStateException("scheduler " + schedulerId + " already registered");
        }
        this.rpcManager.registerService(this.createServiceId(schedulerId), new SchedulerRpcServiceImpl(scheduler));
    }
}

