/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster;

import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
import io.scalecube.cluster.gossip.GossipProtocolImpl;
import io.scalecube.cluster.membership.IdGenerator;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocolImpl;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.metadata.MetadataStoreImpl;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.TransportImpl;
import java.lang.management.ManagementFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public final class ClusterImpl
implements Cluster {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterImpl.class);

    /** Qualifiers that {@link #listenMessage()} drops from the transport stream. */
    private static final Set<String> SYSTEM_MESSAGES = Collections.unmodifiableSet(
        Stream.of(
                "sc/fdetector/ping",
                "sc/fdetector/pingReq",
                "sc/fdetector/pingAck",
                "sc/membership/sync",
                "sc/membership/syncAck",
                "sc/gossip/req",
                "sc/metadata/req",
                "sc/metadata/resp")
            .collect(Collectors.toSet()));

    /** Qualifiers that {@link #listenGossip()} drops from the gossip stream. */
    private static final Set<String> SYSTEM_GOSSIPS = Collections.singleton("sc/membership/gossip");

    // Not final: config(...) and handler(...) assign these on the copy they return.
    private ClusterConfig config;
    private Function<Cluster, ? extends ClusterMessageHandler> handler =
        ignored -> new ClusterMessageHandler(){};

    // Membership events are pushed into the sink and re-published from the processor.
    private final DirectProcessor<MembershipEvent> membershipEvents = DirectProcessor.create();
    private final FluxSink<MembershipEvent> membershipSink = membershipEvents.sink();

    // Subscriptions created during start; disposed in dispose().
    private final Disposable.Composite actionsDisposables = Disposables.composite();

    // Lifecycle signals: start/shutdown are triggers, onStart/onShutdown are outcomes
    // (wired together in initLifecycle()).
    private final MonoProcessor<Void> start = MonoProcessor.create();
    private final MonoProcessor<Void> onStart = MonoProcessor.create();
    private final MonoProcessor<Void> shutdown = MonoProcessor.create();
    private final MonoProcessor<Void> onShutdown = MonoProcessor.create();

    // Components below are assigned in doStart0() once the transport is bound.
    private Transport transport;
    private Member localMember;
    private FailureDetectorImpl failureDetector;
    private GossipProtocolImpl gossip;
    private MembershipProtocolImpl membership;
    private MetadataStore metadataStore;
    private Scheduler scheduler;
    private CorrelationIdGenerator cidGenerator;

    /** Creates an instance backed by {@link ClusterConfig#defaultConfig()}. */
    public ClusterImpl() {
        this(ClusterConfig.defaultConfig());
    }

    /**
     * Creates an instance with the given configuration and wires the lifecycle signals.
     *
     * @param config cluster configuration
     * @throws NullPointerException if {@code config} is null
     */
    public ClusterImpl(ClusterConfig config) {
        Objects.requireNonNull(config);
        this.config = config;
        initLifecycle();
    }

    /**
     * Copy constructor used by the fluent setters: clones the config, shares the handler
     * function, and wires fresh lifecycle signals for the new instance.
     */
    private ClusterImpl(ClusterImpl that) {
        ClusterConfig clonedConfig = that.config.clone();
        this.config = clonedConfig;
        this.handler = that.handler;
        initLifecycle();
    }

    /**
     * Wires the lifecycle signals together.
     *
     * <p>Completing {@code start} runs {@link #doStart()}; its outcome is mirrored onto
     * {@code onStart}, and a failure additionally completes {@code shutdown}. Completing
     * {@code shutdown} runs {@link #doShutdown()}, after which {@code onShutdown} is completed
     * regardless of how the shutdown sequence terminated.
     */
    private void initLifecycle() {
        Mono<Cluster> startSequence = this.start
            .then(this.doStart())
            .doOnSuccess(ignored -> this.onStart.onComplete())
            .doOnError(this.onStart::onError);
        startSequence.subscribe(null, failure -> {
            LOGGER.error("Cluster member {} failed on start: ", this.localMember, failure);
            // A failed start triggers the shutdown sequence.
            this.shutdown.onComplete();
        });

        Mono<Void> shutdownSequence = this.shutdown
            .then(this.doShutdown())
            .doFinally(signal -> this.onShutdown.onComplete());
        shutdownSequence.subscribe();
    }

    /**
     * Returns a copy of this instance whose configuration is the result of applying
     * {@code options}.
     *
     * <p>The operator is applied to the copy's own cloned config, so an operator that mutates
     * its argument cannot affect this instance's configuration.
     *
     * @param options function producing the new configuration from the current one
     * @return a new {@code ClusterImpl}; this instance is left unchanged
     * @throws NullPointerException if {@code options} is null or returns null
     */
    public ClusterImpl config(UnaryOperator<ClusterConfig> options) {
        Objects.requireNonNull(options);
        ClusterImpl cluster = new ClusterImpl(this);
        // cluster.config is already a clone made by the copy constructor; hand that to the
        // operator instead of this.config so the source instance's config is never exposed.
        cluster.config = Objects.requireNonNull(
            options.apply(cluster.config), "options must not return null config");
        return cluster;
    }

    /**
     * Returns a copy of this instance that uses the given message-handler factory.
     *
     * @param handler function invoked with the cluster during start to create the handler
     * @return a new {@code ClusterImpl}; this instance is left unchanged
     * @throws NullPointerException if {@code handler} is null
     */
    public ClusterImpl handler(Function<Cluster, ClusterMessageHandler> handler) {
        Objects.requireNonNull(handler);
        ClusterImpl copy = new ClusterImpl(this);
        copy.handler = handler;
        return copy;
    }

    /**
     * Triggers the start sequence and returns a {@code Mono} that emits this instance once
     * {@code onStart} has completed, or fails with the error {@code onStart} was given.
     *
     * <p>Nothing happens until the returned {@code Mono} is subscribed to.
     *
     * @return a deferred {@code Mono} emitting this cluster after a successful start
     */
    public Mono<Cluster> start() {
        return Mono.defer(() -> {
            // Completing the trigger runs the sequence wired in initLifecycle().
            this.start.onComplete();
            // No (Object) cast here: it would widen the result to Mono<Object>,
            // which is not assignable to Mono<Cluster>.
            return this.onStart.thenReturn(this);
        });
    }

    /**
     * Blocking variant of {@link #start()}.
     *
     * @return the value produced by blocking on {@link #start()}
     */
    public Cluster startAwait() {
        Mono<Cluster> started = start();
        return started.block();
    }

    /**
     * Builds the lazy start sequence: validate the configuration first, then run
     * {@link #doStart0()}. A validation failure surfaces as an error signal.
     */
    private Mono<Cluster> doStart() {
        return Mono.fromRunnable(this::validateConfiguration)
            .then(Mono.defer(this::doStart0));
    }

    /**
     * Binds the transport and, once bound, creates and starts the cluster components.
     *
     * <p>Components are created in dependency order (local member, sender-aware transport,
     * correlation id generator, scheduler, failure detector, gossip, metadata store,
     * membership) and then started in the order: failure detector, gossip, metadata store,
     * message handler, membership, JMX monitor.
     *
     * @return a {@code Mono} emitting this instance after every step has completed
     */
    private Mono<Cluster> doStart0() {
        return TransportImpl.bind((TransportConfig)this.config.transportConfig()).flatMap(boundTransport -> {
            this.localMember = this.createLocalMember(boundTransport.address().port());
            // Wrap the raw transport so outgoing messages carry the local member address.
            this.transport = new SenderAwareTransport((Transport)boundTransport, this.localMember.address());
            this.cidGenerator = new CorrelationIdGenerator(this.localMember.id());
            this.scheduler = Schedulers.newSingle("sc-cluster-" + this.localMember.address().port(), true);
            this.failureDetector = new FailureDetectorImpl(
                this.localMember,
                this.transport,
                this.membershipEvents.onBackpressureBuffer(),
                this.config.failureDetectorConfig(),
                this.scheduler,
                this.cidGenerator);
            this.gossip = new GossipProtocolImpl(
                this.localMember,
                this.transport,
                this.membershipEvents.onBackpressureBuffer(),
                this.config.gossipConfig(),
                this.scheduler);
            this.metadataStore = new MetadataStoreImpl(
                this.localMember,
                this.transport,
                this.config.metadata(),
                this.config,
                this.scheduler,
                this.cidGenerator);
            this.membership = new MembershipProtocolImpl(
                this.localMember,
                this.transport,
                this.failureDetector,
                this.gossip,
                this.metadataStore,
                this.config,
                this.scheduler,
                this.cidGenerator);
            // Forward membership protocol events into the processor that feeds the
            // failure detector, gossip and listenMembership().
            this.actionsDisposables.add(
                this.membership.listen().subscribe(event -> this.membershipSink.next(event), this::onError));
            return Mono.fromRunnable(() -> this.failureDetector.start())
                .then(Mono.fromRunnable(() -> this.gossip.start()))
                .then(Mono.fromRunnable(() -> this.metadataStore.start()))
                .then(Mono.fromRunnable(this::startHandler))
                .then(this.membership.start())
                .then(Mono.fromCallable(() -> JmxMonitorMBean.start(this)));
        // No (Object) cast on the argument: it would make the result Mono<Object>,
        // which does not match the declared Mono<Cluster> return type.
        }).thenReturn(this);
    }

    /**
     * Checks that the mandatory configuration values are present.
     *
     * <p>The nested transport and membership config objects are checked themselves before
     * being dereferenced, so a missing one is reported with a descriptive message rather
     * than a bare {@code NullPointerException}.
     *
     * @throws NullPointerException naming the first missing setting
     */
    private void validateConfiguration() {
        Objects.requireNonNull(
            this.config.metadata(),
            "Invalid cluster config: metadata must be specified");
        Objects.requireNonNull(
            this.config.metadataDecoder(),
            "Invalid cluster config: metadataDecoder must be specified");
        Objects.requireNonNull(
            this.config.metadataEncoder(),
            "Invalid cluster config: metadataEncoder must be specified");
        Objects.requireNonNull(
            Objects.requireNonNull(
                this.config.transportConfig(),
                "Invalid cluster config: transport must be specified").messageCodec(),
            "Invalid cluster config: transport.messageCodec must be specified");
        Objects.requireNonNull(
            Objects.requireNonNull(
                this.config.membershipConfig(),
                "Invalid cluster config: membership must be specified").syncGroup(),
            "Invalid cluster config: membership.syncGroup must be specified");
    }

    /**
     * Creates the message handler from the configured factory and subscribes it to the
     * message, membership and gossip streams. Each subscription is tracked in
     * {@code actionsDisposables}; stream errors are routed to {@link #onError(Throwable)}.
     */
    private void startHandler() {
        ClusterMessageHandler messageHandler = this.handler.apply(this);

        Disposable messages = this.listenMessage()
            .subscribe(msg -> messageHandler.onMessage(msg), this::onError);
        this.actionsDisposables.add(messages);

        Disposable membershipUpdates = this.listenMembership()
            .subscribe(event -> messageHandler.onMembershipEvent(event), this::onError);
        this.actionsDisposables.add(membershipUpdates);

        Disposable gossips = this.listenGossip()
            .subscribe(msg -> messageHandler.onGossip(msg), this::onError);
        this.actionsDisposables.add(gossips);
    }

    private void onError(Throwable th) {
        LOGGER.error("Received unexpected error: ", th);
    }

    /** Transport messages whose qualifier is not one of {@code SYSTEM_MESSAGES}. */
    private Flux<Message> listenMessage() {
        Flux<Message> incoming = this.transport.listen();
        return incoming.filter(message -> !SYSTEM_MESSAGES.contains(message.qualifier()));
    }

    /** Gossip messages whose qualifier is not one of {@code SYSTEM_GOSSIPS}. */
    private Flux<Message> listenGossip() {
        Flux<Message> gossips = this.gossip.listen();
        return gossips.filter(message -> !SYSTEM_GOSSIPS.contains(message.qualifier()));
    }

    /** Membership events from the internal processor, with backpressure buffering applied. */
    private Flux<MembershipEvent> listenMembership() {
        Flux<MembershipEvent> buffered = this.membershipEvents.onBackpressureBuffer();
        return buffered;
    }

    /**
     * Creates the local member with a generated id and its cluster address.
     *
     * <p>Host and port are resolved independently: the configured {@code memberHost} wins
     * over the detected local IP address, and the configured {@code memberPort} wins over
     * the port the transport is listening on. A configured port is therefore applied even
     * when no host is configured.
     *
     * @param listenPort port the transport was bound to
     * @return the local member
     */
    private Member createLocalMember(int listenPort) {
        String localAddress = Address.getLocalIpAddress().getHostAddress();
        int port = Optional.ofNullable(this.config.memberPort()).orElse(listenPort);
        String host = Optional.ofNullable(this.config.memberHost()).orElse(localAddress);
        return new Member(IdGenerator.generateId(), Address.create(host, port));
    }

    /** Returns the address of the local member. */
    public Address address() {
        Member local = member();
        return local.address();
    }

    /**
     * Sends a message to the given member's address.
     *
     * @param member recipient
     * @param message message to send
     * @return the {@code Mono} returned by the transport
     */
    public Mono<Void> send(Member member, Message message) {
        Address target = member.address();
        return send(target, message);
    }

    /**
     * Sends a message to the given address via the transport.
     *
     * @param address recipient address
     * @param message message to send
     * @return the {@code Mono} returned by the transport
     */
    public Mono<Void> send(Address address, Message message) {
        return transport.send(address, message);
    }

    /**
     * Issues a request to the given address via the transport.
     *
     * @param address recipient address
     * @param request request message
     * @return the {@code Mono} returned by the transport
     */
    public Mono<Message> requestResponse(Address address, Message request) {
        return transport.requestResponse(address, request);
    }

    /**
     * Issues a request to the given member's address.
     *
     * @param member recipient
     * @param request request message
     * @return the {@code Mono} returned by the transport
     */
    public Mono<Message> requestResponse(Member member, Message request) {
        Address target = member.address();
        return requestResponse(target, request);
    }

    /**
     * Hands the message to the gossip component for spreading.
     *
     * @param message message to spread
     * @return the {@code Mono} returned by the gossip component
     */
    public Mono<String> spreadGossip(Message message) {
        return gossip.spread(message);
    }

    /** Returns the members reported by the membership component. */
    public Collection<Member> members() {
        return membership.members();
    }

    /** Returns the membership component's {@code otherMembers()} collection. */
    public Collection<Member> otherMembers() {
        return membership.otherMembers();
    }

    /**
     * Returns the local metadata held by the metadata store, cast to the caller's type.
     *
     * @param <T> expected metadata type (unchecked)
     */
    public <T> T metadata() {
        return (T)metadataStore.metadata();
    }

    /**
     * Returns the metadata of the given member.
     *
     * <p>For the local member the local metadata object is returned directly; for any other
     * member the value found in the metadata store, if any, is passed through the configured
     * metadata decoder.
     *
     * @param member member whose metadata is requested
     * @param <T> expected metadata type (unchecked)
     */
    public <T> Optional<T> metadata(Member member) {
        boolean isLocalMember = member().equals(member);
        if (!isLocalMember) {
            return metadataStore.metadata(member).map(buffer -> config.metadataDecoder().decode(buffer));
        }
        return Optional.of(this.metadata());
    }

    /** Returns the local member; it is assigned during start. */
    public Member member() {
        return localMember;
    }

    /**
     * Looks up a member by id via the membership component.
     *
     * @param id member id
     */
    public Optional<Member> member(String id) {
        return membership.member(id);
    }

    /**
     * Looks up a member by address via the membership component.
     *
     * @param address member address
     */
    public Optional<Member> member(Address address) {
        return membership.member(address);
    }

    /**
     * Updates the local metadata in the metadata store and then runs the membership
     * component's incarnation update, subscribed on the cluster scheduler.
     *
     * @param metadata new metadata value
     * @param <T> metadata type
     * @return a {@code Mono} completing when both steps have completed
     */
    public <T> Mono<Void> updateMetadata(T metadata) {
        Mono<Void> incarnationUpdate = membership.updateIncarnation();
        return Mono.fromRunnable(() -> metadataStore.updateMetadata(metadata))
            .then(incarnationUpdate)
            .subscribeOn(scheduler);
    }

    /** Completes the shutdown trigger, which runs the shutdown sequence wired at construction. */
    public void shutdown() {
        shutdown.onComplete();
    }

    /**
     * Builds the lazy shutdown sequence.
     *
     * <p>If the transport was never bound (shutdown requested without a start, or the start
     * failed before binding), no component exists yet and there is nothing to release, so
     * the sequence completes immediately. Otherwise it leaves the cluster, stops the
     * components and stops the transport, running every step even if an earlier one fails,
     * and finally disposes the scheduler.
     *
     * @return a {@code Mono} that terminates when the shutdown sequence has terminated
     */
    private Mono<Void> doShutdown() {
        return Mono.defer(() -> {
            LOGGER.info("Cluster member {} is shutting down", this.localMember);
            Mono<Void> release;
            if (this.transport == null) {
                // membership, scheduler and the other components are only assigned after
                // the transport is bound; dereferencing them here would throw.
                release = Mono.empty();
            } else {
                release = Flux.concatDelayError(
                        this.leaveCluster(this.localMember),
                        this.dispose(),
                        this.transport.stop())
                    .then()
                    .doFinally(signal -> this.scheduler.dispose());
            }
            return release
                .doOnSuccess(ignored -> LOGGER.info("Cluster member {} has been shut down", this.localMember))
                .doOnError(th -> LOGGER.warn("Cluster member {} failed on shutdown: {}", this.localMember, th.toString()));
        });
    }

    /**
     * Runs the membership component's leave step on the cluster scheduler and logs the
     * outcome.
     *
     * @param member member reported in the log messages
     */
    private Mono<Void> leaveCluster(Member member) {
        return this.membership
            .leaveCluster()
            .subscribeOn(this.scheduler)
            .doOnSuccess(ignored ->
                LOGGER.debug("Cluster member {} notified about his leaving and shutting down", member))
            .doOnError(error ->
                LOGGER.info(
                    "Cluster member {} failed to spread leave notification to other cluster members: {}",
                    member,
                    error.toString()))
            .then();
    }

    /**
     * Returns a {@code Mono} that, when subscribed, disposes the tracked subscriptions and
     * stops the metadata store, membership, gossip and failure detector, in that order.
     */
    private Mono<Void> dispose() {
        Runnable stopComponents = () -> {
            this.actionsDisposables.dispose();
            this.metadataStore.stop();
            this.membership.stop();
            this.gossip.stop();
            this.failureDetector.stop();
        };
        return Mono.fromRunnable(stopComponents);
    }

    /** Returns the signal that is completed once the shutdown sequence has terminated. */
    public Mono<Void> onShutdown() {
        return onShutdown;
    }

    /** Reports whether the {@code onShutdown} signal is disposed. */
    public boolean isShutdown() {
        return onShutdown.isDisposed();
    }

    /**
     * {@link Transport} decorator that sets a fixed sender address on every outgoing message
     * and delegates everything else unchanged.
     */
    private static class SenderAwareTransport
    implements Transport {
        private final Transport transport;
        private final Address sender;

        /**
         * @param transport delegate transport
         * @param sender address to set as sender on outgoing messages
         * @throws NullPointerException if either argument is null
         */
        private SenderAwareTransport(Transport transport, Address sender) {
            Objects.requireNonNull(transport);
            this.transport = transport;
            Objects.requireNonNull(sender);
            this.sender = sender;
        }

        public Address address() {
            return transport.address();
        }

        public Mono<Void> stop() {
            return transport.stop();
        }

        public boolean isStopped() {
            return transport.isStopped();
        }

        /** Sends a copy of {@code message} carrying the sender; the copy is built on subscription. */
        public Mono<Void> send(Address address, Message message) {
            return Mono.defer(() -> {
                Message outgoing = withSender(message);
                return transport.send(address, outgoing);
            });
        }

        /** Sends a copy of {@code request} carrying the sender; the copy is built on subscription. */
        public Mono<Message> requestResponse(Address address, Message request) {
            return Mono.defer(() -> {
                Message outgoing = withSender(request);
                return transport.requestResponse(address, outgoing);
            });
        }

        public Flux<Message> listen() {
            return transport.listen();
        }

        /** Builds a new message from {@code message} with the sender set to this transport's sender. */
        private Message withSender(Message message) {
            return Message.with(message).sender(sender).build();
        }
    }

    /** JMX view of a {@link ClusterImpl}, exposing its member id and local metadata. */
    public static class JmxMonitorMBean
    implements MonitorMBean {
        private final ClusterImpl cluster;

        private JmxMonitorMBean(ClusterImpl cluster) {
            this.cluster = cluster;
        }

        /**
         * Creates a monitor for the cluster and registers it with the platform MBean server
         * under {@code io.scalecube.cluster:name=Cluster@<member id>}.
         *
         * @param cluster cluster to expose; its local member must already be set
         * @return the created monitor
         * @throws Exception if the MBean cannot be created or registered
         */
        private static JmxMonitorMBean start(ClusterImpl cluster) throws Exception {
            JmxMonitorMBean monitor = new JmxMonitorMBean(cluster);
            MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
            StandardMBean wrapped = new StandardMBean(monitor, MonitorMBean.class);
            String memberId = cluster.member().id();
            ObjectName registrationName = new ObjectName("io.scalecube.cluster:name=Cluster@" + memberId);
            platformServer.registerMBean(wrapped, registrationName);
            return monitor;
        }

        /** Returns a single-element collection holding the local member id. */
        @Override
        public Collection<String> getMember() {
            String memberId = cluster.member().id();
            return Collections.singleton(memberId);
        }

        /** Returns a single-element list holding the string form of the local metadata. */
        @Override
        public Collection<String> getMetadata() {
            String metadataText = cluster.metadataStore.metadata().toString();
            return Collections.singletonList(metadataText);
        }
    }

    /** Management interface implemented by {@link JmxMonitorMBean}. */
    public interface MonitorMBean {
        /** Exposed as the {@code Member} attribute. */
        Collection<String> getMember();

        /** Exposed as the {@code Metadata} attribute. */
        Collection<String> getMetadata();
    }
}

