/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster.membership;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorEvent;
import io.scalecube.cluster.gossip.GossipProtocol;
import io.scalecube.cluster.membership.MemberStatus;
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocol;
import io.scalecube.cluster.membership.MembershipRecord;
import io.scalecube.cluster.membership.SyncData;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;

public final class MembershipProtocolImpl
implements MembershipProtocol {
    private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);
    private static final Logger LOGGER_MEMBERSHIP = LoggerFactory.getLogger((String)"io.scalecube.cluster.Membership");
    public static final String SYNC = "sc/membership/sync";
    public static final String SYNC_ACK = "sc/membership/syncAck";
    public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";
    private final Member localMember;
    private final Transport transport;
    private final MembershipConfig membershipConfig;
    private final FailureDetectorConfig failureDetectorConfig;
    private final List<Address> seedMembers;
    private final FailureDetector failureDetector;
    private final GossipProtocol gossipProtocol;
    private final MetadataStore metadataStore;
    private final CorrelationIdGenerator cidGenerator;
    private final Map<String, MembershipRecord> membershipTable = new HashMap<String, MembershipRecord>();
    private final Map<String, Member> members = new HashMap<String, Member>();
    private final FluxProcessor<MembershipEvent, MembershipEvent> subject = DirectProcessor.create().serialize();
    private final FluxSink<MembershipEvent> sink = this.subject.sink();
    private final Disposable.Composite actionsDisposables = Disposables.composite();
    private final Scheduler scheduler;
    private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap<String, Disposable>();

    public MembershipProtocolImpl(Member localMember, Transport transport, FailureDetector failureDetector, GossipProtocol gossipProtocol, MetadataStore metadataStore, ClusterConfig config, Scheduler scheduler, CorrelationIdGenerator cidGenerator) {
        this.transport = Objects.requireNonNull(transport);
        this.failureDetector = Objects.requireNonNull(failureDetector);
        this.gossipProtocol = Objects.requireNonNull(gossipProtocol);
        this.metadataStore = Objects.requireNonNull(metadataStore);
        this.localMember = Objects.requireNonNull(localMember);
        this.scheduler = Objects.requireNonNull(scheduler);
        this.cidGenerator = Objects.requireNonNull(cidGenerator);
        this.membershipConfig = Objects.requireNonNull(config).membershipConfig();
        this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig();
        this.seedMembers = this.cleanUpSeedMembers(this.membershipConfig.seedMembers());
        this.membershipTable.put(localMember.id(), new MembershipRecord(localMember, MemberStatus.ALIVE, 0));
        this.members.put(localMember.id(), localMember);
        this.actionsDisposables.addAll(Arrays.asList(transport.listen().publishOn(scheduler).subscribe(this::onMessage, this::onError), failureDetector.listen().publishOn(scheduler).subscribe(this::onFailureDetectorEvent, this::onError), gossipProtocol.listen().publishOn(scheduler).subscribe(this::onMembershipGossip, this::onError)));
    }

    private List<Address> cleanUpSeedMembers(Collection<Address> seedMembers) {
        return new LinkedHashSet<Address>(seedMembers).stream().filter(address -> !address.equals((Object)this.localMember.address())).filter(address -> !address.equals((Object)this.transport.address())).collect(Collectors.toList());
    }

    @Override
    public Flux<MembershipEvent> listen() {
        return this.subject.onBackpressureBuffer();
    }

    public Mono<Void> updateIncarnation() {
        return Mono.defer(() -> {
            MembershipRecord curRecord = this.membershipTable.get(this.localMember.id());
            MembershipRecord newRecord = new MembershipRecord(this.localMember, MemberStatus.ALIVE, curRecord.incarnation() + 1);
            this.membershipTable.put(this.localMember.id(), newRecord);
            return this.spreadMembershipGossip(newRecord);
        });
    }

    public Mono<Void> leaveCluster() {
        return Mono.defer(() -> {
            MembershipRecord curRecord = this.membershipTable.get(this.localMember.id());
            MembershipRecord newRecord = new MembershipRecord(this.localMember, MemberStatus.DEAD, curRecord.incarnation() + 1);
            this.membershipTable.put(this.localMember.id(), newRecord);
            return this.spreadMembershipGossip(newRecord);
        });
    }

    @Override
    public Mono<Void> start() {
        return Mono.create(this::start0).then(Mono.fromCallable(() -> JmxMonitorMBean.start(this))).then();
    }

    private void start0(MonoSink<Object> sink) {
        if (this.seedMembers.isEmpty()) {
            this.schedulePeriodicSync();
            sink.success();
            return;
        }
        LOGGER.debug("Making initial Sync to all seed members: {}", this.seedMembers);
        Mono[] syncs = (Mono[])this.seedMembers.stream().map(address -> {
            String cid = this.cidGenerator.nextCid();
            return this.transport.requestResponse(address, this.prepareSyncDataMsg(SYNC, cid)).filter(this::checkSyncGroup);
        }).toArray(Mono[]::new);
        Flux.mergeDelayError((int)syncs.length, (Publisher[])syncs).take(1L).timeout(Duration.ofMillis(this.membershipConfig.syncTimeout()), this.scheduler).publishOn(this.scheduler).flatMap(message -> this.onSyncAck((Message)message, true)).doFinally(s -> {
            this.schedulePeriodicSync();
            sink.success();
        }).subscribe(null, ex -> LOGGER.debug("Exception on initial SyncAck, cause: {}", (Object)ex.toString()));
    }

    @Override
    public void stop() {
        this.actionsDisposables.dispose();
        for (String memberId : this.suspicionTimeoutTasks.keySet()) {
            Disposable future = this.suspicionTimeoutTasks.get(memberId);
            if (future == null || future.isDisposed()) continue;
            future.dispose();
        }
        this.suspicionTimeoutTasks.clear();
        this.sink.complete();
    }

    @Override
    public Collection<Member> members() {
        return new ArrayList<Member>(this.members.values());
    }

    @Override
    public Collection<Member> otherMembers() {
        return new ArrayList<Member>(this.members.values()).stream().filter(member -> !member.equals((Object)this.localMember)).collect(Collectors.toList());
    }

    @Override
    public Member member() {
        return this.localMember;
    }

    @Override
    public Optional<Member> member(String id) {
        return Optional.ofNullable(this.members.get(id));
    }

    @Override
    public Optional<Member> member(Address address) {
        return new ArrayList<Member>(this.members.values()).stream().filter(member -> member.address().equals((Object)address)).findFirst();
    }

    private void doSync() {
        Optional<Address> addressOptional = this.selectSyncAddress();
        if (!addressOptional.isPresent()) {
            return;
        }
        Address address = addressOptional.get();
        Message message = this.prepareSyncDataMsg(SYNC, null);
        LOGGER.debug("Send Sync: {} to {}", (Object)message, (Object)address);
        this.transport.send(address, message).subscribe(null, ex -> LOGGER.debug("Failed to send Sync: {} to {}, cause: {}", new Object[]{message, address, ex.toString()}));
    }

    private void onMessage(Message message) {
        if (this.checkSyncGroup(message)) {
            if (SYNC.equals(message.qualifier())) {
                this.onSync(message).subscribe(null, this::onError);
            }
            if (SYNC_ACK.equals(message.qualifier()) && message.correlationId() == null) {
                this.onSyncAck(message, false).subscribe(null, this::onError);
            }
        }
    }

    private Mono<Void> onSyncAck(Message syncAckMsg, boolean onStart) {
        return Mono.defer(() -> {
            LOGGER.debug("Received SyncAck: {}", (Object)syncAckMsg);
            return this.syncMembership((SyncData)syncAckMsg.data(), onStart);
        });
    }

    private Mono<Void> onSync(Message syncMsg) {
        return Mono.defer(() -> {
            LOGGER.debug("Received Sync: {}", (Object)syncMsg);
            return this.syncMembership((SyncData)syncMsg.data(), false).doOnSuccess(avoid -> {
                Message message = this.prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
                Address address = syncMsg.sender();
                this.transport.send(address, message).subscribe(null, ex -> LOGGER.debug("Failed to send SyncAck: {} to {}, cause: {}", new Object[]{message, address, ex.toString()}));
            });
        });
    }

    private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
        MembershipRecord r0 = this.membershipTable.get(fdEvent.member().id());
        if (r0 == null) {
            return;
        }
        if (r0.status() == fdEvent.status()) {
            return;
        }
        LOGGER.debug("Received status change on failure detector event: {}", (Object)fdEvent);
        if (fdEvent.status() == MemberStatus.ALIVE) {
            Message syncMsg = this.prepareSyncDataMsg(SYNC, null);
            Address address = fdEvent.member().address();
            this.transport.send(address, syncMsg).subscribe(null, ex -> LOGGER.debug("Failed to send {} to {}, cause: {}", new Object[]{syncMsg, address, ex.toString()}));
        } else {
            MembershipRecord record = new MembershipRecord(r0.member(), fdEvent.status(), r0.incarnation());
            this.updateMembership(record, MembershipUpdateReason.FAILURE_DETECTOR_EVENT).subscribe(null, this::onError);
        }
    }

    private void onMembershipGossip(Message message) {
        if (MEMBERSHIP_GOSSIP.equals(message.qualifier())) {
            MembershipRecord record = (MembershipRecord)message.data();
            LOGGER.debug("Received membership gossip: {}", (Object)record);
            this.updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP).subscribe(null, this::onError);
        }
    }

    private Optional<Address> selectSyncAddress() {
        List addresses = Stream.concat(this.seedMembers.stream(), this.otherMembers().stream().map(Member::address)).collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new));
        Collections.shuffle(addresses);
        if (addresses.isEmpty()) {
            return Optional.empty();
        }
        int i = ThreadLocalRandom.current().nextInt(addresses.size());
        return Optional.of(addresses.get(i));
    }

    private void onError(Throwable throwable) {
        LOGGER.error("Received unexpected error: ", throwable);
    }

    private void onErrorIgnore(Throwable throwable) {
    }

    private boolean checkSyncGroup(Message message) {
        if (message.data() instanceof SyncData) {
            SyncData syncData = (SyncData)message.data();
            return this.membershipConfig.syncGroup().equals(syncData.getSyncGroup());
        }
        return false;
    }

    private void schedulePeriodicSync() {
        int syncInterval = this.membershipConfig.syncInterval();
        this.actionsDisposables.add(this.scheduler.schedulePeriodically(this::doSync, (long)syncInterval, (long)syncInterval, TimeUnit.MILLISECONDS));
    }

    private Message prepareSyncDataMsg(String qualifier, String cid) {
        ArrayList<MembershipRecord> membershipRecords = new ArrayList<MembershipRecord>(this.membershipTable.values());
        SyncData syncData = new SyncData(membershipRecords, this.membershipConfig.syncGroup());
        return Message.withData((Object)syncData).qualifier(qualifier).correlationId(cid).build();
    }

    private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
        return Mono.defer(() -> {
            MembershipUpdateReason reason = onStart ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC;
            return Mono.whenDelayError((Publisher[])((Publisher[])syncData.getMembership().stream().map(r1 -> this.updateMembership((MembershipRecord)r1, reason)).toArray(Mono[]::new)));
        });
    }

    private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason reason) {
        return Mono.defer(() -> {
            Objects.requireNonNull(r1, "Membership record can't be null");
            MembershipRecord r0 = this.membershipTable.get(r1.id());
            if (r1.equals(r0) || !r1.isOverrides(r0)) {
                LOGGER_MEMBERSHIP.debug("(update reason: {}) skipping update, can't override r0: {} with received r1: {}", new Object[]{reason, r0, r1});
                return Mono.empty();
            }
            if (r1.member().address().equals((Object)this.localMember.address())) {
                if (r1.member().id().equals(this.localMember.id())) {
                    return this.onSelfMemberDetected(r0, r1, reason);
                }
                return Mono.empty();
            }
            if (r1.isDead()) {
                return this.onDeadMemberDetected(r1);
            }
            if (r1.isSuspect()) {
                this.membershipTable.put(r1.id(), r1);
                this.scheduleSuspicionTimeoutTask(r1);
                this.spreadMembershipGossipUnlessGossiped(r1, reason);
            }
            if (r1.isAlive() && (r0 == null || r0.incarnation() < r1.incarnation())) {
                return this.metadataStore.fetchMetadata(r1.member()).doOnError(ex -> LOGGER_MEMBERSHIP.debug("(update reason: {}) skipping to add/update member: {}, due to failed fetchMetadata call (cause: {})", new Object[]{reason, r1, ex.toString()})).doOnSuccess(metadata1 -> {
                    this.cancelSuspicionTimeoutTask(r1.id());
                    this.spreadMembershipGossipUnlessGossiped(r1, reason);
                    ByteBuffer metadata0 = this.metadataStore.updateMetadata(r1.member(), metadata1);
                    this.onAliveMemberDetected(r1, metadata0, (ByteBuffer)metadata1);
                }).onErrorResume(Exception.class, e -> Mono.empty()).then();
            }
            return Mono.empty();
        });
    }

    private Mono<Void> onSelfMemberDetected(MembershipRecord r0, MembershipRecord r1, MembershipUpdateReason reason) {
        return Mono.fromRunnable(() -> {
            int currentIncarnation = Math.max(r0.incarnation(), r1.incarnation());
            MembershipRecord r2 = new MembershipRecord(this.localMember, r0.status(), currentIncarnation + 1);
            this.membershipTable.put(this.localMember.id(), r2);
            LOGGER_MEMBERSHIP.debug("(update reason: {}) updating incarnation, local record r0: {} to received r1: {}, spreading with increased incarnation r2: {}", new Object[]{reason, r0, r1, r2});
            this.spreadMembershipGossip(r2).doOnError(this::onErrorIgnore).subscribe();
        });
    }

    private Mono<Void> onDeadMemberDetected(MembershipRecord r1) {
        return Mono.fromRunnable(() -> {
            this.cancelSuspicionTimeoutTask(r1.id());
            if (!this.members.containsKey(r1.id())) {
                return;
            }
            this.members.remove(r1.id());
            this.membershipTable.remove(r1.id());
            ByteBuffer metadata0 = this.metadataStore.removeMetadata(r1.member());
            MembershipEvent event = MembershipEvent.createRemoved((Member)r1.member(), (ByteBuffer)metadata0);
            LOGGER_MEMBERSHIP.debug("Emitting membership event {}", (Object)event);
            this.sink.next((Object)event);
        });
    }

    private void onAliveMemberDetected(MembershipRecord r1, ByteBuffer metadata0, ByteBuffer metadata1) {
        Member member = r1.member();
        boolean memberExists = this.members.containsKey(member.id());
        MembershipEvent event = null;
        if (!memberExists) {
            event = MembershipEvent.createAdded((Member)member, (ByteBuffer)metadata1);
        } else if (!metadata1.equals(metadata0)) {
            event = MembershipEvent.createUpdated((Member)member, (ByteBuffer)metadata0, (ByteBuffer)metadata1);
        }
        this.members.put(member.id(), member);
        this.membershipTable.put(member.id(), r1);
        if (event != null) {
            LOGGER_MEMBERSHIP.debug("Emitting membership event {}", (Object)event);
            this.sink.next((Object)event);
        }
    }

    private void cancelSuspicionTimeoutTask(String memberId) {
        Disposable future = this.suspicionTimeoutTasks.remove(memberId);
        if (future != null && !future.isDisposed()) {
            LOGGER.debug("Cancelled SuspicionTimeoutTask for {}", (Object)memberId);
            future.dispose();
        }
    }

    private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
        long suspicionTimeout = ClusterMath.suspicionTimeout(this.membershipConfig.suspicionMult(), this.membershipTable.size(), this.failureDetectorConfig.pingInterval());
        this.suspicionTimeoutTasks.computeIfAbsent(record.id(), id -> {
            LOGGER.debug("Scheduled SuspicionTimeoutTask for {}, suspicionTimeout {}", id, (Object)suspicionTimeout);
            return this.scheduler.schedule(() -> this.onSuspicionTimeout((String)id), suspicionTimeout, TimeUnit.MILLISECONDS);
        });
    }

    private void onSuspicionTimeout(String memberId) {
        this.suspicionTimeoutTasks.remove(memberId);
        MembershipRecord record = this.membershipTable.get(memberId);
        if (record != null) {
            LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", (Object)record);
            MembershipRecord deadRecord = new MembershipRecord(record.member(), MemberStatus.DEAD, record.incarnation());
            this.updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT).subscribe(null, this::onError);
        }
    }

    private void spreadMembershipGossipUnlessGossiped(MembershipRecord r1, MembershipUpdateReason reason) {
        if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP && reason != MembershipUpdateReason.INITIAL_SYNC) {
            this.spreadMembershipGossip(r1).doOnError(this::onErrorIgnore).subscribe();
        }
    }

    private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
        return Mono.defer(() -> {
            Message msg = Message.withData((Object)record).qualifier(MEMBERSHIP_GOSSIP).build();
            LOGGER.debug("Spead membreship: {} with gossip", (Object)msg);
            return this.gossipProtocol.spread(msg).doOnError(ex -> LOGGER.debug("Failed to spread membership: {} with gossip, cause: {}", (Object)msg, (Object)ex.toString())).then();
        });
    }

    FailureDetector getFailureDetector() {
        return this.failureDetector;
    }

    GossipProtocol getGossipProtocol() {
        return this.gossipProtocol;
    }

    Transport getTransport() {
        return this.transport;
    }

    MetadataStore getMetadataStore() {
        return this.metadataStore;
    }

    List<MembershipRecord> getMembershipRecords() {
        return Collections.unmodifiableList(new ArrayList<MembershipRecord>(this.membershipTable.values()));
    }

    public static class JmxMonitorMBean
    implements MonitorMBean {
        public static final int REMOVED_MEMBERS_HISTORY_SIZE = 42;
        private final MembershipProtocolImpl membershipProtocol;
        private final ReplayProcessor<MembershipEvent> removedMembersHistory;

        private JmxMonitorMBean(MembershipProtocolImpl membershipProtocol) {
            this.membershipProtocol = membershipProtocol;
            this.removedMembersHistory = ReplayProcessor.create((int)42);
            membershipProtocol.listen().filter(MembershipEvent::isRemoved).subscribe(this.removedMembersHistory);
        }

        private static JmxMonitorMBean start(MembershipProtocolImpl membershipProtocol) throws Exception {
            JmxMonitorMBean monitorMBean = new JmxMonitorMBean(membershipProtocol);
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName objectName = new ObjectName("io.scalecube.cluster:name=Membership@" + membershipProtocol.localMember.id());
            StandardMBean standardMBean = new StandardMBean(monitorMBean, MonitorMBean.class);
            server.registerMBean(standardMBean, objectName);
            return monitorMBean;
        }

        @Override
        public int getIncarnation() {
            Map membershipTable = this.membershipProtocol.membershipTable;
            String localMemberId = this.membershipProtocol.localMember.id();
            return ((MembershipRecord)membershipTable.get(localMemberId)).incarnation();
        }

        @Override
        public List<String> getAliveMembers() {
            return this.findRecordsByCondition(MembershipRecord::isAlive);
        }

        @Override
        public List<String> getSuspectedMembers() {
            return this.findRecordsByCondition(MembershipRecord::isSuspect);
        }

        @Override
        public List<String> getDeadMembers() {
            ArrayList<String> deadMembers = new ArrayList<String>();
            this.removedMembersHistory.map(MembershipEvent::toString).subscribe(deadMembers::add);
            return deadMembers;
        }

        private List<String> findRecordsByCondition(Predicate<MembershipRecord> condition) {
            return this.membershipProtocol.getMembershipRecords().stream().filter(condition).map(record -> new Member(record.id(), record.address())).map(Member::toString).collect(Collectors.toList());
        }
    }

    public static interface MonitorMBean {
        public int getIncarnation();

        public List<String> getAliveMembers();

        public List<String> getSuspectedMembers();

        public List<String> getDeadMembers();
    }

    private static enum MembershipUpdateReason {
        FAILURE_DETECTOR_EVENT,
        MEMBERSHIP_GOSSIP,
        SYNC,
        INITIAL_SYNC,
        SUSPICION_TIMEOUT;

    }
}

