/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.om.ratis;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMNodeDetails;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OzoneManagerRatisServer {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(OzoneManagerRatisServer.class);
    /** Port taken from the Ratis address passed to the constructor; applied to the RPC server properties. */
    private final int port;
    /** Ratis address passed to the constructor. */
    private final InetSocketAddress omRatisAddress;
    /** Underlying Ratis server, built in the constructor. */
    private final RaftServer server;
    /** Raft group id derived from the OM service id (name-based UUID). */
    private final RaftGroupId raftGroupId;
    /** Raft group made of {@link #raftGroupId} and the peers passed to the constructor. */
    private final RaftGroup raftGroup;
    /** Raft peer id of this OM node. */
    private final RaftPeerId raftPeerId;
    /** Owning OzoneManager instance. */
    private final OzoneManager ozoneManager;
    /** State machine registered with the Ratis server. */
    private final OzoneManagerStateMachine omStateMachine;
    /** Random client id used for the requests this server builds itself. */
    private final ClientId clientId = ClientId.randomId();
    /** Single-threaded scheduler that runs the periodic role check set up in the constructor. */
    private final ScheduledExecutorService scheduledRoleChecker;
    /** Initial delay of the role check; the 1000 ms default is overwritten in newRaftProperties. */
    private long roleCheckInitialDelayMs = 1000L;
    /** Delay between role checks in milliseconds; assigned in newRaftProperties. */
    private long roleCheckIntervalMs;
    /** Write-locked in setServerRole; read-locked in checkCachedPeerRoleIsLeader and getCachedLeaderPeerId. */
    private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
    /** Last role recorded by setServerRole; empty until the first update or after a failed lookup. */
    private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
    /** Last leader peer id recorded by setServerRole; empty when no leader is known. */
    private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();
    /** Source of call ids handed out by nextCallId(). */
    private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();

    /**
     * Hands out the next call id from the shared counter.
     *
     * @return the counter value before the increment, with the sign bit
     *         masked off so the result is never negative
     */
    private static long nextCallId() {
        long rawCounterValue = CALL_ID_COUNTER.getAndIncrement();
        return rawCounterValue & Long.MAX_VALUE;
    }

    /**
     * Submits an OM request to the local Ratis server as a write request and
     * blocks until the reply is available.
     *
     * @param omRequest the request to replicate through Ratis
     * @return the OM response built from the Ratis reply
     * @throws ServiceException if submitting or waiting fails, or if the reply
     *         cannot be turned into a response (see {@code processReply})
     */
    public OzoneManagerProtocolProtos.OMResponse submitRequest(OzoneManagerProtocolProtos.OMRequest omRequest) throws ServiceException {
        RaftClientRequest raftClientRequest = this.createWriteRaftClientRequest(omRequest);
        RaftClientReply raftClientReply;
        try {
            raftClientReply = (RaftClientReply)this.server.submitClientRequestAsync(raftClientRequest).get();
        }
        catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // The broad catch would otherwise swallow the interruption;
                // restore the flag so callers up the stack can still see it.
                Thread.currentThread().interrupt();
            }
            throw new ServiceException(ex.getMessage(), (Throwable)ex);
        }
        return this.processReply(omRequest, raftClientReply);
    }

    /**
     * Wraps an OM request into a Ratis write request addressed to this
     * server and its raft group.
     *
     * @param omRequest the request whose serialized form becomes the message payload
     * @return a write-type client request carrying a fresh call id
     */
    private RaftClientRequest createWriteRaftClientRequest(OzoneManagerProtocolProtos.OMRequest omRequest) {
        RaftPeerId serverId = this.server.getId();
        long callId = nextCallId();
        ByteString serializedRequest = OMRatisHelper.convertRequestToByteString(omRequest);
        Message payload = Message.valueOf(serializedRequest);
        return new RaftClientRequest(this.clientId, serverId, this.raftGroupId, callId, payload, RaftClientRequest.writeRequestType(), null);
    }

    /**
     * Converts a Ratis reply into an OM response.
     *
     * <ul>
     *   <li>A not-leader reply is rethrown wrapped in a {@link ServiceException}.</li>
     *   <li>A state machine failure becomes an unsuccessful response whose
     *       message and status are taken from the failure.</li>
     *   <li>Otherwise the response is decoded from the reply payload.</li>
     * </ul>
     *
     * @param omRequest the request the reply belongs to; supplies the command type for error responses
     * @param reply the reply received from Ratis
     * @return the decoded or synthesized OM response
     * @throws ServiceException if this node is not the leader or the payload cannot be decoded
     */
    private OzoneManagerProtocolProtos.OMResponse processReply(OzoneManagerProtocolProtos.OMRequest omRequest, RaftClientReply reply) throws ServiceException {
        NotLeaderException notLeaderException = reply.getNotLeaderException();
        if (notLeaderException != null) {
            throw new ServiceException((Throwable)notLeaderException);
        }
        StateMachineException stateMachineException = reply.getStateMachineException();
        if (stateMachineException != null) {
            // The cause may be absent and its message may be null; fall back
            // step by step so building the error response cannot itself fail
            // with a NullPointerException.
            Throwable failure = stateMachineException.getCause() != null ? stateMachineException.getCause() : stateMachineException;
            String errorMessage = failure.getMessage();
            if (errorMessage == null) {
                errorMessage = failure.toString();
            }
            OzoneManagerProtocolProtos.OMResponse.Builder omResponse = OzoneManagerProtocolProtos.OMResponse.newBuilder();
            omResponse.setCmdType(omRequest.getCmdType());
            omResponse.setSuccess(false);
            omResponse.setMessage(errorMessage);
            omResponse.setStatus(this.parseErrorStatus(errorMessage));
            LOG.debug("Error while executing ratis request. stateMachineException: ", (Throwable)stateMachineException);
            return omResponse.build();
        }
        try {
            return OMRatisHelper.getOMResponseFromRaftClientReply((RaftClientReply)reply);
        }
        catch (InvalidProtocolBufferException ex) {
            if (ex.getMessage() != null) {
                throw new ServiceException(ex.getMessage(), (Throwable)ex);
            }
            throw new ServiceException((Throwable)ex);
        }
    }

    /**
     * Extracts the OM status from an error message of the form
     * {@code ...STATUS_CODE=<STATUS_NAME>}.
     *
     * @param errorMessage the message to inspect; may be {@code null}
     * @return the status named after the {@code STATUS_CODE=} marker, or
     *         {@code INTERNAL_ERROR} when the message is null, has no marker,
     *         or names a status that does not exist
     */
    private OzoneManagerProtocolProtos.Status parseErrorStatus(String errorMessage) {
        final String marker = "STATUS_CODE=";
        if (errorMessage == null) {
            return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
        }
        int markerIndex = errorMessage.indexOf(marker);
        if (markerIndex < 0) {
            return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
        }
        String errorCode = errorMessage.substring(markerIndex + marker.length());
        LOG.debug("Parsing error message for error code {}", errorCode);
        try {
            return OzoneManagerProtocolProtos.Status.valueOf(errorCode.trim());
        }
        catch (IllegalArgumentException e) {
            // An unknown or garbled status name must not crash reply
            // processing; report it as a generic internal error instead.
            LOG.debug("Unrecognized status code '{}', falling back to INTERNAL_ERROR", errorCode, e);
            return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
        }
    }

    /**
     * Builds the Ratis server for this OM node and schedules the periodic
     * role check.
     *
     * @param conf configuration used to derive the Raft properties
     * @param om the owning OzoneManager
     * @param raftGroupIdStr string the raft group id is derived from
     * @param localRaftPeerId raft peer id of this node
     * @param addr Ratis address of this node; its port becomes the server port
     * @param raftPeers all peers of the raft group
     * @throws IOException if the Ratis server cannot be built
     */
    private OzoneManagerRatisServer(Configuration conf, OzoneManager om, String raftGroupIdStr, RaftPeerId localRaftPeerId, InetSocketAddress addr, List<RaftPeer> raftPeers) throws IOException {
        this.ozoneManager = om;
        this.omRatisAddress = addr;
        this.port = addr.getPort();
        // Must run after the port is assigned: newRaftProperties reads it and
        // also sets the role check delay/interval used at the end of this constructor.
        RaftProperties serverProperties = this.newRaftProperties(conf);
        this.raftPeerId = localRaftPeerId;
        this.raftGroupId = RaftGroupId.valueOf((UUID)this.getRaftGroupIdFromOmServiceId(raftGroupIdStr));
        this.raftGroup = RaftGroup.valueOf((RaftGroupId)this.raftGroupId, raftPeers);
        // Join the peer addresses with a leading-separator-free loop so an
        // empty peer list logs an empty string instead of throwing.
        StringBuilder raftPeersStr = new StringBuilder();
        String separator = "";
        for (RaftPeer peer : raftPeers) {
            raftPeersStr.append(separator).append(peer.getAddress());
            separator = ", ";
        }
        LOG.info("Instantiating OM Ratis server with GroupID: {} and Raft Peers: {}", (Object)raftGroupIdStr, (Object)raftPeersStr.toString());
        this.omStateMachine = this.getStateMachine();
        this.server = RaftServer.newBuilder().setServerId(this.raftPeerId).setGroup(this.raftGroup).setProperties(serverProperties).setStateMachine((StateMachine)this.omStateMachine).build();
        this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
        this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    // Only refresh while the cached role says LEADER. Read the
                    // cache through the lock-protected accessor, like every
                    // other reader, since this runs on the scheduler thread.
                    if (OzoneManagerRatisServer.this.checkCachedPeerRoleIsLeader()) {
                        OzoneManagerRatisServer.this.updateServerRole();
                    }
                }
                catch (RuntimeException e) {
                    // An exception escaping a scheduled task cancels all of its
                    // future runs; log it and keep the periodic check alive.
                    LOG.error("Periodic OM Ratis role check failed; retrying at the next interval.", (Throwable)e);
                }
            }
        }, this.roleCheckInitialDelayMs, this.roleCheckIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates an OM Ratis server whose raft group consists of the local node
     * followed by the given peer nodes.
     *
     * @param ozoneConf configuration passed on to the server
     * @param omProtocol the owning OzoneManager
     * @param omNodeDetails details of the local OM node (service id, node id, address, Ratis port)
     * @param peerNodes details of the other OM nodes in the group
     * @return the new server instance
     * @throws IOException if the server cannot be constructed
     */
    public static OzoneManagerRatisServer newOMRatisServer(Configuration ozoneConf, OzoneManager omProtocol, OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes) throws IOException {
        String serviceId = omNodeDetails.getOMServiceId();
        String localNodeId = omNodeDetails.getOMNodeId();
        RaftPeerId localPeerId = RaftPeerId.getRaftPeerId(localNodeId);
        InetSocketAddress localRatisAddr = new InetSocketAddress(omNodeDetails.getAddress(), omNodeDetails.getRatisPort());

        // The local node always comes first in the peer list.
        List<RaftPeer> groupPeers = new ArrayList<RaftPeer>();
        groupPeers.add(new RaftPeer(localPeerId, localRatisAddr));

        for (OMNodeDetails peerNode : peerNodes) {
            String remoteNodeId = peerNode.getOMNodeId();
            InetSocketAddress remoteRatisAddr = new InetSocketAddress(peerNode.getAddress(), peerNode.getRatisPort());
            groupPeers.add(new RaftPeer(RaftPeerId.valueOf(remoteNodeId), remoteRatisAddr));
        }

        return new OzoneManagerRatisServer(ozoneConf, omProtocol, serviceId, localPeerId, localRatisAddr, groupPeers);
    }

    /**
     * @return the raft group this server was created with
     */
    public RaftGroup getRaftGroup() {
        return raftGroup;
    }

    /**
     * Creates the state machine bound to this server.
     *
     * @return a new {@link OzoneManagerStateMachine} constructed with this instance
     */
    private OzoneManagerStateMachine getStateMachine() {
        OzoneManagerStateMachine stateMachine = new OzoneManagerStateMachine(this);
        return stateMachine;
    }

    /**
     * @return the state machine registered with the Ratis server
     */
    @VisibleForTesting
    public OzoneManagerStateMachine getOmStateMachine() {
        return omStateMachine;
    }

    /**
     * @return the OzoneManager that owns this Ratis server
     */
    public OzoneManager getOzoneManager() {
        return ozoneManager;
    }

    /**
     * Logs the server identity and port, then starts the underlying Ratis server.
     *
     * @throws IOException if the Ratis server fails to start
     */
    public void start() throws IOException {
        String serverKind = getClass().getSimpleName();
        LOG.info("Starting {} {} at port {}", serverKind, server.getId(), port);
        server.start();
    }

    /**
     * Stops the periodic role check, closes the Ratis server and stops the
     * state machine.
     *
     * @throws RuntimeException wrapping the {@link IOException} if shutting down fails
     */
    public void stop() {
        // Cancel the periodic role check first so it neither queries a closed
        // server nor keeps its scheduler thread alive after shutdown.
        this.scheduledRoleChecker.shutdown();
        try {
            try {
                this.server.close();
            }
            finally {
                // Stop the state machine even when closing the server fails.
                this.omStateMachine.stop();
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Translates the OM Ratis configuration into Raft properties.
     *
     * <p>As side effects this also assigns {@code roleCheckIntervalMs} and
     * {@code roleCheckInitialDelayMs}, and it reads {@code port}, so the port
     * must be assigned before this method is called.
     *
     * @param conf the configuration to read the OM Ratis keys from
     * @return the populated Raft properties
     */
    private RaftProperties newRaftProperties(Configuration conf) {
        RaftProperties properties = new RaftProperties();

        // RPC type and the server port for that RPC implementation.
        String rpcType = conf.get("ozone.om.ratis.rpc.type", "GRPC");
        SupportedRpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
        RaftConfigKeys.Rpc.setType(properties, rpc);
        if (rpc == SupportedRpcType.GRPC) {
            GrpcConfigKeys.Server.setPort(properties, this.port);
        } else if (rpc == SupportedRpcType.NETTY) {
            NettyConfigKeys.Server.setPort(properties, this.port);
        }

        // Storage directory.
        String storageDir = OmUtils.getOMRatisDirectory(conf);
        RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(storageDir)));

        // Log segment sizing and log appender queue limits.
        int raftSegmentSize = (int)conf.getStorageSize("ozone.om.ratis.segment.size", "16KB", StorageUnit.BYTES);
        RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf((long)raftSegmentSize));
        int raftSegmentPreallocatedSize = (int)conf.getStorageSize("ozone.om.ratis.segment.preallocated.size", "16KB", StorageUnit.BYTES);
        int logAppenderQueueNumElements = conf.getInt("ozone.om.ratis.log.appender.queue.num-elements", 1024);
        int logAppenderQueueByteLimit = (int)conf.getStorageSize("ozone.om.ratis.log.appender.queue.byte-limit", "32MB", StorageUnit.BYTES);
        RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties, logAppenderQueueNumElements);
        RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, SizeInBytes.valueOf((long)logAppenderQueueByteLimit));
        RaftServerConfigKeys.Log.setPreallocatedSize(properties, SizeInBytes.valueOf((long)raftSegmentPreallocatedSize));
        RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, false);
        int logPurgeGap = conf.getInt("ozone.om.ratis.log.purge.gap", 1000000);
        RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
        GrpcConfigKeys.setMessageSizeMax(properties, SizeInBytes.valueOf((long)logAppenderQueueByteLimit));

        // Server request timeout.
        TimeUnit serverRequestTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit();
        long serverRequestTimeoutDuration = conf.getTimeDuration("ozone.om.ratis.server.request.timeout", OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getDuration(), serverRequestTimeoutUnit);
        TimeDuration serverRequestTimeout = TimeDuration.valueOf(serverRequestTimeoutDuration, serverRequestTimeoutUnit);
        RaftServerConfigKeys.Rpc.setRequestTimeout(properties, serverRequestTimeout);

        // Retry cache expiry.
        TimeUnit retryCacheTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit();
        long retryCacheTimeoutDuration = conf.getTimeDuration("ozone.om.ratis.server.retry.cache.timeout", OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getDuration(), retryCacheTimeoutUnit);
        TimeDuration retryCacheTimeout = TimeDuration.valueOf(retryCacheTimeoutDuration, retryCacheTimeoutUnit);
        RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheTimeout);

        // Minimum/maximum RPC timeout. Note the same two setters are called
        // again further down with the leader election values.
        TimeUnit serverMinTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit();
        long serverMinTimeoutDuration = conf.getTimeDuration("ozone.om.ratis.minimum.timeout", OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getDuration(), serverMinTimeoutUnit);
        TimeDuration serverMinTimeout = TimeDuration.valueOf(serverMinTimeoutDuration, serverMinTimeoutUnit);
        long serverMaxTimeoutDuration = serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200L;
        // The max value was computed in milliseconds, so it must be wrapped
        // with MILLISECONDS rather than with the configured unit of the min.
        TimeDuration serverMaxTimeout = TimeDuration.valueOf(serverMaxTimeoutDuration, TimeUnit.MILLISECONDS);
        RaftServerConfigKeys.Rpc.setTimeoutMin(properties, serverMinTimeout);
        RaftServerConfigKeys.Rpc.setTimeoutMax(properties, serverMaxTimeout);

        RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);

        // Client request timeout.
        TimeUnit clientRequestTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT.getUnit();
        long clientRequestTimeoutDuration = conf.getTimeDuration("ozone.om.ratis.client.request.timeout.duration", OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT.getDuration(), clientRequestTimeoutUnit);
        TimeDuration clientRequestTimeout = TimeDuration.valueOf(clientRequestTimeoutDuration, clientRequestTimeoutUnit);
        RaftClientConfigKeys.Rpc.setRequestTimeout(properties, clientRequestTimeout);

        // Leader election timeout: max is min plus 200 ms.
        TimeUnit leaderElectionMinTimeoutUnit = OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT.getUnit();
        long leaderElectionMinTimeoutDuration = conf.getTimeDuration("ozone.om.leader.election.minimum.timeout.duration", OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT.getDuration(), leaderElectionMinTimeoutUnit);
        TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(leaderElectionMinTimeoutDuration, leaderElectionMinTimeoutUnit);
        RaftServerConfigKeys.Rpc.setTimeoutMin(properties, leaderElectionMinTimeout);
        long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200L;
        RaftServerConfigKeys.Rpc.setTimeoutMax(properties, TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));

        // Node failure timeout, used for both the no-leader notification and slowness detection.
        TimeUnit nodeFailureTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.getUnit();
        long nodeFailureTimeoutDuration = conf.getTimeDuration("ozone.om.ratis.server.failure.timeout.duration", OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.getDuration(), nodeFailureTimeoutUnit);
        TimeDuration nodeFailureTimeout = TimeDuration.valueOf(nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
        RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties, nodeFailureTimeout);
        RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, nodeFailureTimeout);

        // Role check scheduling. The interval must be read in the same unit it
        // is interpreted in below (previously it was read in the node failure
        // timeout's unit, which is wrong whenever the two units differ).
        TimeUnit roleCheckIntervalUnit = OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT.getUnit();
        long roleCheckIntervalDuration = conf.getTimeDuration("ozone.om.ratis.server.role.check.interval", OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT.getDuration(), roleCheckIntervalUnit);
        this.roleCheckIntervalMs = TimeDuration.valueOf(roleCheckIntervalDuration, roleCheckIntervalUnit).toLong(TimeUnit.MILLISECONDS);
        // The first role check is delayed by the minimum leader election timeout.
        this.roleCheckInitialDelayMs = leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS);

        // Automatic snapshot triggering.
        long snapshotAutoTriggerThreshold = conf.getLong("ozone.om.ratis.snapshot.auto.trigger.threshold", 400000L);
        RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
        RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, snapshotAutoTriggerThreshold);
        return properties;
    }

    /**
     * Checks the cached role under the read lock.
     *
     * @return {@code true} if a role is cached and it is {@code LEADER}
     */
    private boolean checkCachedPeerRoleIsLeader() {
        ReentrantReadWriteLock.ReadLock readLock = this.roleCheckLock.readLock();
        readLock.lock();
        try {
            return this.cachedPeerRole.isPresent()
                && this.cachedPeerRole.get() == RaftProtos.RaftPeerRole.LEADER;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Tells whether this node is the leader. If the cached role is not
     * {@code LEADER}, the role is refreshed once and the cache is checked again.
     *
     * @return {@code true} if the (possibly refreshed) cached role is {@code LEADER}
     */
    public boolean isLeader() {
        if (!checkCachedPeerRoleIsLeader()) {
            // Cache says "not leader": refresh it before answering.
            updateServerRole();
            return checkCachedPeerRoleIsLeader();
        }
        return true;
    }

    /**
     * Reads the cached leader peer id under the read lock.
     *
     * @return the cached leader peer id, or an empty Optional if none is cached
     */
    public Optional<RaftPeerId> getCachedLeaderPeerId() {
        ReentrantReadWriteLock.ReadLock readLock = this.roleCheckLock.readLock();
        readLock.lock();
        try {
            return this.cachedLeaderPeerId;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Queries the Ratis server for this node's role and updates the cached
     * role and leader id:
     * <ul>
     *   <li>LEADER: the leader id is this node's own peer id;</li>
     *   <li>FOLLOWER: the leader id comes from the follower info, or is
     *       cleared if that id is empty;</li>
     *   <li>any other role: the leader id is cleared.</li>
     * </ul>
     * If the lookup fails with an {@link IOException}, both cached values are cleared.
     */
    public void updateServerRole() {
        try {
            RaftProtos.RoleInfoProto roleInfo = getGroupInfo().getRoleInfoProto();
            RaftProtos.RaftPeerRole currentRole = roleInfo.getRole();
            RaftPeerId knownLeader;
            switch (currentRole) {
                case LEADER: {
                    knownLeader = this.raftPeerId;
                    break;
                }
                case FOLLOWER: {
                    ByteString rawLeaderId = roleInfo.getFollowerInfo().getLeaderInfo().getId().getId();
                    boolean leaderUnknown = rawLeaderId == null || rawLeaderId.isEmpty();
                    knownLeader = leaderUnknown ? null : RaftPeerId.valueOf(rawLeaderId);
                    break;
                }
                default: {
                    knownLeader = null;
                    break;
                }
            }
            setServerRole(currentRole, knownLeader);
        } catch (IOException e) {
            LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to {} and resetting leader info.", RaftProtos.RaftPeerRole.UNRECOGNIZED, e);
            setServerRole(null, null);
        }
    }

    /**
     * Replaces both cached values under the write lock.
     *
     * @param currentRole the role to cache; {@code null} clears the cached role
     * @param leaderPeerId the leader id to cache; {@code null} clears the cached leader
     */
    private void setServerRole(RaftProtos.RaftPeerRole currentRole, RaftPeerId leaderPeerId) {
        ReentrantReadWriteLock.WriteLock writeLock = this.roleCheckLock.writeLock();
        writeLock.lock();
        try {
            this.cachedPeerRole = Optional.ofNullable(currentRole);
            this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Asks the local Ratis server for the group info of this server's raft group.
     *
     * @return the group info reply
     * @throws IOException if the server cannot answer the request
     */
    private GroupInfoReply getGroupInfo() throws IOException {
        long callId = nextCallId();
        GroupInfoRequest request = new GroupInfoRequest(this.clientId, this.raftPeerId, this.raftGroupId, callId);
        return this.server.getGroupInfo(request);
    }

    /**
     * @return the port taken from the Ratis address this server was created with
     */
    public int getServerPort() {
        return port;
    }

    /**
     * @return the life cycle state reported by the underlying Ratis server
     */
    @VisibleForTesting
    public LifeCycle.State getServerState() {
        LifeCycle.State currentState = server.getLifeCycleState();
        return currentState;
    }

    /**
     * @return the raft peer id of this OM node
     */
    @VisibleForTesting
    public RaftPeerId getRaftPeerId() {
        return raftPeerId;
    }

    /**
     * Derives a name-based UUID from the OM service id, so the same service
     * id always yields the same UUID.
     *
     * @param omServiceId the OM service id
     * @return the name-based UUID of the id's UTF-8 bytes
     */
    private UUID getRaftGroupIdFromOmServiceId(String omServiceId) {
        byte[] serviceIdBytes = omServiceId.getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(serviceIdBytes);
    }

    /**
     * @return the last applied index reported by the OM state machine
     */
    public long getStateMachineLastAppliedIndex() {
        long lastApplied = omStateMachine.getLastAppliedIndex();
        return lastApplied;
    }
}

