/*
 * Decompiled with CFR 0.152.
 */
package org.apache.submarine.commons.cluster;

import com.google.common.collect.Maps;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.operation.OperationId;
import io.atomix.primitive.operation.OperationType;
import io.atomix.primitive.operation.PrimitiveOperation;
import io.atomix.primitive.operation.impl.DefaultOperationId;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.service.ServiceConfig;
import io.atomix.primitive.session.SessionClient;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.raft.RaftClient;
import io.atomix.protocols.raft.RaftError;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
import io.atomix.protocols.raft.protocol.JoinResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.LeaveRequest;
import io.atomix.protocols.raft.protocol.LeaveResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.PollResponse;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.CommunicationStrategy;
import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
import io.atomix.protocols.raft.storage.system.Configuration;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Serializer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.submarine.commons.cluster.ClusterMonitor;
import org.apache.submarine.commons.cluster.ClusterPrimitiveType;
import org.apache.submarine.commons.cluster.ClusterStateMachine;
import org.apache.submarine.commons.cluster.meta.ClusterMeta;
import org.apache.submarine.commons.cluster.meta.ClusterMetaEntity;
import org.apache.submarine.commons.cluster.meta.ClusterMetaOperation;
import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
import org.apache.submarine.commons.cluster.protocol.LocalRaftProtocolFactory;
import org.apache.submarine.commons.cluster.protocol.RaftClientMessagingProtocol;
import org.apache.submarine.commons.utils.NetworkUtils;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ClusterManager {
    private static Logger LOG = LoggerFactory.getLogger(ClusterManager.class);
    public final SubmarineConfiguration sconf = SubmarineConfiguration.getInstance();
    protected Collection<Node> clusterNodes = new ArrayList<Node>();
    protected int raftServerPort = 0;
    protected RaftClient raftClient = null;
    protected SessionClient raftSessionClient = null;
    protected Map<MemberId, Address> raftAddressMap = new ConcurrentHashMap<MemberId, Address>();
    protected LocalRaftProtocolFactory protocolFactory = new LocalRaftProtocolFactory(protocolSerializer);
    protected List<MemberId> clusterMemberIds = new ArrayList<MemberId>();
    protected AtomicBoolean running = new AtomicBoolean(true);
    private ConcurrentLinkedQueue<ClusterMetaEntity> clusterMetaQueue = new ConcurrentLinkedQueue();
    protected String serverHost = "";
    protected ClusterMonitor clusterMonitor = null;
    protected String MESSAGING_SERVICE_NAME = "SubmarineCluster";
    protected boolean isTest = false;
    protected static final Namespace storageNamespace = Namespace.builder().register(new Class[]{CloseSessionEntry.class}).register(new Class[]{CommandEntry.class}).register(new Class[]{ConfigurationEntry.class}).register(new Class[]{InitializeEntry.class}).register(new Class[]{KeepAliveEntry.class}).register(new Class[]{MetadataEntry.class}).register(new Class[]{OpenSessionEntry.class}).register(new Class[]{QueryEntry.class}).register(new Class[]{PrimitiveOperation.class}).register(new Class[]{DefaultOperationId.class}).register(new Class[]{OperationType.class}).register(new Class[]{ReadConsistency.class}).register(new Class[]{ArrayList.class}).register(new Class[]{HashSet.class}).register(new Class[]{DefaultRaftMember.class}).register(new Class[]{MemberId.class}).register(new Class[]{RaftMember.Type.class}).register(new Class[]{Instant.class}).register(new Class[]{Configuration.class}).register(new Class[]{byte[].class}).register(new Class[]{long[].class}).build();
    protected static final Serializer protocolSerializer = Serializer.using((Namespace)Namespace.builder().register(new Class[]{OpenSessionRequest.class}).register(new Class[]{OpenSessionResponse.class}).register(new Class[]{CloseSessionRequest.class}).register(new Class[]{CloseSessionResponse.class}).register(new Class[]{KeepAliveRequest.class}).register(new Class[]{KeepAliveResponse.class}).register(new Class[]{QueryRequest.class}).register(new Class[]{QueryResponse.class}).register(new Class[]{CommandRequest.class}).register(new Class[]{CommandResponse.class}).register(new Class[]{MetadataRequest.class}).register(new Class[]{MetadataResponse.class}).register(new Class[]{JoinRequest.class}).register(new Class[]{JoinResponse.class}).register(new Class[]{LeaveRequest.class}).register(new Class[]{LeaveResponse.class}).register(new Class[]{ConfigureRequest.class}).register(new Class[]{ConfigureResponse.class}).register(new Class[]{ReconfigureRequest.class}).register(new Class[]{ReconfigureResponse.class}).register(new Class[]{InstallRequest.class}).register(new Class[]{InstallResponse.class}).register(new Class[]{PollRequest.class}).register(new Class[]{PollResponse.class}).register(new Class[]{VoteRequest.class}).register(new Class[]{VoteResponse.class}).register(new Class[]{AppendRequest.class}).register(new Class[]{AppendResponse.class}).register(new Class[]{PublishRequest.class}).register(new Class[]{ResetRequest.class}).register(new Class[]{RaftResponse.Status.class}).register(new Class[]{RaftError.class}).register(new Class[]{RaftError.Type.class}).register(new Class[]{PrimitiveOperation.class}).register(new Class[]{ReadConsistency.class}).register(new Class[]{byte[].class}).register(new Class[]{long[].class}).register(new Class[]{CloseSessionEntry.class}).register(new Class[]{CommandEntry.class}).register(new Class[]{ConfigurationEntry.class}).register(new Class[]{InitializeEntry.class}).register(new Class[]{KeepAliveEntry.class}).register(new Class[]{MetadataEntry.class}).register(new Class[]{OpenSessionEntry.class}).register(new Class[]{QueryEntry.class}).register(new Class[]{PrimitiveOperation.class}).register(new Class[]{DefaultOperationId.class}).register(new Class[]{OperationType.class}).register(new Class[]{ReadConsistency.class}).register(new Class[]{ArrayList.class}).register(new Class[]{HashMap.class}).register(new Class[]{ClusterMetaEntity.class}).register(new Class[]{LocalDateTime.class}).register(new Class[]{Collections.emptyList().getClass()}).register(new Class[]{HashSet.class}).register(new Class[]{DefaultRaftMember.class}).register(new Class[]{MemberId.class}).register(new Class[]{SessionId.class}).register(new Class[]{RaftMember.Type.class}).register(new Class[]{Instant.class}).register(new Class[]{Configuration.class}).build());
    protected static final Serializer storageSerializer = Serializer.using((Namespace)Namespace.builder().register(new Class[]{CloseSessionEntry.class}).register(new Class[]{CommandEntry.class}).register(new Class[]{ConfigurationEntry.class}).register(new Class[]{InitializeEntry.class}).register(new Class[]{KeepAliveEntry.class}).register(new Class[]{MetadataEntry.class}).register(new Class[]{OpenSessionEntry.class}).register(new Class[]{QueryEntry.class}).register(new Class[]{PrimitiveOperation.class}).register(new Class[]{DefaultOperationId.class}).register(new Class[]{OperationType.class}).register(new Class[]{ReadConsistency.class}).register(new Class[]{ArrayList.class}).register(new Class[]{ClusterMetaEntity.class}).register(new Class[]{HashMap.class}).register(new Class[]{HashSet.class}).register(new Class[]{LocalDateTime.class}).register(new Class[]{DefaultRaftMember.class}).register(new Class[]{MemberId.class}).register(new Class[]{RaftMember.Type.class}).register(new Class[]{Instant.class}).register(new Class[]{Configuration.class}).register(new Class[]{byte[].class}).register(new Class[]{long[].class}).build());
    protected static final Serializer clientSerializer = Serializer.using((Namespace)Namespace.builder().register(new Class[]{ReadConsistency.class}).register(new Class[]{ClusterMetaEntity.class}).register(new Class[]{ClusterMetaOperation.class}).register(new Class[]{ClusterMetaType.class}).register(new Class[]{HashMap.class}).register(new Class[]{LocalDateTime.class}).register(new Class[]{Maps.immutableEntry((Object)new String(), (Object)new Object()).getClass()}).build());

    protected ClusterManager() {
        try {
            this.serverHost = NetworkUtils.findAvailableHostAddress();
            String clusterAddr = this.sconf.getClusterAddress();
            LOG.info("clusterAddr = {}", (Object)clusterAddr);
            if (!StringUtils.isEmpty((String)clusterAddr)) {
                String[] cluster = clusterAddr.split(",");
                for (int i = 0; i < cluster.length; ++i) {
                    String[] parts = cluster[i].split(":");
                    String clusterHost = parts[0];
                    int clusterPort = Integer.valueOf(parts[1]);
                    if (this.serverHost.equalsIgnoreCase(clusterHost)) {
                        this.raftServerPort = clusterPort;
                    }
                    String memberId = clusterHost + ":" + clusterPort;
                    Address address = Address.from((String)clusterHost, (int)clusterPort);
                    Node node = Node.builder().withId(memberId).withAddress(address).build();
                    this.clusterNodes.add(node);
                    this.raftAddressMap.put(MemberId.from((String)memberId), address);
                    this.clusterMemberIds.add(MemberId.from((String)memberId));
                }
            }
        }
        catch (SocketException | UnknownHostException e) {
            LOG.error(e.getMessage(), (Throwable)e);
        }
    }

    public abstract boolean raftInitialized();

    public abstract boolean isClusterLeader();

    public AtomicBoolean getRunning() {
        return this.running;
    }

    private SessionClient createProxy(RaftClient client) {
        return (SessionClient)((SessionClient)client.sessionBuilder("CLUSTER_PRIMITIVE", (PrimitiveType)ClusterPrimitiveType.INSTANCE, new ServiceConfig()).withReadConsistency(ReadConsistency.SEQUENTIAL).withCommunicationStrategy(CommunicationStrategy.LEADER).build()).connect().join();
    }

    public void start() {
        if (!this.sconf.isClusterMode()) {
            return;
        }
        LOG.info("ClusterManager::start()");
        new Thread(new Runnable(){

            @Override
            public void run() {
                LOG.info("RaftClientThread run() >>>");
                int raftClientPort = 0;
                try {
                    raftClientPort = NetworkUtils.findRandomAvailablePortOnAllLocalInterfaces();
                }
                catch (IOException e) {
                    LOG.error(e.getMessage(), (Throwable)e);
                }
                LOG.info("RaftClientThread {}:{}", (Object)ClusterManager.this.serverHost, (Object)raftClientPort);
                MemberId memberId = MemberId.from((String)(ClusterManager.this.serverHost + ":" + raftClientPort));
                Address address = Address.from((String)ClusterManager.this.serverHost, (int)raftClientPort);
                ClusterManager.this.raftAddressMap.put(memberId, address);
                MessagingService messagingManager = (MessagingService)new NettyMessagingService(ClusterManager.this.MESSAGING_SERVICE_NAME, address, new MessagingConfig()).start().join();
                RaftClientMessagingProtocol protocol = new RaftClientMessagingProtocol(messagingManager, protocolSerializer, ClusterManager.this.raftAddressMap::get);
                ClusterManager.this.raftClient = (RaftClient)RaftClient.builder().withMemberId(memberId).withPartitionId(PartitionId.from((String)"partition", (int)1)).withProtocol((RaftClientProtocol)protocol).build();
                ClusterManager.this.raftClient.connect(ClusterManager.this.clusterMemberIds).join();
                ClusterManager.this.raftSessionClient = ClusterManager.this.createProxy(ClusterManager.this.raftClient);
                LOG.info("RaftClientThread run() <<<");
            }
        }).start();
        new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    while (ClusterManager.this.getRunning().get()) {
                        ClusterMetaEntity metaEntity = (ClusterMetaEntity)ClusterManager.this.clusterMetaQueue.peek();
                        if (null != metaEntity) {
                            int retry = 0;
                            while (!ClusterManager.this.raftInitialized()) {
                                if (0 == ++retry % 30) {
                                    LOG.warn("Raft incomplete initialization! retry[{}]", (Object)retry);
                                }
                                Thread.sleep(100L);
                            }
                            boolean success = false;
                            switch (metaEntity.getOperation()) {
                                case DELETE_OPERATION: {
                                    success = ClusterManager.this.deleteClusterMeta(metaEntity);
                                    break;
                                }
                                case PUT_OPERATION: {
                                    success = ClusterManager.this.putClusterMeta(metaEntity);
                                }
                            }
                            if (success) {
                                ClusterManager.this.clusterMetaQueue.remove(metaEntity);
                                LOG.info("Cluster Meta Consume success! {}", (Object)metaEntity);
                                continue;
                            }
                            LOG.error("Cluster Meta Consume failed!");
                            continue;
                        }
                        Thread.sleep(100L);
                    }
                }
                catch (InterruptedException e) {
                    LOG.error(e.getMessage(), (Throwable)e);
                }
            }
        }).start();
    }

    public void shutdown() {
        if (!this.sconf.isClusterMode()) {
            return;
        }
        this.running.set(false);
        try {
            if (null != this.raftSessionClient) {
                LOG.info("ClusterManager::shutdown(raftSessionClient)");
                this.raftSessionClient.close().get(5L, TimeUnit.SECONDS);
            }
            if (null != this.raftClient) {
                LOG.info("ClusterManager::shutdown(raftClient)");
                this.raftClient.close().get(5L, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.error(e.getMessage(), (Throwable)e);
        }
        LOG.info("ClusterManager::shutdown()");
    }

    public String getClusterNodeName() {
        if (this.isTest) {
            return this.serverHost + ":" + this.raftServerPort;
        }
        String hostName = "";
        try {
            InetAddress addr = InetAddress.getLocalHost();
            hostName = addr.getHostName().toString();
        }
        catch (IOException e) {
            LOG.error(e.getMessage(), (Throwable)e);
        }
        return hostName;
    }

    private boolean putClusterMeta(ClusterMetaEntity entity) {
        if (!this.raftInitialized()) {
            LOG.error(this.getClass().toString() + "::Raft incomplete initialization!");
            return false;
        }
        ClusterMetaType metaType = entity.getMetaType();
        String metaKey = entity.getKey();
        HashMap<String, Object> newMetaValue = entity.getValues();
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getClass().toString() + "::putClusterMeta {} {}", (Object)metaType, (Object)metaKey);
        }
        newMetaValue.put(ClusterMeta.SERVER_HOST, this.serverHost);
        newMetaValue.put(ClusterMeta.SERVER_PORT, this.raftServerPort);
        this.raftSessionClient.execute(PrimitiveOperation.operation((OperationId)ClusterStateMachine.PUT, (byte[])clientSerializer.encode((Object)entity))).thenApply(arg_0 -> ((Serializer)clientSerializer).decode(arg_0));
        return true;
    }

    public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, Object> values) {
        ClusterMetaEntity metaEntity = new ClusterMetaEntity(ClusterMetaOperation.PUT_OPERATION, type, key, values);
        boolean result = this.putClusterMeta(metaEntity);
        if (!result) {
            LOG.warn(this.getClass().toString() + "::putClusterMeta failure, Cache metadata to queue.");
            this.clusterMetaQueue.add(metaEntity);
        }
    }

    private boolean deleteClusterMeta(ClusterMetaEntity entity) {
        ClusterMetaType metaType = entity.getMetaType();
        String metaKey = entity.getKey();
        LOG.info(this.getClass().toString() + "::deleteClusterMeta {} {}", (Object)metaType, (Object)metaKey);
        if (!this.raftInitialized()) {
            LOG.error(this.getClass().toString() + "::Raft incomplete initialization!");
            return false;
        }
        ((CompletableFuture)this.raftSessionClient.execute(PrimitiveOperation.operation((OperationId)ClusterStateMachine.REMOVE, (byte[])clientSerializer.encode((Object)entity))).thenApply(arg_0 -> ((Serializer)clientSerializer).decode(arg_0))).thenAccept(result -> LOG.info(this.getClass().toString() + "::deleteClusterMeta {}", result));
        return true;
    }

    public void deleteClusterMeta(ClusterMetaType type, String key) {
        ClusterMetaEntity metaEntity = new ClusterMetaEntity(ClusterMetaOperation.DELETE_OPERATION, type, key, null);
        boolean result = this.deleteClusterMeta(metaEntity);
        if (!result) {
            LOG.warn(this.getClass().toString() + "::deleteClusterMeta failed, Cache data to queue.");
            this.clusterMetaQueue.add(metaEntity);
        }
    }

    public HashMap<String, HashMap<String, Object>> getClusterMeta(ClusterMetaType metaType, String metaKey) {
        HashMap clusterMeta = new HashMap();
        if (!this.raftInitialized()) {
            LOG.error(this.getClass().toString() + "::Raft incomplete initialization!");
            return clusterMeta;
        }
        ClusterMetaEntity entity = new ClusterMetaEntity(ClusterMetaOperation.GET_OPERATION, metaType, metaKey, null);
        byte[] mateData = null;
        try {
            mateData = (byte[])this.raftSessionClient.execute(PrimitiveOperation.operation((OperationId)ClusterStateMachine.GET, (byte[])clientSerializer.encode((Object)entity))).get(3L, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.error(e.getMessage(), (Throwable)e);
        }
        if (null != mateData) {
            clusterMeta = (HashMap)clientSerializer.decode(mateData);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("getClusterMeta >>> {}", (Object)clusterMeta.toString());
        }
        return clusterMeta;
    }
}

