/*
 * Decompiled with CFR 0.152.
 */
package org.apache.submarine.commons.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.discovery.ManagedNodeDiscoveryService;
import io.atomix.cluster.discovery.NodeDiscoveryProvider;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.BroadcastService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.UnicastService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.cluster.protocol.GroupMembershipProtocol;
import io.atomix.cluster.protocol.HeartbeatMembershipProtocol;
import io.atomix.cluster.protocol.HeartbeatMembershipProtocolConfig;
import io.atomix.primitive.PrimitiveState;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import io.atomix.utils.Version;
import io.atomix.utils.net.Address;
import java.io.File;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.submarine.commons.cluster.BroadcastServiceAdapter;
import org.apache.submarine.commons.cluster.ClusterManager;
import org.apache.submarine.commons.cluster.ClusterMonitor;
import org.apache.submarine.commons.cluster.UnicastServiceAdapter;
import org.apache.submarine.commons.cluster.meta.ClusterMeta;
import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
import org.apache.submarine.commons.cluster.protocol.RaftServerMessagingProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server-side member of the cluster: runs a Raft server on a background thread,
 * publishes this node's metadata under {@link ClusterMetaType#SERVER_META}, and
 * sends unicast / broadcast cluster events over the messaging service.
 *
 * <p>Obtain the single instance through {@link #getInstance()}.
 */
public class ClusterServer
extends ClusterManager {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterServer.class);
    private static ClusterServer instance = null;

    /** Raft server created by the bootstrap thread; {@code null} until that thread has built it. */
    protected RaftServer raftServer = null;

    /** Messaging service created by the bootstrap thread; {@code null} until that thread has started it. */
    protected MessagingService messagingService = null;

    private ClusterServer() {
    }

    /**
     * Returns the process-wide instance, creating it on first use.
     *
     * @return the shared {@code ClusterServer}
     */
    public static ClusterServer getInstance() {
        synchronized (ClusterServer.class) {
            if (instance == null) {
                instance = new ClusterServer();
            }
            return instance;
        }
    }

    /**
     * Starts the Raft bootstrap thread and the cluster monitor, then delegates to
     * {@code super.start()}. Does nothing when cluster mode is disabled.
     */
    @Override
    public void start() {
        if (!this.sconf.isClusterMode()) {
            return;
        }
        this.initThread();
        String clusterName = this.getClusterNodeName();
        this.clusterMonitor = new ClusterMonitor(this);
        this.clusterMonitor.start(ClusterMetaType.SERVER_META, clusterName);
        super.start();
    }

    /**
     * Replaces the cluster topology with the given address list (test hook).
     *
     * @param clusterAddrList comma-separated {@code host:port} entries
     * @param host            host of this server
     * @param port            Raft port of this server
     */
    @VisibleForTesting
    void initTestCluster(String clusterAddrList, String host, int port) {
        this.isTest = true;
        this.serverHost = host;
        this.raftServerPort = port;
        this.clusterNodes.clear();
        this.raftAddressMap.clear();
        this.clusterMemberIds.clear();
        for (String entry : clusterAddrList.split(",")) {
            String[] parts = entry.split(":");
            String clusterHost = parts[0];
            int clusterPort = Integer.parseInt(parts[1]);
            String nodeId = clusterHost + ":" + clusterPort;
            Address address = Address.from(clusterHost, clusterPort);
            MemberId memberId = MemberId.from(nodeId);
            this.clusterNodes.add(Node.builder().withId(nodeId).withAddress(address).build());
            this.raftAddressMap.put(memberId, address);
            this.clusterMemberIds.add(memberId);
        }
    }

    /**
     * @return {@code true} when the Raft server is running, the Raft client and session
     *         client exist, and the session client is in the {@code CONNECTED} state
     */
    @Override
    public boolean raftInitialized() {
        return null != this.raftServer
                && this.raftServer.isRunning()
                && null != this.raftClient
                && null != this.raftSessionClient
                && this.raftSessionClient.getState() == PrimitiveState.CONNECTED;
    }

    /**
     * @return {@code true} when the Raft server is running and reports itself as leader
     */
    @Override
    public boolean isClusterLeader() {
        return null != this.raftServer && this.raftServer.isRunning() && this.raftServer.isLeader();
    }

    /** Runs {@link #bootstrapRaftServer()} on a new thread so {@link #start()} does not block on it. */
    private void initThread() {
        new Thread(this::bootstrapRaftServer).start();
    }

    /**
     * Starts the messaging service, builds and bootstraps the Raft server, and then
     * publishes this node's name, host, port and start time as server metadata.
     */
    private void bootstrapRaftServer() {
        LOG.info("RaftServer run({}:{}) >>>", this.serverHost, this.raftServerPort);

        Address address = Address.from(this.serverHost, this.raftServerPort);
        Member member = Member.builder(MemberId.from(this.serverHost + ":" + this.raftServerPort))
                .withAddress(address)
                .build();

        this.messagingService =
                new NettyMessagingService(MESSAGING_SERVICE_NAME, member.address(), new MessagingConfig())
                        .start()
                        .join();
        RaftServerMessagingProtocol protocol = new RaftServerMessagingProtocol(
                this.messagingService, ClusterManager.protocolSerializer, this.raftAddressMap::get);

        BootstrapService bootstrapService = new BootstrapService() {

            @Override
            public MessagingService getMessagingService() {
                return ClusterServer.this.messagingService;
            }

            @Override
            public UnicastService getUnicastService() {
                return new UnicastServiceAdapter();
            }

            @Override
            public BroadcastService getBroadcastService() {
                return new BroadcastServiceAdapter();
            }
        };

        ClusterMembershipService clusterService = new DefaultClusterMembershipService(
                member,
                Version.from("1.0.0"),
                new DefaultNodeDiscoveryService(
                        bootstrapService, member, new BootstrapDiscoveryProvider(this.clusterNodes)),
                bootstrapService,
                new HeartbeatMembershipProtocol(new HeartbeatMembershipProtocolConfig()));

        File atomixDataDir = Files.createTempDir();
        atomixDataDir.deleteOnExit();

        RaftStorage storage = RaftStorage.builder()
                .withStorageLevel(StorageLevel.MEMORY)
                .withDirectory(atomixDataDir)
                .withNamespace(ClusterManager.storageNamespace)
                .withMaxSegmentSize(1024 * 1024)
                .build();
        RaftServer.Builder builder = RaftServer.builder(member.id())
                .withMembershipService(clusterService)
                .withProtocol(protocol)
                .withStorage(storage);

        this.raftServer = builder.build();
        this.raftServer.bootstrap(this.clusterMemberIds);

        HashMap<String, Object> meta = new HashMap<String, Object>();
        String nodeName = this.getClusterNodeName();
        meta.put(ClusterMeta.NODE_NAME, nodeName);
        meta.put(ClusterMeta.SERVER_HOST, this.serverHost);
        meta.put(ClusterMeta.SERVER_PORT, this.raftServerPort);
        meta.put(ClusterMeta.SERVER_START_TIME, LocalDateTime.now());
        this.putClusterMeta(ClusterMetaType.SERVER_META, nodeName, meta);

        LOG.info("RaftServer run() <<<");
    }

    /**
     * Removes this node's server metadata, stops the cluster monitor, delegates to
     * {@code super.shutdown()}, and finally shuts the Raft server down (waiting at most
     * five seconds). Does nothing when cluster mode is disabled.
     *
     * <p>If the calling thread is interrupted along the way, the remaining steps still
     * run and the thread's interrupt flag is restored before returning.
     */
    @Override
    public void shutdown() {
        if (!this.sconf.isClusterMode()) {
            return;
        }
        LOG.info("ClusterServer::shutdown()");

        boolean interrupted = false;
        try {
            try {
                this.deleteClusterMeta(ClusterMetaType.SERVER_META, this.getClusterNodeName());
                Thread.sleep(500L);
            }
            catch (InterruptedException e) {
                interrupted = true;
                LOG.error(e.getMessage(), e);
            }

            // Stop the monitor even when the pause above was interrupted.
            try {
                if (null != this.clusterMonitor) {
                    this.clusterMonitor.shutdown();
                }
                if (!interrupted) {
                    Thread.sleep(500L);
                }
            }
            catch (InterruptedException e) {
                interrupted = true;
                LOG.error(e.getMessage(), e);
            }

            super.shutdown();

            if (null != this.raftServer && this.raftServer.isRunning()) {
                try {
                    LOG.info("ClusterServer::raftServer.shutdown()");
                    this.raftServer.shutdown().get(5L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    interrupted = true;
                    LOG.error(e.getMessage(), e);
                }
                catch (ExecutionException | TimeoutException e) {
                    LOG.error(e.getMessage(), e);
                }
            }
            LOG.info("ClusterServer::super.shutdown()");
        }
        finally {
            if (interrupted) {
                // Let callers further up the stack observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Picks the server node with the most idle memory (capacity minus used).
     *
     * <p>Nodes whose status is missing, empty or offline are skipped, as are nodes whose
     * metadata carries no numeric memory capacity / usage. A node is only selected when
     * its idle memory is strictly greater than zero.
     *
     * @return the metadata of the selected node, or {@code null} if no node qualifies
     */
    public HashMap<String, Object> getIdleNodeMeta() {
        HashMap<String, Object> idleNodeMeta = null;
        HashMap<String, HashMap<String, Object>> clusterMeta = this.getClusterMeta(ClusterMetaType.SERVER_META, "");
        long memoryIdle = 0L;
        for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
            HashMap<String, Object> meta = entry.getValue();

            String status = (String) meta.get(ClusterMeta.STATUS);
            if (StringUtils.isEmpty(status) || status.equals(ClusterMeta.OFFLINE_STATUS)) {
                continue;
            }

            // Metadata may not carry memory figures; skip such nodes rather than
            // fail on unboxing a null.
            Object capacity = meta.get(ClusterMeta.MEMORY_CAPACITY);
            Object used = meta.get(ClusterMeta.MEMORY_USED);
            if (!(capacity instanceof Number) || !(used instanceof Number)) {
                continue;
            }

            long idle = ((Number) capacity).longValue() - ((Number) used).longValue();
            if (idle > memoryIdle) {
                memoryIdle = idle;
                idleNodeMeta = meta;
            }
        }
        return idleNodeMeta;
    }

    /**
     * Sends {@code msg} on {@code topic} to a single node and logs an error if the
     * request fails or times out (two-second timeout).
     *
     * @param host  destination host
     * @param port  destination port
     * @param topic message topic
     * @param msg   message payload
     */
    public void unicastClusterEvent(String host, int port, String topic, String msg) {
        LOG.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}", host, port, topic, msg);
        Address address = Address.from(host, port);
        // NOTE(review): getBytes() uses the platform default charset - confirm the receiver decodes the same way.
        CompletableFuture<byte[]> response =
                this.messagingService.sendAndReceive(address, topic, msg.getBytes(), Duration.ofSeconds(2L));
        response.whenComplete((r, e) -> {
            // The throwable is non-null only when the request failed.
            if (null != e) {
                LOG.error(e.getMessage(), e);
            }
        });
    }

    /**
     * Sends {@code msg} on {@code topic} to every cluster node except this one and logs
     * the outcome of each request (two-second timeout per node).
     *
     * @param topic message topic
     * @param msg   message payload
     */
    public void broadcastClusterEvent(String topic, String msg) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("send broadcastClusterEvent message {}", msg);
        }
        for (Node node : this.clusterNodes) {
            boolean isSelf = StringUtils.equals(node.address().host(), this.serverHost)
                    && node.address().port() == this.raftServerPort;
            if (isSelf) {
                continue;
            }
            CompletableFuture<byte[]> response =
                    this.messagingService.sendAndReceive(node.address(), topic, msg.getBytes(), Duration.ofSeconds(2L));
            response.whenComplete((r, e) -> {
                // The throwable is non-null only when the request failed.
                if (null != e) {
                    LOG.error(e.getMessage(), e);
                } else {
                    LOG.info("broadcastClusterNoteEvent success! {}", msg);
                }
            });
        }
    }
}

