/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster.scheduler;

import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.ServiceInfo;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.scheduler.ClusterRemoteScheduler;
import org.jetlinks.rule.engine.cluster.scheduler.SchedulerRpcService;
import org.jetlinks.rule.engine.cluster.scheduler.SchedulerRpcServiceImpl;
import org.jetlinks.supports.scalecube.EmptyServiceMethodRegistry;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * {@link SchedulerRegistry} that exposes one local {@link Scheduler} and discovers the
 * schedulers of other cluster members through two cluster messages:
 * <ul>
 *     <li>{@code rule/scheduler/pull} - announces the sender's scheduler id and asks the
 *     receiver to announce its own in return;</li>
 *     <li>{@code rule/scheduler/reg} - announces the sender's scheduler id only.</li>
 * </ul>
 * Both messages carry the sender's cluster member id in the {@code from} header and the
 * sender's scheduler id as payload. Remote schedulers are keyed by cluster member id and
 * are dropped when the member is removed from, or is leaving, the cluster.
 */
public class ClusterSchedulerRegistry implements SchedulerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ClusterSchedulerRegistry.class);
    private static final String REG_QUALIFIER = "rule/scheduler/reg";
    private static final String PULL_QUALIFIER = "rule/scheduler/pull";
    private static final String REG_FROM_HEADER = "from";
    private static final String SERVICE_SCHEDULER_ID_TAG = "schedulerId";

    private final ExtendedCluster cluster;
    private final ServiceCall serviceCall;
    private final Scheduler localScheduler;
    /** Remote schedulers keyed by the id of the cluster member that announced them. */
    private final Map<String, ClusterRemoteScheduler> remotes = new NonBlockingHashMap<>();
    private final Sinks.Many<Scheduler> schedulerJoin = Sinks.many().multicast().directBestEffort();
    private final Sinks.Many<Scheduler> schedulerLeave = Sinks.many().multicast().directBestEffort();

    /**
     * Creates the registry and immediately runs {@link #init()}, which blocks until a pull
     * request has been sent (or has failed) for every other cluster member.
     *
     * @param cluster        cluster used for membership events and messaging
     * @param serviceCall    service call used to build RPC proxies for remote schedulers
     * @param localScheduler the scheduler hosted by this node
     */
    public ClusterSchedulerRegistry(ExtendedCluster cluster, ServiceCall serviceCall, Scheduler localScheduler) {
        this.cluster = cluster;
        this.serviceCall = serviceCall;
        this.localScheduler = localScheduler;
        this.init();
    }

    /**
     * Installs the cluster message/membership handler and then sends a pull request to
     * every member currently known to the cluster, blocking until all sends complete.
     */
    void init() {
        this.cluster.handler(ignored -> new ClusterMessageHandler() {

            @Override
            public void onGossip(Message gossip) {
                handleClusterMessage(gossip);
            }

            @Override
            public void onMessage(Message message) {
                handleClusterMessage(message);
            }

            @Override
            public void onMembershipEvent(MembershipEvent event) {
                if (event.isAdded() || event.isUpdated()) {
                    pullRemote(event.member()).subscribe();
                }
                if (event.isRemoved() || event.isLeaving()) {
                    ClusterRemoteScheduler removed = remotes.remove(event.member().id());
                    // Only announce a leave for members we actually knew a scheduler for.
                    if (removed != null) {
                        schedulerLeave.emitNext(removed, Reactors.emitFailureHandler());
                    }
                }
            }
        });
        Flux.fromIterable(this.cluster.otherMembers())
            .flatMap(this::pullRemote)
            .blockLast();
    }

    /**
     * Sends a pull request to {@code member}. Send errors are logged and swallowed.
     */
    private Mono<Void> pullRemote(Member member) {
        return sendSchedulerId(member, PULL_QUALIFIER);
    }

    /**
     * Sends a registration message to {@code member}. Send errors are logged and swallowed,
     * matching {@link #pullRemote(Member)}, because callers subscribe fire-and-forget.
     */
    private Mono<Void> pushRemote(Member member) {
        return sendSchedulerId(member, REG_QUALIFIER);
    }

    /**
     * Sends the local scheduler id to {@code member} under the given qualifier, tagging the
     * message with this node's member id in the {@code from} header.
     */
    private Mono<Void> sendSchedulerId(Member member, String qualifier) {
        Message message = Message
            .withQualifier(qualifier)
            .header(REG_FROM_HEADER, this.cluster.member().id())
            .data(this.localScheduler.getId())
            .build();
        return this.cluster
            .send(member, message)
            .onErrorResume(err -> {
                log.error(err.getMessage(), err);
                return Mono.empty();
            });
    }

    /**
     * Handles a pull or registration message: registers the sender's scheduler if it is
     * not known yet and, for pull messages, replies with a registration message.
     * Messages with any other qualifier are ignored.
     */
    private void handleClusterMessage(Message message) {
        String qualifier = message.qualifier();
        boolean pull = Objects.equals(PULL_QUALIFIER, qualifier);
        if (!pull && !Objects.equals(REG_QUALIFIER, qualifier)) {
            return;
        }

        String from = message.header(REG_FROM_HEADER);
        Object data = message.data();
        // The remotes map rejects null keys and the payload must be a scheduler id;
        // drop malformed messages instead of failing inside the cluster handler.
        if (from == null || !(data instanceof String)) {
            log.warn("ignore malformed scheduler message [{}] from {}", qualifier, from);
            return;
        }
        String schedulerId = (String) data;

        if (this.localScheduler.getId().equals(schedulerId)) {
            log.warn("register same local scheduler [{}] from {}", schedulerId, this.cluster.member(from).orElse(null));
            return;
        }

        registerRemote(from, schedulerId);

        // A pull is always answered, even when the sender is already registered here:
        // the sender may have missed our earlier registration message.
        if (pull) {
            this.cluster.member(from).ifPresent(member -> pushRemote(member).subscribe());
        }
    }

    /**
     * Registers the scheduler announced by member {@code from} unless one is already
     * registered for that member, and emits it on the join sink when it is new.
     */
    private void registerRemote(String from, String schedulerId) {
        if (this.remotes.containsKey(from)) {
            return;
        }
        log.debug("register new scheduler {} from {}", schedulerId, this.cluster.member(from).orElse(null));
        // Route RPC calls to the service instance tagged with the remote scheduler id.
        SchedulerRpcService rpcService = this.serviceCall
            .router((serviceRegistry, request) -> serviceRegistry
                .lookupService(request)
                .stream()
                .filter(ref -> Objects.equals(schedulerId, ref.tags().get(SERVICE_SCHEDULER_ID_TAG)))
                .findFirst())
            .methodRegistry(EmptyServiceMethodRegistry.INSTANCE)
            .api(SchedulerRpcService.class);
        ClusterRemoteScheduler scheduler = new ClusterRemoteScheduler(schedulerId, rpcService);
        // putIfAbsent makes the check-and-insert atomic; only the caller that actually
        // inserted the scheduler announces the join.
        ClusterRemoteScheduler previous = this.remotes.putIfAbsent(from, scheduler);
        if (previous == null) {
            this.schedulerJoin.emitNext(scheduler, Reactors.emitFailureHandler());
        }
    }

    /**
     * @return a single-element list containing the local scheduler
     */
    @Override
    public List<Scheduler> getLocalSchedulers() {
        return Collections.singletonList(this.localScheduler);
    }

    /**
     * @return the local scheduler followed by the currently registered remote schedulers
     */
    @Override
    public Flux<Scheduler> getSchedulers() {
        return Flux.concat(
            Flux.fromIterable(this.getLocalSchedulers()),
            Flux.fromIterable(this.remotes.values()));
    }

    /**
     * @return a buffered stream of remote schedulers as they are registered
     */
    @Override
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.schedulerJoin.asFlux().onBackpressureBuffer();
    }

    /**
     * @return a buffered stream of remote schedulers whose member was removed or is leaving
     */
    @Override
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.schedulerLeave.asFlux().onBackpressureBuffer();
    }

    /**
     * Not supported by this registry.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void register(Scheduler scheduler) {
        throw new UnsupportedOperationException();
    }

    /**
     * Wraps {@code scheduler} in a {@link SchedulerRpcServiceImpl} and builds a service
     * descriptor tagged with the scheduler id under {@code schedulerId}, which is the tag
     * this registry's RPC router filters on.
     *
     * @param scheduler the scheduler to expose
     * @return the service descriptor for the scheduler
     */
    public static ServiceInfo createService(Scheduler scheduler) {
        SchedulerRpcServiceImpl impl = new SchedulerRpcServiceImpl(scheduler);
        return ServiceInfo
            .fromServiceInstance(impl)
            .tag(SERVICE_SCHEDULER_ID_TAG, scheduler.getId())
            .build();
    }
}

