/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.scalecube;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.gossip.GossipConfig;
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryContext;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.jetlinks.supports.scalecube.ExtendedClusterImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public final class ExtendedServiceDiscoveryImpl
implements ServiceDiscovery {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
    private ClusterConfig clusterConfig;
    private ExtendedCluster cluster;
    private ServiceEndpoint endpoint;
    private final Sinks.Many<ServiceDiscoveryEvent> sink = Sinks.many().multicast().directBestEffort();

    public ExtendedServiceDiscoveryImpl() {
        this.clusterConfig = ClusterConfig.defaultLanConfig();
    }

    public ExtendedServiceDiscoveryImpl(ExtendedCluster other, ServiceEndpoint endpoint) {
        this();
        this.cluster = other;
        this.endpoint = endpoint;
    }

    private ExtendedServiceDiscoveryImpl(ExtendedServiceDiscoveryImpl other) {
        this.clusterConfig = other.clusterConfig;
        this.cluster = other.cluster;
        this.endpoint = other.endpoint;
    }

    public ExtendedServiceDiscoveryImpl options(UnaryOperator<ClusterConfig> opts) {
        ExtendedServiceDiscoveryImpl d = new ExtendedServiceDiscoveryImpl(this);
        d.clusterConfig = (ClusterConfig)opts.apply(this.clusterConfig);
        return d;
    }

    public ExtendedServiceDiscoveryImpl transport(UnaryOperator<TransportConfig> opts) {
        return this.options(cfg -> cfg.transport(opts));
    }

    public ExtendedServiceDiscoveryImpl membership(UnaryOperator<MembershipConfig> opts) {
        return this.options(cfg -> cfg.membership(opts));
    }

    public ExtendedServiceDiscoveryImpl gossip(UnaryOperator<GossipConfig> opts) {
        return this.options(cfg -> cfg.gossip(opts));
    }

    public ExtendedServiceDiscoveryImpl failureDetector(UnaryOperator<FailureDetectorConfig> opts) {
        return this.options(cfg -> cfg.failureDetector(opts));
    }

    private Mono<ExtendedCluster> initCluster() {
        if (this.cluster == null) {
            this.cluster = new ExtendedClusterImpl(this.clusterConfig.metadata((Object)this.endpoint));
            return ((ExtendedClusterImpl)this.cluster).start();
        }
        return Mono.just((Object)this.cluster);
    }

    public Mono<Void> start() {
        return Mono.deferContextual(context -> {
            ServiceDiscoveryContext.Builder discoveryContextBuilder = (ServiceDiscoveryContext.Builder)context.get(ServiceDiscoveryContext.Builder.class);
            return this.initCluster().doOnNext(cluster1 -> cluster1.handler(cluster -> new ClusterMessageHandler(){

                public void onMembershipEvent(MembershipEvent event) {
                    ExtendedServiceDiscoveryImpl.this.onMembershipEvent(event);
                }
            })).flatMap(cluster -> cluster.updateMetadata(this.endpoint).thenReturn(cluster)).doOnSuccess(cluster -> discoveryContextBuilder.address(cluster.address())).then(Mono.fromCallable(() -> JmxMonitorMBean.start(this))).then(this.loadMembers());
        });
    }

    private Mono<Void> loadMembers() {
        return Flux.fromIterable((Iterable)this.cluster.otherMembers()).flatMap(member -> Mono.justOrEmpty((Optional)this.cluster.metadata((Member)member))).doOnNext(metadata -> {
            if (metadata instanceof ServiceEndpoint) {
                this.sink.emitNext((Object)ServiceDiscoveryEvent.newEndpointAdded((ServiceEndpoint)((ServiceEndpoint)metadata)), Reactors.emitFailureHandler());
            }
        }).then();
    }

    public Flux<ServiceDiscoveryEvent> listen() {
        return this.sink.asFlux().onBackpressureBuffer();
    }

    public Mono<Void> shutdown() {
        return Mono.defer(() -> {
            if (this.cluster == null) {
                this.sink.emitComplete(Reactors.emitFailureHandler());
                return Mono.empty();
            }
            this.cluster.shutdown();
            return this.cluster.onShutdown().doFinally(s -> this.sink.emitComplete(Reactors.emitFailureHandler()));
        });
    }

    private void onMembershipEvent(MembershipEvent membershipEvent) {
        LOGGER.debug("onMembershipEvent: {}", (Object)membershipEvent);
        ServiceDiscoveryEvent discoveryEvent = this.toServiceDiscoveryEvent(membershipEvent);
        if (discoveryEvent == null) {
            LOGGER.debug("DiscoveryEvent is null, cannot publish it (corresponding membershipEvent: {})", (Object)membershipEvent);
            return;
        }
        LOGGER.debug("Publish discoveryEvent: {}", (Object)discoveryEvent);
        this.sink.emitNext((Object)discoveryEvent, Reactors.emitFailureHandler());
    }

    private ServiceDiscoveryEvent toServiceDiscoveryEvent(MembershipEvent membershipEvent) {
        ServiceEndpoint serviceEndpoint;
        ServiceDiscoveryEvent discoveryEvent = null;
        if (membershipEvent.isAdded() && membershipEvent.newMetadata() != null) {
            serviceEndpoint = this.decodeMetadata(membershipEvent.newMetadata());
            ServiceDiscoveryEvent serviceDiscoveryEvent = discoveryEvent = serviceEndpoint == null ? null : ServiceDiscoveryEvent.newEndpointAdded((ServiceEndpoint)serviceEndpoint);
        }
        if (membershipEvent.isUpdated() && membershipEvent.newMetadata() != null) {
            discoveryEvent = ServiceDiscoveryEvent.newEndpointAdded((ServiceEndpoint)this.decodeMetadata(membershipEvent.newMetadata()));
        }
        if (membershipEvent.isRemoved() && membershipEvent.oldMetadata() != null) {
            serviceEndpoint = this.decodeMetadata(membershipEvent.oldMetadata());
            ServiceDiscoveryEvent serviceDiscoveryEvent = discoveryEvent = serviceEndpoint == null ? null : ServiceDiscoveryEvent.newEndpointLeaving((ServiceEndpoint)serviceEndpoint);
        }
        if (membershipEvent.isLeaving() && membershipEvent.newMetadata() != null) {
            serviceEndpoint = this.decodeMetadata(membershipEvent.newMetadata());
            discoveryEvent = serviceEndpoint == null ? null : ServiceDiscoveryEvent.newEndpointLeaving((ServiceEndpoint)serviceEndpoint);
        }
        return discoveryEvent;
    }

    private ServiceEndpoint decodeMetadata(ByteBuffer byteBuffer) {
        try {
            return (ServiceEndpoint)this.clusterConfig.metadataCodec().deserialize(byteBuffer.duplicate());
        }
        catch (Exception e) {
            LOGGER.error("Failed to read metadata: " + e);
            throw Exceptions.propagate((Throwable)e);
        }
    }

    public String toString() {
        return new StringJoiner(", ", ExtendedServiceDiscoveryImpl.class.getSimpleName() + "[", "]").add("cluster=" + this.cluster).add("clusterConfig=" + this.clusterConfig).toString();
    }

    public ExtendedServiceDiscoveryImpl updateEndpoint(ServiceEndpoint endpoint) {
        this.endpoint = endpoint;
        return this;
    }

    private static class JmxMonitorMBean
    implements MonitorMBean {
        private static final String OBJECT_NAME_FORMAT = "io.scalecube.services.discovery:name=%s@%s";
        public static final int RECENT_DISCOVERY_EVENTS_SIZE = 128;
        private final ExtendedServiceDiscoveryImpl discovery;
        private final List<ServiceDiscoveryEvent> recentDiscoveryEvents = new CopyOnWriteArrayList<ServiceDiscoveryEvent>();

        private JmxMonitorMBean(ExtendedServiceDiscoveryImpl discovery) {
            this.discovery = discovery;
        }

        private static JmxMonitorMBean start(ExtendedServiceDiscoveryImpl instance) throws Exception {
            MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
            JmxMonitorMBean jmxMBean = new JmxMonitorMBean(instance);
            jmxMBean.init();
            ObjectName objectName = new ObjectName(String.format(OBJECT_NAME_FORMAT, instance.cluster.member().id(), System.nanoTime()));
            StandardMBean standardMBean = new StandardMBean(jmxMBean, MonitorMBean.class);
            mbeanServer.registerMBean(standardMBean, objectName);
            return jmxMBean;
        }

        private void init() {
            this.discovery.listen().subscribe(this::onDiscoveryEvent);
        }

        @Override
        public String getClusterConfig() {
            return String.valueOf(this.discovery.clusterConfig);
        }

        @Override
        public String getRecentDiscoveryEvents() {
            return this.recentDiscoveryEvents.stream().map(ServiceDiscoveryEvent::toString).collect(Collectors.joining(",", "[", "]"));
        }

        private void onDiscoveryEvent(ServiceDiscoveryEvent event) {
            this.recentDiscoveryEvents.add(event);
            if (this.recentDiscoveryEvents.size() > 128) {
                this.recentDiscoveryEvents.remove(0);
            }
        }
    }

    public static interface MonitorMBean {
        public String getClusterConfig();

        public String getRecentDiscoveryEvents();
    }
}

