/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.RecordingSignalPoller;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.ClusterControl;
import io.aeron.cluster.ClusterMember;
import io.aeron.cluster.ClusterSession;
import io.aeron.cluster.ClusterSessionProxy;
import io.aeron.cluster.ClusterTermination;
import io.aeron.cluster.ConsensusAdapter;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.ConsensusModuleAdapter;
import io.aeron.cluster.ConsensusModuleSnapshotAdapter;
import io.aeron.cluster.ConsensusModuleSnapshotListener;
import io.aeron.cluster.ConsensusModuleSnapshotTaker;
import io.aeron.cluster.ConsensusPublisher;
import io.aeron.cluster.DynamicJoin;
import io.aeron.cluster.EgressPublisher;
import io.aeron.cluster.Election;
import io.aeron.cluster.IngressAdapter;
import io.aeron.cluster.LogAdapter;
import io.aeron.cluster.LogPublisher;
import io.aeron.cluster.LogReplay;
import io.aeron.cluster.LogReplication;
import io.aeron.cluster.PendingServiceMessageTracker;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.ServiceAck;
import io.aeron.cluster.ServiceProxy;
import io.aeron.cluster.TimerService;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.ClusterEvent;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.AdminRequestType;
import io.aeron.cluster.codecs.AdminResponseCode;
import io.aeron.cluster.codecs.ChangeType;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ClusterAction;
import io.aeron.cluster.codecs.EventCode;
import io.aeron.cluster.codecs.SnapshotRecordingsDecoder;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusterClock;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.cluster.service.ClusterTerminationException;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.cluster.service.RecoveryState;
import io.aeron.driver.DefaultNameResolver;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.driver.NameResolver;
import io.aeron.driver.media.UdpChannel;
import io.aeron.exceptions.AeronException;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.security.Authenticator;
import io.aeron.security.AuthorisationService;
import io.aeron.status.LocalSocketAddressStatus;
import io.aeron.status.ReadableCounter;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.MutableDirectBuffer;
import org.agrona.SemanticVersion;
import org.agrona.collections.ArrayListUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongCounterMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongArrayQueue;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;

final class ConsensusModuleAgent
implements Agent,
TimerService.TimerHandler,
ConsensusModuleSnapshotListener {
    /** Minimum spacing between slow-tick passes in {@code doWork()}: 10 ms expressed in nanoseconds. */
    static final long SLOW_TICK_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(10L);
    /** Append-position flag value 0 (no flag bits). */
    static final short APPEND_POSITION_FLAG_NONE = 0;
    /** Append-position flag value 1; presumably marks a follower that is catching up - confirm against the protocol. */
    static final short APPEND_POSITION_FLAG_CATCHUP = 1;

    // Timing configuration, copied from the context in the constructor.
    private final long sessionTimeoutNs;
    private final long leaderHeartbeatIntervalNs;
    private final long leaderHeartbeatTimeoutNs;

    // Mutable bookkeeping. -1 is used throughout as the "not set" marker.
    private long unavailableCounterHandlerRegistrationId;
    private long nextSessionId = 1L;
    private long leadershipTermId = -1L;
    private long expectedAckPosition = 0L;
    private long serviceAckId = 0L;
    private long terminationPosition = -1L;
    private long notifiedCommitPosition = 0L;
    private long lastAppendPosition = -1L;
    private long timeOfLastLogUpdateNs = 0L;
    private long timeOfLastAppendPositionUpdateNs = 0L;
    private long timeOfLastAppendPositionSendNs = 0L;
    private long slowTickDeadlineNs = 0L;
    private long markFileUpdateDeadlineNs = 0L;
    private int memberId;
    private int highMemberId;
    private int pendingMemberRemovals = 0;
    private long logPublicationChannelTag;
    private ReadableCounter appendPosition = null;
    private final Counter commitPosition;
    private ConsensusModule.State state = ConsensusModule.State.INIT;
    private Cluster.Role role = Cluster.Role.FOLLOWER;

    // Cluster membership as currently known to this node.
    private ClusterMember[] activeMembers;
    private ClusterMember[] passiveMembers = ClusterMember.EMPTY_MEMBERS;
    private ClusterMember leaderMember;
    private ClusterMember thisMember;
    private long[] rankedPositions;

    // Per-service state; both arrays are sized by ctx.serviceCount() in the constructor.
    private final long[] serviceClientIds;
    private final ArrayDeque<ServiceAck>[] serviceAckQueues;

    // Collaborators obtained from the context or created in the constructor.
    private final Counter clusterRoleCounter;
    private final ClusterMarkFile markFile;
    private final AgentInvoker aeronClientInvoker;
    private final ClusterClock clusterClock;
    private final LongConsumer clusterTimeConsumer;
    private final TimeUnit clusterTimeUnit;
    private final TimerService timerService;
    private final Counter moduleState;
    private final Counter controlToggle;
    private final ConsensusModuleAdapter consensusModuleAdapter;
    private final ServiceProxy serviceProxy;
    private final IngressAdapter ingressAdapter;
    private final EgressPublisher egressPublisher;
    private final LogPublisher logPublisher;
    private final LogAdapter logAdapter;
    private final ConsensusAdapter consensusAdapter;
    private final ConsensusPublisher consensusPublisher = new ConsensusPublisher();

    // Session and timer tracking. Collections are instantiated with the diamond operator
    // so that no raw types (and no unchecked conversions) are involved.
    private final Long2ObjectHashMap<ClusterSession> sessionByIdMap = new Long2ObjectHashMap<>();
    private final ArrayList<ClusterSession> sessions = new ArrayList<>();
    private final ArrayList<ClusterSession> pendingSessions = new ArrayList<>();
    private final ArrayList<ClusterSession> rejectedSessions = new ArrayList<>();
    private final ArrayList<ClusterSession> redirectSessions = new ArrayList<>();
    private final Int2ObjectHashMap<ClusterMember> clusterMemberByIdMap = new Int2ObjectHashMap<>();
    private final Long2LongCounterMap expiredTimerCountByCorrelationIdMap = new Long2LongCounterMap(0L);
    private final ArrayDeque<ClusterSession> uncommittedClosedSessions = new ArrayDeque<>();
    // Holds (append position, correlation id) pairs pushed by onTimerEvent().
    private final LongArrayQueue uncommittedTimers = new LongArrayQueue(Long.MAX_VALUE);
    private final PendingServiceMessageTracker[] pendingServiceMessageTrackers;

    private final Authenticator authenticator;
    private final AuthorisationService authorisationService;
    private final ClusterSessionProxy sessionProxy;
    private final Aeron aeron;
    private final ConsensusModule.Context ctx;
    private final IdleStrategy idleStrategy;
    private final RecordingLog recordingLog;
    private final ArrayList<RecordingLog.Snapshot> dynamicJoinSnapshots = new ArrayList<>();
    private final DutyCycleTracker dutyCycleTracker;

    // State established in onStart() or later in the agent's life.
    private RecordingLog.RecoveryPlan recoveryPlan;
    private AeronArchive archive;
    private RecordingSignalPoller recordingSignalPoller;
    private Election election;
    private DynamicJoin dynamicJoin;
    private ClusterTermination clusterTermination;
    private long logSubscriptionId = -1L;
    private long logRecordingId = -1L;
    private long logRecordedPosition = -1L;
    private String liveLogDestination;
    private String catchupLogDestination;
    private String ingressEndpoints;
    private boolean isElectionRequired;

    /**
     * Builds the agent from the supplied context: copies configuration, resolves this node's
     * cluster member, and creates the subscriptions, publication and adapters the agent polls.
     *
     * @param ctx context supplying the Aeron client, clock, counters, publishers and configuration.
     */
    ConsensusModuleAgent(ConsensusModule.Context ctx) {
        // Collaborators and configuration are read from the context.
        this.ctx = ctx;
        this.aeron = ctx.aeron();
        this.clusterClock = ctx.clusterClock();
        this.clusterTimeUnit = this.clusterClock.timeUnit();
        this.clusterTimeConsumer = ctx.clusterTimeConsumerSupplier().apply(ctx);
        this.timerService = ctx.timerServiceSupplier().newInstance(this.clusterTimeUnit, this);
        this.sessionTimeoutNs = ctx.sessionTimeoutNs();
        this.leaderHeartbeatIntervalNs = ctx.leaderHeartbeatIntervalNs();
        this.leaderHeartbeatTimeoutNs = ctx.leaderHeartbeatTimeoutNs();
        this.egressPublisher = ctx.egressPublisher();
        this.moduleState = ctx.moduleStateCounter();
        this.commitPosition = ctx.commitPositionCounter();
        this.controlToggle = ctx.controlToggleCounter();
        this.logPublisher = ctx.logPublisher();
        this.idleStrategy = ctx.idleStrategy();
        this.activeMembers = ClusterMember.parse(ctx.clusterMembers());
        this.sessionProxy = new ClusterSessionProxy(this.egressPublisher);
        this.memberId = ctx.clusterMemberId();
        this.clusterRoleCounter = ctx.clusterNodeRoleCounter();
        this.markFile = ctx.clusterMarkFile();
        this.recordingLog = ctx.recordingLog();
        // Service client ids start out as -1 until captured in onStart().
        this.serviceClientIds = new long[ctx.serviceCount()];
        Arrays.fill(this.serviceClientIds, -1L);
        this.serviceAckQueues = ServiceAck.newArrayOfQueues(ctx.serviceCount());
        this.highMemberId = ClusterMember.highMemberId(this.activeMembers);
        this.dutyCycleTracker = ctx.dutyCycleTracker();
        // Run one cycle of the Aeron client conductor before the resources below are added.
        this.aeronClientInvoker = this.aeron.conductorAgentInvoker();
        this.aeronClientInvoker.invoke();
        this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
        this.role(Cluster.Role.FOLLOWER);
        ClusterMember.addClusterMemberIds(this.activeMembers, this.clusterMemberByIdMap);
        // Until an election says otherwise, this node is recorded as its own leader member.
        this.leaderMember = this.thisMember = ClusterMember.determineMember(this.activeMembers, ctx.clusterMemberId(), ctx.memberEndpoints());
        // Default the consensus channel endpoint to this member's consensus endpoint when the URI has none.
        ChannelUri consensusUri = ChannelUri.parse((CharSequence)ctx.consensusChannel());
        if (!consensusUri.containsKey("endpoint")) {
            consensusUri.put("endpoint", this.thisMember.consensusEndpoint());
        }
        this.consensusAdapter = new ConsensusAdapter(this.aeron.addSubscription(consensusUri.toString(), ctx.consensusStreamId()), this);
        this.ingressAdapter = new IngressAdapter(ctx.ingressFragmentLimit(), this);
        this.logAdapter = new LogAdapter(this, ctx.logFragmentLimit());
        this.consensusModuleAdapter = new ConsensusModuleAdapter(this.aeron.addSubscription(ctx.controlChannel(), ctx.consensusModuleStreamId()), this);
        this.serviceProxy = new ServiceProxy((Publication)this.aeron.addPublication(ctx.controlChannel(), ctx.serviceStreamId()));
        this.authenticator = (Authenticator)ctx.authenticatorSupplier().get();
        this.authorisationService = (AuthorisationService)ctx.authorisationServiceSupplier().get();
        // One pending-message tracker per service, stored at the index passed as its first argument.
        this.pendingServiceMessageTrackers = new PendingServiceMessageTracker[ctx.serviceCount()];
        int size = ctx.serviceCount();
        for (int i = 0; i < size; ++i) {
            this.pendingServiceMessageTrackers[i] = new PendingServiceMessageTracker(i, this.commitPosition, this.logPublisher, this.clusterClock);
        }
    }

    /**
     * Releases the agent's resources.
     * <p>
     * While the Aeron client is still open, the unavailable-counter handler is removed and log
     * recording is stopped. If the context does not own the Aeron client, the agent's own
     * publications, subscriptions, sessions and the archive client are also closed explicitly.
     * The mark file activity timestamp is reset to -1 and the context is closed in every case.
     */
    public void onClose() {
        if (!this.aeron.isClosed()) {
            this.aeron.removeUnavailableCounterHandler(this.unavailableCounterHandlerRegistrationId);
            this.tryStopLogRecording();

            if (!this.ctx.ownsAeronClient()) {
                this.logPublisher.disconnect(this.ctx.countedErrorHandler());
                this.logAdapter.disconnect(this.ctx.countedErrorHandler());

                // Errors raised while closing are reported to the handler rather than thrown.
                final CountedErrorHandler closeErrorHandler = this.ctx.countedErrorHandler();
                for (final ClusterSession openSession : this.sessionByIdMap.values()) {
                    openSession.close(this.aeron, closeErrorHandler);
                }

                CloseHelper.close(closeErrorHandler, this.ingressAdapter);
                ClusterMember.closeConsensusPublications(closeErrorHandler, this.activeMembers);
                CloseHelper.close(closeErrorHandler, this.consensusAdapter);
                CloseHelper.close(closeErrorHandler, this.serviceProxy);
                CloseHelper.close(closeErrorHandler, this.consensusModuleAdapter);
                CloseHelper.close(closeErrorHandler, this.archive);
            }

            this.state(ConsensusModule.State.CLOSED);
        }

        this.markFile.updateActivityTimestamp(-1L);
        this.ctx.close();
    }

    /**
     * Connects to the archive and, unless a dynamic join is required, recovers local state
     * (snapshot load, service acknowledgement) and starts the initial election.
     */
    public void onStart() {
        // The archive context is cloned so the connection uses its own copy of the configuration.
        this.archive = AeronArchive.connect((AeronArchive.Context)this.ctx.archiveContext().clone());
        this.recordingSignalPoller = new RecordingSignalPoller(this.archive.controlSessionId(), this.archive.controlResponsePoller().subscription());
        this.dynamicJoin = this.requiresDynamicJoin();
        if (null == this.dynamicJoin) {
            // Stop any recording still registered for the last term before planning recovery.
            long lastTermRecordingId = this.recordingLog.findLastTermRecordingId();
            if (-1L != lastTermRecordingId) {
                this.archive.tryStopRecordingByIdentity(lastTermRecordingId);
            }
            this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.ctx.serviceCount(), this.logRecordingId);
            if (null != this.recoveryPlan.log) {
                this.logRecordingId(this.recoveryPlan.log.recordingId);
            }
            // The recovery state counter only lives for the duration of this try block.
            try (Counter ignore = this.addRecoveryStateCounter(this.recoveryPlan);){
                if (!this.recoveryPlan.snapshots.isEmpty()) {
                    this.loadSnapshot(this.recoveryPlan.snapshots.get(0), this.archive);
                }
                // Keep polling the service control stream until every service has acked the expected position.
                while (!ServiceAck.hasReached(this.expectedAckPosition, this.serviceAckId, this.serviceAckQueues)) {
                    this.idle(this.consensusModuleAdapter.poll());
                }
                this.captureServiceClientIds();
                ++this.serviceAckId;
            }
            ClusterMember.addConsensusPublications(this.activeMembers, this.thisMember, this.ctx.consensusChannel(), this.ctx.consensusStreamId(), this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
            // First constructor argument is true here; presumably it marks a node-startup election - confirm in Election.
            this.election = new Election(true, this.recoveryPlan.lastLeadershipTermId, this.recoveryPlan.lastTermBaseLogPosition, this.commitPosition.getWeak(), this.recoveryPlan.appendedLogPosition, this.activeMembers, this.clusterMemberByIdMap, this.thisMember, this.consensusPublisher, this.ctx, this);
            this.election.doWork(this.clusterClock.timeNanos());
            this.state(ConsensusModule.State.ACTIVE);
        }
        this.unavailableCounterHandlerRegistrationId = this.aeron.addUnavailableCounterHandler(this::onUnavailableCounter);
        this.dutyCycleTracker.update(this.clusterClock.timeNanos());
    }

    /**
     * Runs one duty cycle of the agent.
     * <p>
     * Samples the cluster clock once, runs the slow-tick work when its deadline has passed,
     * polls the consensus adapter, and then delegates to exactly one of: the dynamic join,
     * the current election, or the regular consensus work.
     *
     * @return the amount of work done in this cycle.
     */
    public int doWork() {
        final long timestamp = this.clusterClock.time();
        final long nowNs = this.clusterTimeUnit.toNanos(timestamp);
        this.dutyCycleTracker.measureAndUpdate(nowNs);

        int workCount = 0;
        try {
            if (nowNs >= this.slowTickDeadlineNs) {
                this.slowTickDeadlineNs = nowNs + SLOW_TICK_INTERVAL_NS;
                workCount += this.slowTickWork(nowNs);
            }

            workCount += this.consensusAdapter.poll();

            if (null != this.dynamicJoin) {
                workCount += this.dynamicJoin.doWork(nowNs);
            } else if (null != this.election) {
                workCount += this.election.doWork(nowNs);
            } else {
                workCount += this.consensusWork(timestamp, nowNs);
            }
        }
        catch (final AgentTerminationException ex) {
            this.runTerminationHook();
            throw ex;
        }
        catch (final Exception ex) {
            if (null != this.election) {
                this.election.handleError(nowNs, ex);
            }
            // NOTE(review): the exception is rethrown even after the election has been handed it;
            // confirm that handling it in both places is intended.
            throw ex;
        }

        this.clusterTimeConsumer.accept(timestamp);
        return workCount;
    }

    /**
     * Name of this agent.
     *
     * @return the role name configured on the context, or when none is configured
     * {@code "consensus-module_<clusterId>_<memberId>"}.
     */
    public String roleName() {
        final String configuredName = this.ctx.agentRoleName();
        if (null != configuredName) {
            return configuredName;
        }

        return "consensus-module_" + this.ctx.clusterId() + "_" + this.memberId;
    }

    /**
     * Validates the header of a snapshot being loaded.
     *
     * @param appVersion application version recorded in the snapshot.
     * @param timeUnit   cluster time unit recorded in the snapshot.
     * @param buffer     unused here.
     * @param offset     unused here.
     * @param length     unused here.
     * @throws ClusterException with category FATAL when the snapshot's application version is
     *                          rejected by the configured validator, or when its time unit
     *                          differs from the cluster clock's time unit.
     */
    @Override
    public void onLoadBeginSnapshot(int appVersion, TimeUnit timeUnit, DirectBuffer buffer, int offset, int length) {
        final boolean isCompatibleVersion =
            this.ctx.appVersionValidator().isVersionCompatible(this.ctx.appVersion(), appVersion);
        if (!isCompatibleVersion) {
            final String versionDetail = "incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) +
                " snapshot=" + SemanticVersion.toString(appVersion);
            throw new ClusterException(versionDetail, AeronException.Category.FATAL);
        }

        if (this.clusterTimeUnit != timeUnit) {
            final String timeUnitDetail = "incompatible time unit: " + this.clusterTimeUnit + " snapshot=" + timeUnit;
            throw new ClusterException(timeUnitDetail, AeronException.Category.FATAL);
        }
    }

    /**
     * Snapshot end marker callback. Intentionally a no-op: this agent takes no action at the
     * end of a snapshot.
     */
    @Override
    public void onLoadEndSnapshot(DirectBuffer buffer, int offset, int length) {
    }

    /**
     * Restores a client session from a snapshot and registers it with this agent.
     * <p>
     * The response channel is passed through {@code refineResponseChannel} before the session is
     * created. {@code nextSessionId} is advanced so that it stays above every restored session id.
     */
    @Override
    public void onLoadClusterSession(long clusterSessionId, long correlationId, long openedPosition, long timeOfLastActivity, CloseReason closeReason, int responseStreamId, String responseChannel, DirectBuffer buffer, int offset, int length) {
        final String refinedChannel = this.refineResponseChannel(responseChannel);
        this.addSession(new ClusterSession(
            clusterSessionId,
            correlationId,
            openedPosition,
            timeOfLastActivity,
            responseStreamId,
            refinedChannel,
            closeReason));

        final boolean isIdAlreadyReserved = clusterSessionId < this.nextSessionId;
        if (!isIdAlreadyReserved) {
            this.nextSessionId = clusterSessionId + 1L;
        }
    }

    /**
     * Restores consensus module state from a snapshot: the next client session id, and the
     * pending-message tracker state, which this callback always applies to the tracker at index 0.
     */
    @Override
    public void onLoadConsensusModuleState(long nextSessionId, long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, DirectBuffer buffer, int offset, int length) {
        this.nextSessionId = nextSessionId;

        final PendingServiceMessageTracker firstTracker = this.pendingServiceMessageTrackers[0];
        firstTracker.loadState(nextServiceSessionId, logServiceSessionId, pendingMessageCapacity);
    }

    /**
     * Applies cluster membership recorded in a snapshot.
     * <p>
     * Ignored entirely during a dynamic join or when the context is configured to ignore
     * snapshot membership. Otherwise the member id is adopted only if this node has none (-1),
     * and the member list is adopted only if the active members are still
     * {@code ClusterMember.EMPTY_MEMBERS}.
     */
    @Override
    public void onLoadClusterMembers(int memberId, int highMemberId, String members, DirectBuffer buffer, int offset, int length) {
        if (null != this.dynamicJoin || this.ctx.clusterMembersIgnoreSnapshot()) {
            return;
        }

        if (-1 == this.memberId) {
            this.memberId = memberId;
            this.ctx.clusterMarkFile().memberId(memberId);
        }

        if (ClusterMember.EMPTY_MEMBERS != this.activeMembers) {
            return;
        }

        this.activeMembers = ClusterMember.parse(members);
        final int parsedHighMemberId = ClusterMember.highMemberId(this.activeMembers);
        this.highMemberId = Math.max(parsedHighMemberId, highMemberId);
        this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
        this.thisMember = this.clusterMemberByIdMap.get(memberId);
        ClusterMember.addConsensusPublications(
            this.activeMembers,
            this.thisMember,
            this.ctx.consensusChannel(),
            this.ctx.consensusStreamId(),
            this.aeron,
            this.ctx.countedErrorHandler());
    }

    /**
     * Restores the pending-message tracker state of one service from a snapshot.
     *
     * @throws ClusterException when {@code serviceId} is not an index into the configured trackers.
     */
    @Override
    public void onLoadPendingMessageTracker(long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, int serviceId, DirectBuffer buffer, int offset, int length) {
        final int trackerCount = this.pendingServiceMessageTrackers.length;
        final boolean isKnownService = 0 <= serviceId && serviceId < trackerCount;
        if (!isKnownService) {
            throw new ClusterException("serviceId=" + serviceId + " invalid for serviceCount=" + trackerCount);
        }

        final PendingServiceMessageTracker tracker = this.pendingServiceMessageTrackers[serviceId];
        tracker.loadState(nextServiceSessionId, logServiceSessionId, pendingMessageCapacity);
    }

    /**
     * Restores one pending service message from a snapshot into the tracker of the service
     * encoded in {@code clusterSessionId}.
     *
     * @param clusterSessionId session id from which the owning service id is derived.
     * @param buffer           buffer containing the message.
     * @param offset           offset of the message in the buffer.
     * @param length           length of the message.
     * @throws ClusterException when the derived service id is not an index into the configured
     *                          trackers, matching the check made in onLoadPendingMessageTracker.
     */
    @Override
    public void onLoadPendingMessage(long clusterSessionId, DirectBuffer buffer, int offset, int length) {
        final int serviceId = PendingServiceMessageTracker.serviceId(clusterSessionId);
        // A snapshot written for more services than are configured would otherwise fail with a
        // bare ArrayIndexOutOfBoundsException that gives no hint of the cause.
        if (serviceId < 0 || serviceId >= this.pendingServiceMessageTrackers.length) {
            throw new ClusterException("serviceId=" + serviceId + " invalid for serviceCount=" + this.pendingServiceMessageTrackers.length);
        }

        this.pendingServiceMessageTrackers[serviceId].appendMessage(buffer, offset, length);
    }

    /**
     * Restores a timer from a snapshot by scheduling it again through {@code onScheduleTimer}.
     *
     * @param correlationId correlation id of the timer.
     * @param deadline      deadline of the timer.
     * @param buffer        unused here.
     * @param offset        unused here.
     * @param length        unused here.
     */
    @Override
    public void onLoadTimer(long correlationId, long deadline, DirectBuffer buffer, int offset, int length) {
        onScheduleTimer(correlationId, deadline);
    }

    /**
     * Handles a client connect request.
     * <p>
     * A session id is allocated only when this node is leader; otherwise the session gets id -1
     * and is queued for redirect. On the leader the session is rejected when the client's major
     * protocol version is not 0 or when the concurrent session limit is reached; otherwise the
     * authenticator is notified and the session is queued as pending.
     */
    void onSessionConnect(long correlationId, int responseStreamId, int version, String responseChannel, byte[] encodedCredentials) {
        final long clusterSessionId = Cluster.Role.LEADER == this.role ? this.nextSessionId++ : -1L;
        final ClusterSession session =
            new ClusterSession(clusterSessionId, responseStreamId, this.refineResponseChannel(responseChannel));
        session.asyncConnect(this.aeron);

        final long now = this.clusterClock.time();
        session.lastActivityNs(this.clusterTimeUnit.toNanos(now), correlationId);

        if (Cluster.Role.LEADER != this.role) {
            this.redirectSessions.add(session);
            return;
        }

        if (0 != SemanticVersion.major(version)) {
            final String detail = "invalid client version " + SemanticVersion.toString(version) +
                ", cluster is " + SemanticVersion.toString(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION);
            session.reject(EventCode.ERROR, detail);
            this.rejectedSessions.add(session);
            return;
        }

        final int sessionCount = this.pendingSessions.size() + this.sessions.size();
        if (sessionCount >= this.ctx.maxConcurrentSessions()) {
            session.reject(EventCode.ERROR, "concurrent session limit");
            this.rejectedSessions.add(session);
            return;
        }

        this.authenticator.onConnectRequest(session.id(), encodedCredentials, this.clusterTimeUnit.toMillis(now));
        this.pendingSessions.add(session);
    }

    /**
     * Handles a client request to close its session.
     * <p>
     * Acted on only by the leader, for the current leadership term, and for a known session in
     * the OPEN state. The session is marked closing and disconnected; it is removed from the
     * session collections and closed only if the close could be appended to the log.
     */
    void onSessionClose(long leadershipTermId, long clusterSessionId) {
        if (leadershipTermId != this.leadershipTermId || Cluster.Role.LEADER != this.role) {
            return;
        }

        final ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null == session || session.state() != ClusterSession.State.OPEN) {
            return;
        }

        session.closing(CloseReason.CLIENT_ACTION);
        session.disconnect(this.aeron, this.ctx.countedErrorHandler());

        final boolean isAppended = this.logPublisher.appendSessionClose(
            this.memberId, session, leadershipTermId, this.clusterClock.time(), this.clusterClock.timeUnit());
        if (!isAppended) {
            // The session stays registered in its closing state when the append did not succeed.
            return;
        }

        session.closedLogPosition(this.logPublisher.position());
        this.uncommittedClosedSessions.addLast(session);
        this.sessionByIdMap.remove(clusterSessionId);
        ConsensusModuleAgent.removeSession(this.sessions, clusterSessionId);
        session.close(this.aeron, this.ctx.countedErrorHandler());
    }

    /**
     * Handles an admin request from a client session.
     * <p>
     * Silently ignored unless this node is leader and the session is known and OPEN. An error
     * response is sent for a stale leadership term, for an unauthorised request and for an
     * unknown request type. A SNAPSHOT request toggles the cluster control counter and reports
     * whether the toggle succeeded.
     */
    void onAdminRequest(long leadershipTermId, long clusterSessionId, long correlationId, AdminRequestType requestType, DirectBuffer payload, int payloadOffset, int payloadLength) {
        if (Cluster.Role.LEADER != this.role) {
            return;
        }

        final ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null == session || session.state() != ClusterSession.State.OPEN) {
            return;
        }

        if (leadershipTermId != this.leadershipTermId) {
            this.egressPublisher.sendAdminResponse(
                session,
                correlationId,
                requestType,
                AdminResponseCode.ERROR,
                "Invalid leadership term: expected " + this.leadershipTermId + ", got " + leadershipTermId);
            return;
        }

        // NOTE(review): 111 and 26 look like protocol and message-template identifiers - confirm.
        final boolean isAuthorised =
            this.authorisationService.isAuthorised(111, 26, requestType, session.encodedPrincipal());
        if (!isAuthorised) {
            this.egressPublisher.sendAdminResponse(
                session,
                correlationId,
                requestType,
                AdminResponseCode.UNAUTHORISED_ACCESS,
                "Execution of the " + requestType + " request was not authorised");
            return;
        }

        if (AdminRequestType.SNAPSHOT != requestType) {
            this.egressPublisher.sendAdminResponse(
                session, correlationId, requestType, AdminResponseCode.ERROR, "Unknown request type: " + requestType);
            return;
        }

        if (ClusterControl.ToggleState.SNAPSHOT.toggle(this.controlToggle)) {
            this.egressPublisher.sendAdminResponse(session, correlationId, requestType, AdminResponseCode.OK, "");
        } else {
            this.egressPublisher.sendAdminResponse(
                session,
                correlationId,
                requestType,
                AdminResponseCode.ERROR,
                "Failed to switch ClusterControl to the ToggleState.SNAPSHOT state");
        }
    }

    /**
     * Handles an ingress message from a client session by appending it to the log.
     *
     * @return {@code ABORT} when the message qualified for appending but the append did not
     * succeed, so that it is offered again; {@code CONTINUE} when it was appended or when it is
     * dropped because this node is not leader, the term is stale, or the session is not OPEN.
     */
    ControlledFragmentHandler.Action onIngressMessage(long leadershipTermId, long clusterSessionId, DirectBuffer buffer, int offset, int length) {
        if (leadershipTermId != this.leadershipTermId || Cluster.Role.LEADER != this.role) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        final ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null == session || session.state() != ClusterSession.State.OPEN) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        final long timestamp = this.clusterClock.time();
        final long appendResult =
            this.logPublisher.appendMessage(leadershipTermId, clusterSessionId, timestamp, buffer, offset, length);
        if (appendResult <= 0L) {
            return ControlledFragmentHandler.Action.ABORT;
        }

        session.timeOfLastActivityNs(this.clusterTimeUnit.toNanos(timestamp));
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Handles a session keep-alive by refreshing the session's last-activity time.
     * Ignored unless this node is leader, the term is current, and the session is known and OPEN.
     */
    void onSessionKeepAlive(long leadershipTermId, long clusterSessionId) {
        if (leadershipTermId != this.leadershipTermId || Cluster.Role.LEADER != this.role) {
            return;
        }

        final ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null != session && session.state() == ClusterSession.State.OPEN) {
            session.timeOfLastActivityNs(this.clusterClock.timeNanos());
        }
    }

    /**
     * Handles a client's response to an authentication challenge.
     * <p>
     * On the leader, the pending sessions are searched from newest to oldest for the first one
     * with a matching id in the CHALLENGED state; its activity time is refreshed and the
     * authenticator is given the response. On any other node the response is forwarded to the
     * leader over its consensus publication.
     */
    void onChallengeResponse(long correlationId, long clusterSessionId, byte[] encodedCredentials) {
        if (Cluster.Role.LEADER != this.role) {
            this.consensusPublisher.challengeResponse(
                this.leaderMember.publication(), correlationId, clusterSessionId, encodedCredentials);
            return;
        }

        int index = this.pendingSessions.size() - 1;
        while (index >= 0) {
            final ClusterSession candidate = this.pendingSessions.get(index);
            if (candidate.id() == clusterSessionId && candidate.state() == ClusterSession.State.CHALLENGED) {
                final long timestamp = this.clusterClock.time();
                final long nowMs = this.clusterTimeUnit.toMillis(timestamp);
                candidate.lastActivityNs(this.clusterTimeUnit.toNanos(timestamp), correlationId);
                this.authenticator.onChallengeResponse(clusterSessionId, encodedCredentials, nowMs);
                return;
            }
            index--;
        }
    }

    /**
     * Called when a timer expires; appends the timer event to the log.
     *
     * @param correlationId correlation id of the expired timer.
     * @return {@code true} when the event was appended (position greater than zero), in which
     * case the append position and correlation id are queued as an uncommitted timer;
     * {@code false} otherwise.
     */
    @Override
    public boolean onTimerEvent(long correlationId) {
        final long position =
            this.logPublisher.appendTimer(correlationId, this.leadershipTermId, this.clusterClock.time());
        if (position <= 0L) {
            return false;
        }

        // Entries are stored as consecutive (position, correlationId) pairs.
        this.uncommittedTimers.offerLong(position);
        this.uncommittedTimers.offerLong(correlationId);
        return true;
    }

    /**
     * Handles a canvass-position message from another member.
     * <p>
     * During an election the message is passed to the election. Otherwise, when this node is
     * leader and the sender is a known member whose log term is not ahead of ours, any existing
     * catch-up replay for it is stopped and it is sent a new-leadership-term message describing
     * the current term and, if its log is behind, the term that follows its own.
     */
    void onCanvassPosition(long logLeadershipTermId, long logPosition, long leadershipTermId, int followerMemberId, int protocolVersion) {
        this.checkFollowerForConsensusPublication(followerMemberId);

        if (null != this.election) {
            this.election.onCanvassPosition(
                logLeadershipTermId, logPosition, leadershipTermId, followerMemberId, protocolVersion);
            return;
        }

        if (Cluster.Role.LEADER != this.role) {
            return;
        }

        final ClusterMember follower = this.clusterMemberByIdMap.get(followerMemberId);
        if (null == follower || logLeadershipTermId > this.leadershipTermId) {
            return;
        }

        this.stopExistingCatchupReplay(follower);

        final long termBaseLogPosition = this.recordingLog.getTermEntry(this.leadershipTermId).termBaseLogPosition;
        long nextLogLeadershipTermId = -1L;
        long nextTermBaseLogPosition = -1L;
        long nextLogPosition = -1L;

        if (logLeadershipTermId < this.leadershipTermId) {
            final RecordingLog.Entry nextLogEntry = this.recordingLog.findTermEntry(logLeadershipTermId + 1L);
            if (null != nextLogEntry) {
                nextLogLeadershipTermId = nextLogEntry.leadershipTermId;
                nextTermBaseLogPosition = nextLogEntry.termBaseLogPosition;
                nextLogPosition = nextLogEntry.logPosition;
            } else {
                // No entry recorded for the follower's next term: point it at the current term.
                nextLogLeadershipTermId = this.leadershipTermId;
                nextTermBaseLogPosition = termBaseLogPosition;
                nextLogPosition = -1L;
            }
        }

        this.consensusPublisher.newLeadershipTerm(
            follower.publication(),
            logLeadershipTermId,
            nextLogLeadershipTermId,
            nextTermBaseLogPosition,
            nextLogPosition,
            this.leadershipTermId,
            termBaseLogPosition,
            this.logPublisher.position(),
            this.logRecordingId,
            this.clusterClock.time(),
            this.memberId,
            this.logPublisher.sessionId(),
            false);
    }

    /**
     * Handles a vote request from a candidate.
     * <p>
     * During an election the request is passed to the election. Outside an election, a request
     * for a term later than the current one (and no dynamic join in progress) is recorded as an
     * unexpected event and makes this node enter an election.
     */
    void onRequestVote(long logLeadershipTermId, long logPosition, long candidateTermId, int candidateId, int protocolVersion) {
        if (null != this.election) {
            this.election.onRequestVote(logLeadershipTermId, logPosition, candidateTermId, candidateId, protocolVersion);
            return;
        }

        final boolean isFromLaterTerm = candidateTermId > this.leadershipTermId;
        if (isFromLaterTerm && null == this.dynamicJoin) {
            this.ctx.countedErrorHandler().onError(new ClusterEvent("unexpected vote request"));
            this.enterElection();
        }
    }

    /**
     * Handles a vote message. Votes are only meaningful during an election; when no election is
     * in progress the message is dropped.
     */
    void onVote(long candidateTermId, long logLeadershipTermId, long logPosition, int candidateMemberId, int followerMemberId, boolean vote) {
        if (null == this.election) {
            return;
        }

        this.election.onVote(candidateTermId, logLeadershipTermId, logPosition, candidateMemberId, followerMemberId, vote);
    }

    /**
     * Handles a new-leadership-term message.
     * <p>
     * The message is logged first. An incompatible application version is reported to the error
     * handler and terminates the agent. During an election the message is passed to the
     * election. Otherwise a follower receiving it from its current leader for the current term
     * advances its notified commit position (never backwards) and refreshes the last log update
     * time, while a message for a later term (with no dynamic join in progress) is recorded as
     * an unexpected event and makes this node enter an election.
     *
     * @throws AgentTerminationException when the leader's application version is not compatible.
     */
    void onNewLeadershipTerm(long logLeadershipTermId, long nextLeadershipTermId, long nextTermBaseLogPosition, long nextLogPosition, long leadershipTermId, long termBaseLogPosition, long logPosition, long leaderRecordingId, long timestamp, int leaderId, int logSessionId, int appVersion, boolean isStartup) {
        this.logNewLeadershipTerm(
            logLeadershipTermId,
            nextLeadershipTermId,
            nextTermBaseLogPosition,
            nextLogPosition,
            leadershipTermId,
            termBaseLogPosition,
            logPosition,
            leaderRecordingId,
            timestamp,
            this.memberId,
            leaderId,
            logSessionId,
            appVersion,
            isStartup);

        if (!this.ctx.appVersionValidator().isVersionCompatible(this.ctx.appVersion(), appVersion)) {
            final String detail = "incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) +
                " log=" + SemanticVersion.toString(appVersion);
            this.ctx.errorHandler().onError(new ClusterException(detail));
            throw new AgentTerminationException();
        }

        if (null != this.election) {
            this.election.onNewLeadershipTerm(
                logLeadershipTermId,
                nextLeadershipTermId,
                nextTermBaseLogPosition,
                nextLogPosition,
                leadershipTermId,
                termBaseLogPosition,
                logPosition,
                leaderRecordingId,
                timestamp,
                leaderId,
                logSessionId,
                isStartup);
            return;
        }

        final boolean isFromCurrentLeader = Cluster.Role.FOLLOWER == this.role &&
            leadershipTermId == this.leadershipTermId &&
            leaderId == this.leaderMember.id();
        if (isFromCurrentLeader) {
            this.notifiedCommitPosition = Math.max(this.notifiedCommitPosition, logPosition);
            this.timeOfLastLogUpdateNs = this.clusterClock.timeNanos();
            return;
        }

        if (leadershipTermId > this.leadershipTermId && null == this.dynamicJoin) {
            this.ctx.countedErrorHandler().onError(new ClusterEvent("unexpected new leadership term event"));
            this.enterElection();
        }
    }

    /**
     * Handles an append-position report from a follower.
     * <p>
     * During an election the report is delegated to the election. Otherwise, when this member is
     * leader, the term is not newer than the local term and the follower is known, the follower's
     * log position and last-append timestamp are updated and catch-up completion is checked.
     *
     * @param leadershipTermId term the follower reported.
     * @param logPosition      position the follower has appended up to.
     * @param followerMemberId id of the reporting follower.
     * @param flags            append-position flags passed on to the catch-up tracking.
     */
    void onAppendPosition(long leadershipTermId, long logPosition, int followerMemberId, short flags) {
        if (null != election) {
            election.onAppendPosition(leadershipTermId, logPosition, followerMemberId, flags);
            return;
        }

        if (leadershipTermId > this.leadershipTermId || Cluster.Role.LEADER != role) {
            return;
        }

        final ClusterMember follower = (ClusterMember)clusterMemberByIdMap.get(followerMemberId);
        if (null == follower) {
            return;
        }

        follower.logPosition(logPosition).timeOfLastAppendPositionNs(clusterClock.timeNanos());
        trackCatchupCompletion(follower, leadershipTermId, flags);
    }

    /**
     * Handles a commit-position message.
     * <p>
     * The message is logged, then delegated to the election if one is running. Otherwise a
     * follower in the same term with the same leader records the notified commit position and
     * refreshes the last-log-update time. A message from a newer term, while no dynamic join is
     * in progress, is reported and triggers a new election.
     *
     * @param leadershipTermId term the message was sent in.
     * @param logPosition      commit position announced by the sender.
     * @param leaderMemberId   id of the sending leader.
     */
    void onCommitPosition(long leadershipTermId, long logPosition, int leaderMemberId) {
        logCommitPosition(leadershipTermId, logPosition, leaderMemberId, this.memberId);

        if (null != election) {
            election.onCommitPosition(leadershipTermId, logPosition, leaderMemberId);
            return;
        }

        final boolean isFromCurrentLeader = leadershipTermId == this.leadershipTermId && leaderMemberId == leaderMember.id() && Cluster.Role.FOLLOWER == role;
        if (isFromCurrentLeader) {
            notifiedCommitPosition = logPosition;
            timeOfLastLogUpdateNs = clusterClock.timeNanos();
            return;
        }

        if (leadershipTermId > this.leadershipTermId && null == dynamicJoin) {
            ctx.countedErrorHandler().onError(new ClusterEvent("unexpected commit position from new leader"));
            enterElection();
        }
    }

    /**
     * Starts an archive replay of the log towards a follower that asked to catch up.
     * <p>
     * Nothing happens unless this member is leader, the term is not newer than the local term,
     * the follower is known, and the follower has no catch-up replay session recorded yet.
     * The replay channel is derived from the configured follower catch-up channel with the
     * follower's endpoint and the log publication's session id applied.
     *
     * @param leadershipTermId term reported by the follower.
     * @param logPosition      position from which the replay starts.
     * @param followerMemberId id of the follower requesting catch-up.
     * @param catchupEndpoint  endpoint the replay is sent to.
     */
    void onCatchupPosition(long leadershipTermId, long logPosition, int followerMemberId, String catchupEndpoint) {
        if (leadershipTermId > this.leadershipTermId || Cluster.Role.LEADER != role) {
            return;
        }

        final ClusterMember follower = (ClusterMember)clusterMemberByIdMap.get(followerMemberId);
        if (null == follower || follower.catchupReplaySessionId() != -1L) {
            return;
        }

        final ChannelUri replayChannel = ChannelUri.parse(ctx.followerCatchupChannel());
        replayChannel.put("endpoint", catchupEndpoint);
        replayChannel.put("session-id", Integer.toString(logPublisher.sessionId()));
        replayChannel.put("linger", "0");
        replayChannel.put("eos", "false");

        final long replaySessionId = archive.startReplay(logRecordingId, logPosition, Long.MAX_VALUE, replayChannel.toString(), ctx.logStreamId());
        follower.catchupReplaySessionId(replaySessionId);
        follower.catchupReplayCorrelationId(archive.lastCorrelationId());
    }

    /**
     * Removes the catch-up destination from the log adapter when told to stop catching up.
     * Only acts when the message is for the current term, addressed to this member, and a
     * catch-up destination is currently set.
     *
     * @param leadershipTermId term the message was sent in.
     * @param followerMemberId id of the member the message is addressed to.
     */
    void onStopCatchup(long leadershipTermId, int followerMemberId) {
        final boolean isForThisMember = leadershipTermId == this.leadershipTermId && followerMemberId == memberId;
        if (!isForThisMember || null == catchupLogDestination) {
            return;
        }

        logAdapter.removeDestination(catchupLogDestination);
        catchupLogDestination = null;
    }

    /**
     * Handles a request to add a passive member.
     * <p>
     * The request is logged, then ignored while an election or dynamic join is in progress.
     * A leader adds the member (with the next member id) provided its endpoints duplicate
     * neither an existing passive nor an active member, creates its consensus publication and
     * adds its log endpoint as a log destination. A follower forwards the request to the leader.
     *
     * @param correlationId   correlation id of the request, stored on the new member.
     * @param memberEndpoints endpoints string describing the member to add.
     */
    void onAddPassiveMember(long correlationId, String memberEndpoints) {
        logAddPassiveMember(correlationId, memberEndpoints, memberId);

        if (null != election || null != dynamicJoin) {
            return;
        }

        if (Cluster.Role.LEADER == role) {
            final boolean isNewEndpoint = ClusterMember.notDuplicateEndpoint(passiveMembers, memberEndpoints) && ClusterMember.notDuplicateEndpoint(activeMembers, memberEndpoints);
            if (isNewEndpoint) {
                final ClusterMember addedMember = ClusterMember.parseEndpoints(++highMemberId, memberEndpoints);
                addedMember.correlationId(correlationId);
                passiveMembers = ClusterMember.addMember(passiveMembers, addedMember);
                clusterMemberByIdMap.put(addedMember.id(), (Object)addedMember);
                ClusterMember.addConsensusPublication(addedMember, ctx.consensusChannel(), ctx.consensusStreamId(), aeron, (ErrorHandler)ctx.countedErrorHandler());
                logPublisher.addDestination(ctx.isLogMdc(), addedMember.logEndpoint());
            }
        } else if (Cluster.Role.FOLLOWER == role) {
            consensusPublisher.addPassiveMember(leaderMember.publication(), correlationId, memberEndpoints);
        }
    }

    /**
     * Passes a cluster-members-change message to the dynamic join in progress, if any.
     *
     * @param correlationId  correlation id of the message.
     * @param leaderMemberId id of the current leader as reported.
     * @param activeMembers  encoded active members as reported.
     * @param passiveMembers encoded passive members as reported.
     */
    void onClusterMembersChange(long correlationId, int leaderMemberId, String activeMembers, String passiveMembers) {
        final DynamicJoin join = dynamicJoin;
        if (null == join) {
            return;
        }

        join.onClusterMembersChange(correlationId, leaderMemberId, activeMembers, passiveMembers);
    }

    /**
     * Answers a snapshot-recording query from a known member with the current recovery plan
     * and the encoded active members. Only the leader answers, and only outside an election.
     *
     * @param correlationId   correlation id echoed in the response.
     * @param requestMemberId id of the member that sent the query.
     */
    void onSnapshotRecordingQuery(long correlationId, int requestMemberId) {
        if (null != election || Cluster.Role.LEADER != role) {
            return;
        }

        final ClusterMember requester = (ClusterMember)clusterMemberByIdMap.get(requestMemberId);
        if (null != requester) {
            consensusPublisher.snapshotRecording(requester.publication(), correlationId, recoveryPlan, ClusterMember.encodeAsString(activeMembers));
        }
    }

    /**
     * Passes a snapshot-recordings response to the dynamic join in progress, if any.
     *
     * @param correlationId correlation id of the response.
     * @param decoder       decoder positioned on the response.
     */
    void onSnapshotRecordings(long correlationId, SnapshotRecordingsDecoder decoder) {
        final DynamicJoin join = dynamicJoin;
        if (null == join) {
            return;
        }

        join.onSnapshotRecordings(correlationId, decoder);
    }

    /**
     * Handles a join request from a member.
     * <p>
     * Only the leader acts, and only outside an election. The member must be known, must not
     * already have requested to join, and its term must not exceed the term of the first
     * snapshot in the recovery plan ({@code -1} when there are no snapshots). If the member has
     * no consensus publication yet, one is created and its log endpoint is added as a log
     * destination. The member is then flagged as having requested to join.
     *
     * @param leadershipTermId term reported by the joining member.
     * @param memberId         id of the joining member.
     */
    void onJoinCluster(long leadershipTermId, int memberId) {
        if (null != election || Cluster.Role.LEADER != role) {
            return;
        }

        final ClusterMember member = (ClusterMember)clusterMemberByIdMap.get(memberId);
        final long snapshotLeadershipTermId = recoveryPlan.snapshots.isEmpty() ? -1L : recoveryPlan.snapshots.get(0).leadershipTermId;

        if (null == member || member.hasRequestedJoin() || leadershipTermId > snapshotLeadershipTermId) {
            return;
        }

        if (null == member.publication()) {
            ClusterMember.addConsensusPublication(member, ctx.consensusChannel(), ctx.consensusStreamId(), aeron, (ErrorHandler)ctx.countedErrorHandler());
            logPublisher.addDestination(ctx.isLogMdc(), member.logEndpoint());
        }

        member.hasRequestedJoin(true);
    }

    /**
     * Records the termination position announced for the current term.
     * Only a follower in the same term acts; it also refreshes the last-log-update time.
     *
     * @param leadershipTermId term the message was sent in.
     * @param logPosition      position at which termination is to happen.
     */
    void onTerminationPosition(long leadershipTermId, long logPosition) {
        if (leadershipTermId != this.leadershipTermId || Cluster.Role.FOLLOWER != role) {
            return;
        }

        terminationPosition = logPosition;
        timeOfLastLogUpdateNs = clusterClock.timeNanos();
    }

    /**
     * Handles a termination acknowledgement from a member.
     * <p>
     * Only the leader acts, for the current term, when the acknowledged position has reached the
     * termination position and the member is known. The member is flagged as terminated; once
     * the cluster termination tracker says termination can proceed, the termination position is
     * committed to the recording log and the agent closes and terminates.
     *
     * @param leadershipTermId term the acknowledgement was sent in.
     * @param logPosition      position the member has reached.
     * @param memberId         id of the acknowledging member.
     */
    void onTerminationAck(long leadershipTermId, long logPosition, int memberId) {
        if (leadershipTermId != this.leadershipTermId || logPosition < terminationPosition || Cluster.Role.LEADER != role) {
            return;
        }

        final ClusterMember member = (ClusterMember)clusterMemberByIdMap.get(memberId);
        if (null == member) {
            return;
        }

        member.hasTerminated(true);

        final boolean canTerminateNow = clusterTermination.canTerminate(activeMembers, terminationPosition, clusterClock.timeNanos());
        if (canTerminateNow) {
            recordingLog.commitLogPosition(leadershipTermId, terminationPosition);
            closeAndTerminate();
        }
    }

    /**
     * Handles a backup query.
     * <p>
     * Ignored during an election or dynamic join. A non-leader forwards the query to the leader.
     * A leader in the ACTIVE or SUSPENDED state creates a BACKUP session with id {@code -1} for
     * the response channel and starts connecting it. If the major part of {@code version} is not
     * zero the session is rejected with an error event; otherwise the credentials are handed to
     * the authenticator and the session is queued as pending.
     *
     * @param correlationId      correlation id of the query.
     * @param responseStreamId   stream id for the response.
     * @param version            semantic version supplied by the requester.
     * @param responseChannel    channel for the response.
     * @param encodedCredentials credentials passed to the authenticator.
     */
    void onBackupQuery(long correlationId, int responseStreamId, int version, String responseChannel, byte[] encodedCredentials) {
        if (null != election || null != dynamicJoin) {
            return;
        }

        if (Cluster.Role.LEADER != role) {
            consensusPublisher.backupQuery(leaderMember.publication(), correlationId, responseStreamId, version, responseChannel, encodedCredentials);
            return;
        }

        if (ConsensusModule.State.ACTIVE != state && ConsensusModule.State.SUSPENDED != state) {
            return;
        }

        final ClusterSession backupSession = new ClusterSession(-1L, responseStreamId, refineResponseChannel(responseChannel));
        backupSession.action(ClusterSession.Action.BACKUP);
        backupSession.asyncConnect(aeron);

        final long now = clusterClock.time();
        backupSession.lastActivityNs(clusterTimeUnit.toNanos(now), correlationId);

        if (0 == SemanticVersion.major(version)) {
            final long nowMs = clusterTimeUnit.toMillis(now);
            authenticator.onConnectRequest(backupSession.id(), encodedCredentials, nowMs);
            pendingSessions.add(backupSession);
        } else {
            final String detail = "invalid client version " + SemanticVersion.toString(version) + ", cluster=" + SemanticVersion.toString(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION);
            backupSession.reject(EventCode.ERROR, detail);
            rejectedSessions.add(backupSession);
        }
    }

    /**
     * Handles a heartbeat request.
     * <p>
     * Ignored during an election or dynamic join. A non-leader forwards the request to the
     * leader. A leader in the ACTIVE or SUSPENDED state creates a HEARTBEAT session with id
     * {@code -1}, starts connecting it, hands the credentials to the authenticator and queues
     * the session as pending.
     *
     * @param correlationId      correlation id of the request.
     * @param responseStreamId   stream id for the response.
     * @param responseChannel    channel for the response.
     * @param encodedCredentials credentials passed to the authenticator.
     */
    public void onHeartbeatRequest(long correlationId, int responseStreamId, String responseChannel, byte[] encodedCredentials) {
        if (null != election || null != dynamicJoin) {
            return;
        }

        if (Cluster.Role.LEADER != role) {
            consensusPublisher.heartbeatRequest(leaderMember.publication(), correlationId, responseStreamId, responseChannel, encodedCredentials);
            return;
        }

        if (ConsensusModule.State.ACTIVE != state && ConsensusModule.State.SUSPENDED != state) {
            return;
        }

        final ClusterSession heartbeatSession = new ClusterSession(-1L, responseStreamId, refineResponseChannel(responseChannel));
        heartbeatSession.action(ClusterSession.Action.HEARTBEAT);
        heartbeatSession.asyncConnect(aeron);

        final long now = clusterClock.time();
        final long nowNs = clusterTimeUnit.toNanos(now);
        final long nowMs = clusterTimeUnit.toMillis(now);

        heartbeatSession.lastActivityNs(nowNs, correlationId);
        authenticator.onConnectRequest(heartbeatSession.id(), encodedCredentials, nowMs);
        pendingSessions.add(heartbeatSession);
    }

    /**
     * Handles a request to remove a member. Only the leader acts, outside an election, and
     * only for a known member.
     * <p>
     * A passive member is removed immediately: it is dropped from the passive members, its
     * publication is closed, its log destination is removed and it is deleted from the id map.
     * For an active member a QUIT membership-change event is appended to the log; if the append
     * succeeds the resulting position is stored on the member as its removal position and the
     * pending-removals count is incremented.
     *
     * @param memberId  id of the member to remove.
     * @param isPassive true if the member is to be removed as a passive member.
     */
    void onRemoveMember(int memberId, boolean isPassive) {
        if (null != election || Cluster.Role.LEADER != role) {
            return;
        }

        final ClusterMember member = (ClusterMember)clusterMemberByIdMap.get(memberId);
        if (null == member) {
            return;
        }

        if (!isPassive) {
            final long timestamp = clusterClock.time();
            final String remainingMembers = ClusterMember.encodeAsString(ClusterMember.removeMember(activeMembers, memberId));
            final long position = logPublisher.appendMembershipChangeEvent(leadershipTermId, timestamp, this.memberId, activeMembers.length, ChangeType.QUIT, memberId, remainingMembers);

            if (position > 0L) {
                // Back-date the last log update by one heartbeat interval.
                timeOfLastLogUpdateNs = clusterTimeUnit.toNanos(timestamp) - leaderHeartbeatIntervalNs;
                member.removalPosition(position);
                ++pendingMemberRemovals;
            }

            return;
        }

        passiveMembers = ClusterMember.removeMember(passiveMembers, memberId);
        member.closePublication((ErrorHandler)ctx.countedErrorHandler());
        logPublisher.removeDestination(ctx.isLogMdc(), member.logEndpoint());
        clusterMemberByIdMap.remove(memberId);
        clusterMemberByIdMap.compact();
    }

    /**
     * Answers a cluster-members query through the service proxy.
     * The extended response carries the current nano time, leader id, this member's id and the
     * member arrays; the basic response carries the leader id and the encoded member strings.
     *
     * @param correlationId     correlation id echoed in the response.
     * @param isExtendedRequest true to send the extended response.
     */
    void onClusterMembersQuery(long correlationId, boolean isExtendedRequest) {
        if (!isExtendedRequest) {
            serviceProxy.clusterMembersResponse(correlationId, leaderMember.id(), ClusterMember.encodeAsString(activeMembers), ClusterMember.encodeAsString(passiveMembers));
            return;
        }

        serviceProxy.clusterMembersExtendedResponse(correlationId, clusterClock.timeNanos(), leaderMember.id(), memberId, activeMembers, passiveMembers);
    }

    /**
     * Moves the module to a new state. A no-op if the state is unchanged; otherwise the change
     * is logged, stored, and mirrored into the module-state counter unless that counter is closed.
     *
     * @param newState state to move to.
     */
    void state(ConsensusModule.State newState) {
        if (newState == state) {
            return;
        }

        logStateChange(state, newState, memberId);
        state = newState;

        if (!moduleState.isClosed()) {
            moduleState.set(newState.code());
        }
    }

    /**
     * Returns the current state of the module.
     *
     * @return the current state.
     */
    ConsensusModule.State state() {
        return state;
    }

    // Called on every state change; the body is empty here, so the arguments are not used.
    // NOTE(review): presumably a hook for an external logging agent to instrument - confirm.
    private void logStateChange(ConsensusModule.State oldState, ConsensusModule.State newState, int memberId) {
    }

    /**
     * Changes this member's role. A no-op if the role is unchanged; otherwise the change is
     * logged, stored, and mirrored into the cluster-role counter unless that counter is closed.
     *
     * @param newRole role to take on.
     */
    void role(Cluster.Role newRole) {
        if (newRole == role) {
            return;
        }

        logRoleChange(role, newRole, memberId);
        role = newRole;

        if (!clusterRoleCounter.isClosed()) {
            clusterRoleCounter.set(newRole.code());
        }
    }

    // Called on every role change; the body is empty here, so the arguments are not used.
    // NOTE(review): presumably a hook for an external logging agent to instrument - confirm.
    private void logRoleChange(Cluster.Role oldRole, Cluster.Role newRole, int memberId) {
    }

    /**
     * Returns the role this member currently holds.
     *
     * @return the current role.
     */
    Cluster.Role role() {
        return role;
    }

    /**
     * Steps this member down to follower and tears down log connectivity ahead of a new
     * leadership term.
     * <p>
     * The ingress adapter is closed, the control toggle deactivated, any catch-up and live log
     * destinations removed, and the log adapter and log publisher disconnected. If a log
     * recording exists, recording is stopped, the last appended position re-read, the recovery
     * plan rebuilt, and the method waits (idling) until the recording's position counter is
     * gone. Sessions after {@code logPosition} are cleared, the remaining sessions are
     * disconnected, the commit position is set to {@code logPosition} and uncommitted entries
     * are restored.
     *
     * @param logPosition position to roll session and commit state back to.
     * @param nowNs       current time in nanoseconds, recorded as the last append-position update.
     * @return the last append position (unchanged when there is no log recording).
     */
    long prepareForNewLeadership(long logPosition, long nowNs) {
        role(Cluster.Role.FOLLOWER);
        CloseHelper.close((ErrorHandler)ctx.countedErrorHandler(), (AutoCloseable)ingressAdapter);
        ClusterControl.ToggleState.deactivate((AtomicCounter)controlToggle);

        if (null != catchupLogDestination) {
            logAdapter.removeDestination(catchupLogDestination);
            catchupLogDestination = null;
        }

        if (null != liveLogDestination) {
            logAdapter.removeDestination(liveLogDestination);
            liveLogDestination = null;
        }

        logAdapter.disconnect((ErrorHandler)ctx.countedErrorHandler());
        logPublisher.disconnect((ErrorHandler)ctx.countedErrorHandler());

        if (-1L == logRecordingId) {
            return lastAppendPosition;
        }

        tryStopLogRecording();
        lastAppendPosition = getLastAppendedPosition();
        timeOfLastAppendPositionUpdateNs = nowNs;
        recoveryPlan = recordingLog.createRecoveryPlan(archive, ctx.serviceCount(), logRecordingId);

        // Wait until the recording position counter for the log recording has disappeared.
        final CountersReader countersReader = ctx.aeron().countersReader();
        while (-1 != RecordingPos.findCounterIdByRecording(countersReader, logRecordingId)) {
            idle();
        }

        clearSessionsAfter(logPosition);
        for (int i = 0, sessionCount = sessions.size(); i < sessionCount; i++) {
            sessions.get(i).disconnect(aeron, (ErrorHandler)ctx.countedErrorHandler());
        }

        commitPosition.setOrdered(logPosition);
        restoreUncommittedEntries(logPosition);

        return lastAppendPosition;
    }

    /**
     * Handles a service's request to close a client session.
     * <p>
     * A known session is marked as closing with reason SERVICE_ACTION. If this member is an
     * ACTIVE leader and the session-close entry is appended to the log, the client is sent a
     * CLOSED event, the close position is stored on the session, and the session is moved to the
     * uncommitted-closed queue, removed from the session lookups and closed.
     *
     * @param clusterSessionId id of the session to close.
     */
    void onServiceCloseSession(long clusterSessionId) {
        final ClusterSession session = (ClusterSession)sessionByIdMap.get(clusterSessionId);
        if (null == session) {
            return;
        }

        session.closing(CloseReason.SERVICE_ACTION);

        if (Cluster.Role.LEADER != role || ConsensusModule.State.ACTIVE != state) {
            return;
        }

        if (!logPublisher.appendSessionClose(memberId, session, leadershipTermId, clusterClock.time(), clusterClock.timeUnit())) {
            return;
        }

        egressPublisher.sendEvent(session, leadershipTermId, memberId, EventCode.CLOSED, CloseReason.SERVICE_ACTION.name());
        session.closedLogPosition(logPublisher.position());
        uncommittedClosedSessions.addLast(session);
        sessionByIdMap.remove(clusterSessionId);
        ConsensusModuleAgent.removeSession(sessions, clusterSessionId);
        session.close(aeron, (ErrorHandler)ctx.countedErrorHandler());
    }

    /**
     * Enqueues a message from a service on that service's pending-message tracker.
     * The tracker is selected by the service id derived from {@code clusterSessionId}.
     *
     * @param clusterSessionId session id from which the service id is derived.
     * @param buffer           buffer holding the message; it is cast to a mutable buffer.
     * @param offset           offset of the message in the buffer.
     * @param length           length of the message in bytes.
     */
    void onServiceMessage(long clusterSessionId, DirectBuffer buffer, int offset, int length) {
        final int serviceId = PendingServiceMessageTracker.serviceId(clusterSessionId);
        final PendingServiceMessageTracker tracker = pendingServiceMessageTrackers[serviceId];
        tracker.enqueueMessage((MutableDirectBuffer)buffer, offset, length);
    }

    /**
     * Schedules a timer, unless an expiry for the same correlation id has already been counted.
     * In that case the counted expiry is consumed (decremented) instead of scheduling the timer.
     *
     * @param correlationId correlation id of the timer.
     * @param deadline      deadline for the timer.
     */
    void onScheduleTimer(long correlationId, long deadline) {
        final boolean hasExpiredAlready = 0L != expiredTimerCountByCorrelationIdMap.get(correlationId);
        if (hasExpiredAlready) {
            expiredTimerCountByCorrelationIdMap.decrementAndGet(correlationId);
        } else {
            timerService.scheduleTimerForCorrelationId(correlationId, deadline);
        }
    }

    /**
     * Cancels the timer with the given correlation id. The timer service's result is ignored.
     *
     * @param correlationId correlation id of the timer to cancel.
     */
    void onCancelTimer(long correlationId) {
        timerService.cancelTimerByCorrelationId(correlationId);
    }

    /**
     * Handles an acknowledgement from a service.
     * <p>
     * The ack is captured first. Nothing more happens until the acks have reached
     * {@code logPosition} for the current ack id. Then, depending on the module state:
     * <ul>
     *   <li>SNAPSHOT: the acks are polled, the ack id advanced and a snapshot taken. If a cluster
     *       termination is pending, services are told the termination position, the termination
     *       deadline is set and the state becomes TERMINATING; otherwise the state returns to
     *       ACTIVE and a leader resets the control toggle.</li>
     *   <li>QUITTING: the agent closes and terminates.</li>
     *   <li>TERMINATING: without a termination tracker, a termination ack is sent to the leader,
     *       the log position is committed and the agent terminates. With a tracker, it is told the
     *       services have terminated and, if termination can proceed, the log position is
     *       committed and the agent terminates.</li>
     * </ul>
     *
     * @param logPosition position the service has reached.
     * @param timestamp   timestamp passed on to the snapshot.
     * @param ackId       id of the acknowledgement.
     * @param relevantId  id carried with the ack; captured together with it.
     * @param serviceId   id of the acknowledging service.
     */
    void onServiceAck(long logPosition, long timestamp, long ackId, long relevantId, int serviceId) {
        captureServiceAck(logPosition, ackId, relevantId, serviceId);

        if (!ServiceAck.hasReached(logPosition, serviceAckId, serviceAckQueues)) {
            return;
        }

        if (ConsensusModule.State.SNAPSHOT == state) {
            final ServiceAck[] acks = pollServiceAcks(logPosition, serviceId);
            ++serviceAckId;
            takeSnapshot(timestamp, logPosition, acks);

            if (null == clusterTermination) {
                state(ConsensusModule.State.ACTIVE);
                if (Cluster.Role.LEADER == role) {
                    ClusterControl.ToggleState.reset((AtomicCounter)controlToggle);
                }
            } else {
                serviceProxy.terminationPosition(terminationPosition, (ErrorHandler)ctx.countedErrorHandler());
                clusterTermination.deadlineNs(clusterClock.timeNanos() + ctx.terminationTimeoutNs());
                state(ConsensusModule.State.TERMINATING);
            }

            return;
        }

        if (ConsensusModule.State.QUITTING == state) {
            closeAndTerminate();
            return;
        }

        if (ConsensusModule.State.TERMINATING != state) {
            return;
        }

        if (null == clusterTermination) {
            consensusPublisher.terminationAck(leaderMember.publication(), leadershipTermId, logPosition, memberId);
            recordingLog.commitLogPosition(leadershipTermId, logPosition);
            closeAndTerminate();
            return;
        }

        clusterTermination.onServicesTerminated();
        if (clusterTermination.canTerminate(activeMembers, terminationPosition, clusterClock.timeNanos())) {
            recordingLog.commitLogPosition(leadershipTermId, logPosition);
            closeAndTerminate();
        }
    }

    /**
     * Handles a session message seen while replaying the log.
     * <p>
     * For a known session the last-activity time is updated from the message timestamp. For an
     * unknown session with a negative id, follower messages are swept on the pending-message
     * tracker selected by the service id derived from the session id.
     *
     * @param clusterSessionId id of the session the message belongs to.
     * @param timestamp        timestamp of the message in cluster time units.
     */
    void onReplaySessionMessage(long clusterSessionId, long timestamp) {
        final ClusterSession session = (ClusterSession)sessionByIdMap.get(clusterSessionId);
        if (null != session) {
            session.timeOfLastActivityNs(clusterTimeUnit.toNanos(timestamp));
            return;
        }

        if (clusterSessionId < 0L) {
            final int serviceId = PendingServiceMessageTracker.serviceId(clusterSessionId);
            pendingServiceMessageTrackers[serviceId].sweepFollowerMessages(clusterSessionId);
        }
    }

    /**
     * Handles a timer event seen while replaying the log: the matching timer is cancelled, and
     * if no such timer was scheduled the expiry is counted against the correlation id.
     *
     * @param correlationId correlation id of the timer that fired.
     */
    void onReplayTimerEvent(long correlationId) {
        final boolean wasCancelled = timerService.cancelTimerByCorrelationId(correlationId);
        if (!wasCancelled) {
            expiredTimerCountByCorrelationIdMap.getAndIncrement(correlationId);
        }
    }

    /**
     * Recreates a client session from a session-open entry seen while replaying the log.
     * The session is opened at {@code logPosition}, its last activity is set from the entry's
     * timestamp, and it is added to the sessions. The next session id is bumped so that it stays
     * above every replayed session id.
     *
     * @param logPosition      position of the session-open entry.
     * @param correlationId    correlation id recorded with the session.
     * @param clusterSessionId id of the session.
     * @param timestamp        timestamp of the entry in cluster time units.
     * @param responseStreamId stream id of the session's response channel.
     * @param responseChannel  response channel of the session.
     */
    void onReplaySessionOpen(long logPosition, long correlationId, long clusterSessionId, long timestamp, int responseStreamId, String responseChannel) {
        final String refinedChannel = refineResponseChannel(responseChannel);
        final ClusterSession replayedSession = new ClusterSession(clusterSessionId, responseStreamId, refinedChannel);
        replayedSession.open(logPosition);
        replayedSession.lastActivityNs(clusterTimeUnit.toNanos(timestamp), correlationId);
        addSession(replayedSession);

        if (nextSessionId <= clusterSessionId) {
            nextSessionId = clusterSessionId + 1L;
        }
    }

    /**
     * Closes a session in response to a session-close entry seen while replaying the log.
     * A known session is removed from the session lookups, marked with the close reason and closed.
     *
     * @param clusterSessionId id of the session to close.
     * @param closeReason      reason recorded for the close.
     */
    void onReplaySessionClose(long clusterSessionId, CloseReason closeReason) {
        final ClusterSession closedSession = (ClusterSession)sessionByIdMap.remove(clusterSessionId);
        if (null == closedSession) {
            return;
        }

        ConsensusModuleAgent.removeSession(sessions, clusterSessionId);
        closedSession.closing(closeReason);
        closedSession.close(aeron, (ErrorHandler)ctx.countedErrorHandler());
    }

    /**
     * Applies a cluster action seen while replaying the log, if it belongs to the current term.
     * SUSPEND, RESUME and SNAPSHOT map to the SUSPENDED, ACTIVE and SNAPSHOT states; any other
     * action is ignored.
     *
     * @param leadershipTermId term the action was logged in.
     * @param action           the cluster action.
     */
    void onReplayClusterAction(long leadershipTermId, ClusterAction action) {
        if (leadershipTermId != this.leadershipTermId) {
            return;
        }

        final ConsensusModule.State targetState;
        if (ClusterAction.SUSPEND == action) {
            targetState = ConsensusModule.State.SUSPENDED;
        } else if (ClusterAction.RESUME == action) {
            targetState = ConsensusModule.State.ACTIVE;
        } else if (ClusterAction.SNAPSHOT == action) {
            targetState = ConsensusModule.State.SNAPSHOT;
        } else {
            return;
        }

        state(targetState);
    }

    /**
     * Handles a new-leadership-term entry seen while replaying the log.
     * <p>
     * The entry is logged. A time unit different from the cluster's, or an incompatible
     * application version, is reported as a fatal error followed by a call to
     * {@code unexpectedTermination()}. The leadership term id is then adopted and the entry is
     * passed to the election if one is running.
     *
     * @param leadershipTermId   term recorded in the entry.
     * @param logPosition        log position of the entry.
     * @param timestamp          timestamp of the entry.
     * @param termBaseLogPosition base log position of the term.
     * @param timeUnit           time unit the log was written with.
     * @param appVersion         application version the log was written with.
     */
    void onReplayNewLeadershipTermEvent(long leadershipTermId, long logPosition, long timestamp, long termBaseLogPosition, TimeUnit timeUnit, int appVersion) {
        logReplayNewLeadershipTermEvent(memberId, null != election, leadershipTermId, logPosition, timestamp, termBaseLogPosition, timeUnit, appVersion);

        if (clusterTimeUnit != timeUnit) {
            final String unitsMessage = "incompatible timestamp units: " + clusterTimeUnit + " log=" + timeUnit;
            ctx.countedErrorHandler().onError(new ClusterException(unitsMessage, AeronException.Category.FATAL));
            unexpectedTermination();
        }

        if (!ctx.appVersionValidator().isVersionCompatible(ctx.appVersion(), appVersion)) {
            final String versionMessage = "incompatible version: " + SemanticVersion.toString(ctx.appVersion()) + " log=" + SemanticVersion.toString(appVersion);
            ctx.countedErrorHandler().onError(new ClusterException(versionMessage, AeronException.Category.FATAL));
            unexpectedTermination();
        }

        leadershipTermId(leadershipTermId);

        if (null != election) {
            election.onReplayNewLeadershipTermEvent(leadershipTermId, logPosition, timestamp, termBaseLogPosition);
        }
    }

    /**
     * Applies a membership-change entry seen while replaying the log, if it belongs to the
     * current term.
     * <p>
     * JOIN of this member: the active members, id map, this-member and leader references are
     * rebuilt from the encoded members and consensus publications are added. JOIN of another
     * member is handed to {@code clusterMemberJoined}. QUIT of this member moves the module to
     * QUITTING. QUIT of another member is handed to {@code clusterMemberQuit}; if that member
     * was the leader and no election is running, the commit position is proposed up to
     * {@code logPosition} and an election is entered. Finally a running election is told about
     * the change.
     *
     * @param leadershipTermId term the change was logged in.
     * @param logPosition      log position of the change.
     * @param leaderMemberId   id of the leader at the time of the change.
     * @param changeType       JOIN or QUIT.
     * @param memberId         id of the member the change is about.
     * @param clusterMembers   encoded cluster members after the change.
     */
    void onReplayMembershipChange(long leadershipTermId, long logPosition, int leaderMemberId, ChangeType changeType, int memberId, String clusterMembers) {
        if (leadershipTermId != this.leadershipTermId) {
            return;
        }

        final boolean isThisMember = memberId == this.memberId;

        if (ChangeType.JOIN == changeType) {
            final ClusterMember[] joinedMembers = ClusterMember.parse(clusterMembers);
            if (!isThisMember) {
                clusterMemberJoined(memberId, joinedMembers);
            } else {
                activeMembers = joinedMembers;
                clusterMemberByIdMap.clear();
                clusterMemberByIdMap.compact();
                ClusterMember.addClusterMemberIds(joinedMembers, clusterMemberByIdMap);
                thisMember = ClusterMember.findMember(activeMembers, memberId);
                leaderMember = ClusterMember.findMember(activeMembers, leaderMemberId);
                ClusterMember.addConsensusPublications(joinedMembers, thisMember, ctx.consensusChannel(), ctx.consensusStreamId(), aeron, (ErrorHandler)ctx.countedErrorHandler());
            }
        } else if (ChangeType.QUIT == changeType) {
            if (isThisMember) {
                state(ConsensusModule.State.QUITTING);
            } else {
                clusterMemberQuit(memberId);
                final boolean hasLeaderQuit = leaderMemberId == memberId;
                if (hasLeaderQuit && null == election) {
                    commitPosition.proposeMaxOrdered(logPosition);
                    enterElection();
                }
            }
        }

        if (null != election) {
            election.onMembershipChange(activeMembers, changeType, memberId, logPosition);
        }
    }

    /**
     * Creates the exclusive publication for the log and hands it to the log publisher.
     * <p>
     * The channel is derived from the configured log channel with alias {@code log} and two
     * freshly allocated tags (the channel tag is remembered). For UDP channels a {@code min}
     * flow control with a timeout of the leader heartbeat timeout in seconds is applied unless
     * flow control is already specified, manual control mode is set when the log uses MDC, and
     * {@code ssc} is true only for a single-member cluster. If the recovery plan has a log, the
     * initial position and MTU are taken from it; otherwise a consistent initial term id is
     * ensured. With MDC, every other active member and every passive member is added as a
     * destination.
     *
     * @return the session id of the new log publication.
     */
    int addLogPublication() {
        final long publicationTag = aeron.nextCorrelationId();
        logPublicationChannelTag = aeron.nextCorrelationId();

        final ChannelUri logUri = ChannelUri.parse(ctx.logChannel());
        logUri.put("alias", "log");
        logUri.put("tags", logPublicationChannelTag + "," + publicationTag);

        if (logUri.isUdp()) {
            if (!logUri.containsKey("fc")) {
                final long timeoutSeconds = TimeUnit.NANOSECONDS.toSeconds(ctx.leaderHeartbeatTimeoutNs());
                logUri.put("fc", "min,t:" + timeoutSeconds + "s");
            }

            if (ctx.isLogMdc()) {
                logUri.put("control-mode", "manual");
            }

            final boolean isSingleMember = 1 == activeMembers.length;
            logUri.put("ssc", Boolean.toString(isSingleMember));
        }

        if (null == recoveryPlan.log) {
            ensureConsistentInitialTermId(logUri);
        } else {
            logUri.initialPosition(recoveryPlan.appendedLogPosition, recoveryPlan.log.initialTermId, recoveryPlan.log.termBufferLength);
            logUri.put("mtu", Integer.toString(recoveryPlan.log.mtuLength));
        }

        final ExclusivePublication logPublication = aeron.addExclusivePublication(logUri.toString(), ctx.logStreamId());
        logPublisher.publication(logPublication);

        if (ctx.isLogMdc()) {
            for (final ClusterMember active : activeMembers) {
                if (active.id() != memberId) {
                    logPublisher.addDestination(true, active.logEndpoint());
                }
            }

            for (final ClusterMember passive : passiveMembers) {
                logPublisher.addDestination(true, passive.logEndpoint());
            }
        }

        return logPublication.sessionId();
    }

    /**
     * Joins the log as leader: adopts the leadership term, starts a local recording of the log
     * publication, waits (idling) until the append position can be created, then waits for the
     * services to be ready on the log. Services are given the channel directly for IPC, or an
     * {@code aeron-spy:} prefixed channel otherwise.
     *
     * @param leadershipTermId term being led.
     * @param logPosition      position the services join the log at.
     * @param logSessionId     session id of the log publication.
     * @param isStartup        passed on to the services when they join the log.
     */
    void joinLogAsLeader(long leadershipTermId, long logPosition, int logSessionId, boolean isStartup) {
        final boolean isIpc = ctx.logChannel().startsWith("aeron:ipc");
        final String media = isIpc ? "aeron:ipc" : "aeron:udp";
        final String channel = media + "?tags=" + logPublicationChannelTag + "|session-id=" + logSessionId + "|alias=log";

        leadershipTermId(leadershipTermId);
        startLogRecording(channel, ctx.logStreamId(), SourceLocation.LOCAL);

        while (!tryCreateAppendPosition(logSessionId)) {
            idle();
        }

        final String serviceLogChannel = isIpc ? channel : "aeron-spy:" + channel;
        awaitServicesReady(serviceLogChannel, ctx.logStreamId(), logSessionId, logPosition, Long.MAX_VALUE, isStartup, Cluster.Role.LEADER);
    }

    /**
     * Stores the live log destination. The stored value is later removed from the log adapter
     * and cleared by {@code prepareForNewLeadership}.
     *
     * @param liveLogDestination destination to remember; may be null to clear it.
     */
    void liveLogDestination(String liveLogDestination) {
        this.liveLogDestination = liveLogDestination;
    }

    /**
     * Returns the stored live log destination.
     *
     * @return the live log destination, or null if none is set.
     */
    String liveLogDestination() {
        return liveLogDestination;
    }

    /**
     * Stores the catch-up log destination. The stored value is later removed from the log
     * adapter and cleared by {@code onStopCatchup} and {@code prepareForNewLeadership}.
     *
     * @param catchupLogDestination destination to remember; may be null to clear it.
     */
    void catchupLogDestination(String catchupLogDestination) {
        this.catchupLogDestination = catchupLogDestination;
    }

    /**
     * Returns the stored catch-up log destination.
     *
     * @return the catch-up log destination, or null if none is set.
     */
    String catchupLogDestination() {
        return catchupLogDestination;
    }

    /**
     * Attempts to join the log as a follower using the given image.
     * <p>
     * A remote recording of the log subscription is started if no log subscription id is
     * recorded yet. If the append position cannot be created yet, the attempt fails. Otherwise
     * the dynamic-join term and snapshots are appended, the image is given to the log adapter,
     * the last append position is set to the image's join position, and the method waits for the
     * services to be ready on the log.
     *
     * @param image           image of the log to follow.
     * @param isLeaderStartup passed on to the services when they join the log.
     * @param nowNs           current time in nanoseconds.
     * @return true if the log was joined, false if the append position is not available yet.
     */
    boolean tryJoinLogAsFollower(Image image, boolean isLeaderStartup, long nowNs) {
        final Subscription subscription = image.subscription();

        if (-1L == logSubscriptionId) {
            startLogRecording(subscription.channel(), subscription.streamId(), SourceLocation.REMOTE);
        }

        if (!tryCreateAppendPosition(image.sessionId())) {
            return false;
        }

        appendDynamicJoinTermAndSnapshots();
        logAdapter.image(image);
        lastAppendPosition = image.joinPosition();
        timeOfLastAppendPositionUpdateNs = nowNs;
        awaitServicesReady(subscription.channel(), subscription.streamId(), image.sessionId(), image.joinPosition(), Long.MAX_VALUE, isLeaderStartup, Cluster.Role.FOLLOWER);

        return true;
    }

    /**
     * Tells the services to join the log and blocks until they have acknowledged
     * {@code logPosition}. While waiting, the consensus module adapter is polled and the agent
     * idles on the resulting work count. The acknowledgement is then consumed and the ack id
     * advanced.
     *
     * @param logChannel     channel the services join the log on.
     * @param streamId       stream id of the log.
     * @param logSessionId   session id of the log.
     * @param logPosition    position the services join at and must acknowledge.
     * @param maxLogPosition upper bound passed on to the services.
     * @param isStartup      passed on to the services.
     * @param role           role passed on to the services.
     */
    void awaitServicesReady(String logChannel, int streamId, int logSessionId, long logPosition, long maxLogPosition, boolean isStartup, Cluster.Role role) {
        serviceProxy.joinLog(logPosition, maxLogPosition, memberId, logSessionId, streamId, isStartup, role, logChannel);
        expectedAckPosition = logPosition;

        while (!ServiceAck.hasReached(logPosition, serviceAckId, serviceAckQueues)) {
            final int workCount = consensusModuleAdapter.poll();
            idle(workCount);
        }

        ServiceAck.removeHead(serviceAckQueues);
        ++serviceAckId;
    }

    /**
     * Creates a replay of the log recording between two positions, using this agent's archive,
     * log recording id, log adapter and context.
     *
     * @param logPosition    first position passed to the replay.
     * @param appendPosition second position passed to the replay.
     * @return the new log replay.
     */
    LogReplay newLogReplay(long logPosition, long appendPosition) {
        final LogReplay replay = new LogReplay(archive, logRecordingId, logPosition, appendPosition, logAdapter, ctx);
        return replay;
    }

    /**
     * Polls a log replay and the consensus module adapter.
     * <p>
     * The log is only polled while the module is ACTIVE or SUSPENDED. When fragments were read
     * the commit position is set to the adapter's position. When nothing was read and the image
     * has closed before reaching {@code stopPosition}, a {@link ClusterEvent} is thrown.
     *
     * @param logAdapter   adapter to poll the replayed log with.
     * @param stopPosition position the replay is expected to reach.
     * @return fragments read from the log plus the work done by the consensus module adapter.
     * @throws ClusterEvent if the image closed before {@code stopPosition} was reached.
     */
    int replayLogPoll(LogAdapter logAdapter, long stopPosition) {
        int logFragments = 0;

        final boolean isPollable = ConsensusModule.State.ACTIVE == state || ConsensusModule.State.SUSPENDED == state;
        if (isPollable) {
            logFragments = logAdapter.poll(stopPosition);
            final long position = logAdapter.position();

            if (logFragments > 0) {
                commitPosition.setOrdered(position);
            } else if (logAdapter.isImageClosed() && position < stopPosition) {
                throw new ClusterEvent("unexpected image close when replaying log: position=" + position);
            }
        }

        return logFragments + consensusModuleAdapter.poll();
    }

    /**
     * Returns the id of the log recording.
     *
     * @return the log recording id; {@code -1} is treated elsewhere in this class as "not set".
     */
    long logRecordingId() {
        return logRecordingId;
    }

    /**
     * Sets the id of the log recording. A value of {@code -1} is ignored, so an id that has
     * already been set is never cleared through this method.
     *
     * @param recordingId new log recording id.
     */
    void logRecordingId(long recordingId) {
        if (-1L == recordingId) {
            return;
        }

        logRecordingId = recordingId;
    }

    /**
     * Truncates the log recording at the given position.
     * All replays of the recording are stopped first, then the recording is truncated. If a
     * leadership term is supplied (not {@code -1}) the position is committed for that term in
     * the recording log. Finally the log adapter is disconnected at the position.
     *
     * @param leadershipTermId term to commit the position for, or {@code -1} to skip the commit.
     * @param logPosition      position to truncate to.
     */
    void truncateLogEntry(long leadershipTermId, long logPosition) {
        archive.stopAllReplays(logRecordingId);
        archive.truncateRecording(logRecordingId, logPosition);

        final boolean hasLeadershipTerm = -1L != leadershipTermId;
        if (hasLeadershipTerm) {
            recordingLog.commitLogPosition(leadershipTermId, logPosition);
        }

        logAdapter.disconnect((ErrorHandler)ctx.countedErrorHandler(), logPosition);
    }

    /**
     * Appends a new-leadership-term event for the current term to the log.
     * The timestamp is {@code nowNs} converted to the cluster clock's time unit and the log
     * position is taken from the election.
     *
     * @param nowNs current time in nanoseconds.
     * @return the result reported by the log publisher for the append.
     */
    boolean appendNewLeadershipTermEvent(long nowNs) {
        final long timestamp = clusterClock.timeUnit().convert(nowNs, TimeUnit.NANOSECONDS);
        final long termLogPosition = election.logPosition();
        final int logSessionId = logPublisher.sessionId();

        return logPublisher.appendNewLeadershipTermEvent(leadershipTermId, timestamp, termLogPosition, memberId, logSessionId, clusterTimeUnit, ctx.appVersion());
    }

    /**
     * Finishes an election and resumes normal operation.
     * <p>
     * The elected leadership term is adopted. A leader back-dates its last log update by one
     * heartbeat interval, recomputes the high member id, sets the timer service's current time,
     * activates the control toggle and prepares sessions for the new term. Any other role resets
     * its log-update and append-position timestamps to {@code nowNs}. The recovery plan is then
     * rebuilt, the notified and actual commit positions are set to the election's log position,
     * member details are updated for the elected leader, the election is cleared and ingress is
     * connected.
     *
     * @param nowNs current time in nanoseconds.
     */
    void electionComplete(long nowNs) {
        leadershipTermId(election.leadershipTermId());

        if (Cluster.Role.LEADER != role) {
            timeOfLastLogUpdateNs = nowNs;
            timeOfLastAppendPositionUpdateNs = nowNs;
            timeOfLastAppendPositionSendNs = nowNs;
        } else {
            timeOfLastLogUpdateNs = nowNs - leaderHeartbeatIntervalNs;
            highMemberId = Math.max(ClusterMember.highMemberId(activeMembers), ClusterMember.highMemberId(passiveMembers));
            timerService.currentTime(clusterClock.timeUnit().convert(nowNs, TimeUnit.NANOSECONDS));
            ClusterControl.ToggleState.activate((AtomicCounter)controlToggle);
            prepareSessionsForNewTerm(election.isLeaderStartup());
        }

        recoveryPlan = recordingLog.createRecoveryPlan(archive, ctx.serviceCount(), logRecordingId);

        final long electedLogPosition = election.logPosition();
        notifiedCommitPosition = electedLogPosition;
        commitPosition.setOrdered(electedLogPosition);

        updateMemberDetails(election.leader());
        election = null;
        connectIngress();
    }

    /**
     * Complete a dynamic join and immediately start an election.
     * <p>
     * If there are no active members yet, the member list and leader are taken from the dynamic join and
     * consensus publications are added for them. If this node has no member id ({@code -1}), the id
     * assigned by the dynamic join is stored in the mark file and on {@code thisMember}. The dynamic join
     * reference is then cleared, a new {@link Election} is created and given one {@code doWork} call.
     *
     * @param nowNs current time in nanoseconds.
     * @return always {@code true}.
     */
    boolean dynamicJoinComplete(long nowNs) {
        if (this.activeMembers.length == 0) {
            this.activeMembers = this.dynamicJoin.clusterMembers();
            ClusterMember.addClusterMemberIds(this.activeMembers, this.clusterMemberByIdMap);
            this.leaderMember = this.dynamicJoin.leader();
            ClusterMember.addConsensusPublications(
                this.activeMembers,
                this.thisMember,
                this.ctx.consensusChannel(),
                this.ctx.consensusStreamId(),
                this.aeron,
                (ErrorHandler)this.ctx.countedErrorHandler());
        }

        if (this.memberId == -1) {
            this.memberId = this.dynamicJoin.memberId();
            this.ctx.clusterMarkFile().memberId(this.memberId);
            this.thisMember.id(this.memberId);
        }

        this.dynamicJoin = null;

        this.election = new Election(
            false,
            this.leadershipTermId,
            this.recoveryPlan.lastTermBaseLogPosition,
            this.commitPosition.getWeak(),
            this.recoveryPlan.appendedLogPosition,
            this.activeMembers,
            this.clusterMemberByIdMap,
            this.thisMember,
            this.consensusPublisher,
            this.ctx,
            this);
        this.election.doWork(nowNs);

        return true;
    }

    /**
     * Stop a follower's catchup once its log position has reached the log publisher's position.
     * <p>
     * Nothing happens unless the follower has a catchup replay session id (not {@code -1}) or the append
     * position flags indicate catchup, and the follower's log position is at or beyond the publisher's.
     * The archive replay is asked to stop when a replay correlation id is set, and a stop-catchup message
     * is sent to the follower; each id is cleared only if the corresponding request was accepted.
     *
     * @param follower            member being tracked.
     * @param leadershipTermId    term passed on in the stop-catchup message.
     * @param appendPositionFlags flags received with the follower's append position.
     */
    void trackCatchupCompletion(ClusterMember follower, long leadershipTermId, short appendPositionFlags) {
        final boolean isCatchingUp = -1L != follower.catchupReplaySessionId()
            || ConsensusModuleAgent.isCatchupAppendPosition(appendPositionFlags);
        if (!isCatchingUp) {
            return;
        }

        if (follower.logPosition() < this.logPublisher.position()) {
            return;
        }

        if (-1L != follower.catchupReplayCorrelationId()) {
            final boolean isStopRequested = this.archive.archiveProxy().stopReplay(
                follower.catchupReplaySessionId(), this.aeron.nextCorrelationId(), this.archive.controlSessionId());
            if (isStopRequested) {
                follower.catchupReplayCorrelationId(-1L);
            }
        }

        if (this.consensusPublisher.stopCatchup(follower.publication(), leadershipTermId, follower.id())) {
            follower.catchupReplaySessionId(-1L);
        }
    }

    /**
     * Record the start of a catchup by resetting both append position timestamps to {@code nowNs}.
     *
     * @param nowNs current time in nanoseconds.
     */
    void catchupInitiated(long nowNs) {
        // Chained assignment: the update timestamp is written first, then the send timestamp.
        this.timeOfLastAppendPositionSendNs = this.timeOfLastAppendPositionUpdateNs = nowNs;
    }

    /**
     * Poll the log during catchup, report the append position to the leader and watch for stalls.
     * <p>
     * While the module state is ACTIVE or SUSPENDED the log adapter is polled up to the lower of the
     * append position and {@code limitPosition}, the follower position is sent on the election leader's
     * publication with flag value {@code 1}, and the commit position is proposed from the adapter
     * position. The consensus module adapter is always polled last.
     *
     * @param limitPosition upper bound for polling the log.
     * @param nowNs         current time in nanoseconds.
     * @return the amount of work done.
     * @throws ClusterEvent if nothing was polled and the log image is closed, or if the state is ACTIVE
     *                      and the append position has not advanced within the leader heartbeat timeout.
     */
    int catchupPoll(long limitPosition, long nowNs) {
        int workCount = 0;

        if (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) {
            // NOTE(review): appendPosition is dereferenced here without the null check used when building
            // the stall message below — confirm it cannot be null on this path.
            final long pollLimit = Math.min(this.appendPosition.get(), limitPosition);
            final int polled = this.logAdapter.poll(pollLimit);
            workCount += polled;

            if (0 == polled && this.logAdapter.image().isClosed()) {
                throw new ClusterEvent("unexpected image close during catchup: position=" + this.logAdapter.image().position());
            }

            final ExclusivePublication leaderPublication = this.election.leader().publication();
            workCount += this.updateFollowerPosition(
                leaderPublication, nowNs, this.leadershipTermId, this.appendPosition.get(), (short)1);
            this.commitPosition.proposeMaxOrdered(this.logAdapter.position());
        }

        // The state is re-read here on purpose: polling the log above may have changed it.
        final boolean isStalled = nowNs > this.timeOfLastAppendPositionUpdateNs + this.leaderHeartbeatTimeoutNs;
        if (isStalled && ConsensusModule.State.ACTIVE == this.state) {
            throw new ClusterEvent("no catchup progress commitPosition=" + this.commitPosition.getWeak() + " limitPosition=" + limitPosition + " lastAppendPosition=" + this.lastAppendPosition + " appendPosition=" + (null != this.appendPosition ? this.appendPosition.get() : -1L) + " logPosition=" + this.election.logPosition());
        }

        return workCount + this.consensusModuleAdapter.poll();
    }

    /**
     * Check whether the local log image is within a window of the given position.
     * <p>
     * The window is a quarter of the image's term buffer length, capped at {@code 0x2000000} (32 MiB).
     *
     * @param position position to compare the local image position against.
     * @return {@code true} if the image position is at least {@code position - window}; {@code false}
     *         otherwise, including when the log adapter has no image.
     */
    boolean isCatchupNearLive(long position) {
        final Image logImage = this.logAdapter.image();
        if (null == logImage) {
            return false;
        }

        final long localPosition = logImage.position();
        final long window = (long)Math.min(logImage.termBufferLength() >> 2, 0x2000000);

        return localPosition >= position - window;
    }

    /**
     * Stop catchup for every active member that has a catchup replay session.
     * <p>
     * For members that also have a replay correlation id, the archive replay is stopped; any exception
     * from that call is reported to the counted error handler as a {@link ClusterEvent} and otherwise
     * ignored. Both catchup ids are then reset to {@code -1}.
     */
    void stopAllCatchups() {
        for (ClusterMember member : this.activeMembers) {
            if (-1L != member.catchupReplaySessionId()) {
                if (-1L != member.catchupReplayCorrelationId()) {
                    try {
                        this.archive.stopReplay(member.catchupReplaySessionId());
                    }
                    catch (Exception ex) {
                        // Best effort: the failure is reported with a fixed message, the cause is not propagated.
                        this.ctx.countedErrorHandler().onError((Throwable)((Object)new ClusterEvent("replay already stopped for catchup")));
                    }
                }

                member.catchupReplaySessionId(-1L);
                member.catchupReplayCorrelationId(-1L);
            }
        }
    }

    /**
     * Remember a snapshot retrieved for dynamic join.
     * <p>
     * A new snapshot entry is created that copies every field from the leader's snapshot except the
     * recording id, which is replaced by {@code localRecordingId}.
     *
     * @param localRecordingId recording id to store in the new entry.
     * @param leaderSnapshot   snapshot whose remaining fields are copied.
     */
    void retrievedSnapshot(long localRecordingId, RecordingLog.Snapshot leaderSnapshot) {
        final RecordingLog.Snapshot localSnapshot = new RecordingLog.Snapshot(
            localRecordingId,
            leaderSnapshot.leadershipTermId,
            leaderSnapshot.termBaseLogPosition,
            leaderSnapshot.logPosition,
            leaderSnapshot.timestamp,
            leaderSnapshot.serviceId);

        this.dynamicJoinSnapshots.add(localSnapshot);
    }

    /**
     * Build a recovery plan from the snapshots collected during dynamic join and load the first one.
     * <p>
     * A recovery state counter is added for the new plan before the first snapshot, if any, is loaded
     * from the archive.
     *
     * @return the recovery state counter that was added.
     */
    Counter loadSnapshotsForDynamicJoin() {
        this.recoveryPlan = RecordingLog.createRecoveryPlan(this.dynamicJoinSnapshots);
        final Counter counter = this.addRecoveryStateCounter(this.recoveryPlan);

        final boolean hasSnapshots = !this.recoveryPlan.snapshots.isEmpty();
        if (hasSnapshots) {
            this.loadSnapshot(this.recoveryPlan.snapshots.get(0), this.archive);
        }

        return counter;
    }

    /**
     * Poll for service acknowledgements of a snapshot load.
     * <p>
     * The consensus module adapter is polled once. When the service acks have reached the expected ack
     * position for the current ack id, the service client ids are captured, the ack id is incremented,
     * the last log update time is set to {@code nowNs}, the recovery state counter is closed and the
     * module state becomes ACTIVE.
     *
     * @param recoveryStateCounter counter closed once the acks have arrived.
     * @param nowNs                current time in nanoseconds.
     * @return {@code true} once the acks have been reached, otherwise {@code false}.
     */
    boolean pollForSnapshotLoadAck(Counter recoveryStateCounter, long nowNs) {
        this.consensusModuleAdapter.poll();

        if (!ServiceAck.hasReached(this.expectedAckPosition, this.serviceAckId, this.serviceAckQueues)) {
            return false;
        }

        this.captureServiceClientIds();
        ++this.serviceAckId;
        this.timeOfLastLogUpdateNs = nowNs;
        CloseHelper.close((ErrorHandler)this.ctx.countedErrorHandler(), (AutoCloseable)recoveryStateCounter);
        this.state(ConsensusModule.State.ACTIVE);

        return true;
    }

    /**
     * Poll the recording signal poller and react to archive responses and recording signals.
     * <p>
     * Does nothing when there is no archive. If the poll is not complete, no work was done and the
     * poller's subscription is not connected, the error is reported and the agent terminates
     * unexpectedly. A completed poll is handled by template id:
     * <ul>
     *   <li>{@code 1} with an ERROR code (presumably a control response — confirm): a matching catchup
     *   replay is cleared and reported; a relevant id of {@code 6} is reported as no longer relevant;
     *   otherwise an {@link ArchiveException} is built, error code {@code 11} triggers unexpected
     *   termination, and any active election is told about the error.</li>
     *   <li>{@code 24} (presumably a recording signal event — confirm): a STOP signal for the log
     *   recording updates the recorded log position, and the signal is forwarded to the election and
     *   dynamic join when present.</li>
     * </ul>
     *
     * @return the work count reported by the poller, or {@code 0} when there is no archive.
     */
    int pollArchiveEvents() {
        if (null == this.archive) {
            return 0;
        }

        final RecordingSignalPoller signalPoller = this.recordingSignalPoller;
        final int workCount = signalPoller.poll();

        if (!signalPoller.isPollComplete()) {
            if (0 == workCount && !signalPoller.subscription().isConnected()) {
                this.ctx.countedErrorHandler().onError((Throwable)((Object)new ClusterEvent("local archive is not connected")));
                this.unexpectedTermination();
            }
            return workCount;
        }

        final int templateId = signalPoller.templateId();

        if (1 == templateId && signalPoller.code() == ControlResponseCode.ERROR) {
            for (ClusterMember member : this.activeMembers) {
                if (member.catchupReplayCorrelationId() == signalPoller.correlationId()) {
                    member.catchupReplaySessionId(-1L);
                    member.catchupReplayCorrelationId(-1L);
                    this.ctx.countedErrorHandler().onError((Throwable)((Object)new ClusterEvent("catchup replay failed - " + signalPoller.errorMessage())));
                    return workCount;
                }
            }

            if (6L == signalPoller.relevantId()) {
                this.ctx.countedErrorHandler().onError((Throwable)((Object)new ClusterEvent("replay no longer relevant - " + signalPoller.errorMessage())));
                return workCount;
            }

            final ArchiveException archiveError = new ArchiveException(
                signalPoller.errorMessage(), (int)signalPoller.relevantId(), signalPoller.correlationId());

            if (11 == archiveError.errorCode()) {
                this.ctx.countedErrorHandler().onError((Throwable)archiveError);
                this.unexpectedTermination();
            }

            // The election is still informed even when termination was triggered just above.
            if (null != this.election) {
                this.election.handleError(this.clusterClock.timeNanos(), archiveError);
            }
        } else if (24 == templateId) {
            final long recordingId = signalPoller.recordingId();
            final long recordingPosition = signalPoller.recordingPosition();
            final RecordingSignal recordingSignal = signalPoller.recordingSignal();

            if (RecordingSignal.STOP == recordingSignal && recordingId == this.logRecordingId) {
                this.logRecordedPosition = recordingPosition;
            }

            if (null != this.election) {
                this.election.onRecordingSignal(
                    signalPoller.correlationId(), recordingId, recordingPosition, recordingSignal);
            }

            if (null != this.dynamicJoin) {
                this.dynamicJoin.onRecordingSignal(
                    signalPoller.correlationId(), recordingId, recordingPosition, recordingSignal);
            }
        }

        return workCount;
    }

    /**
     * Set the current leadership term id and pass it on to every pending service message tracker.
     *
     * @param leadershipTermId new leadership term id.
     */
    private void leadershipTermId(long leadershipTermId) {
        this.leadershipTermId = leadershipTermId;

        for (PendingServiceMessageTracker messageTracker : this.pendingServiceMessageTrackers) {
            messageTracker.leadershipTermId(leadershipTermId);
        }
    }

    /**
     * Intentionally empty: this method has no body and therefore no effect when called.
     * <p>
     * NOTE(review): presumably a hook that exists only so an external logging/instrumentation agent can
     * intercept new-leadership-term events — confirm before removing or changing the signature.
     */
    private void logNewLeadershipTerm(long logLeadershipTermId, long nextLeadershipTermId, long nextTermBaseLogPosition, long nextLogPosition, long leadershipTermId, long termBaseLogPosition, long logPosition, long leaderRecordingId, long timestamp, int memberId, int leaderId, int logSessionId, int appVersion, boolean isStartup) {
    }

    /**
     * Intentionally empty: this method has no body and therefore no effect when called.
     * <p>
     * NOTE(review): presumably a hook that exists only so an external logging/instrumentation agent can
     * intercept commit-position events — confirm before removing or changing the signature.
     */
    private void logCommitPosition(long leadershipTermId, long logPosition, int leaderMemberId, int memberId) {
    }

    /**
     * Intentionally empty: this method has no body and therefore no effect when called.
     * <p>
     * NOTE(review): presumably a hook that exists only so an external logging/instrumentation agent can
     * intercept add-passive-member events — confirm before removing or changing the signature.
     */
    private void logAddPassiveMember(long correlationId, String memberEndpoints, int memberId) {
    }

    /**
     * Intentionally empty: this method has no body and therefore no effect when called.
     * <p>
     * NOTE(review): presumably a hook that exists only so an external logging/instrumentation agent can
     * intercept replayed new-leadership-term events — confirm before removing or changing the signature.
     */
    private void logReplayNewLeadershipTermEvent(int memberId, boolean isInElection, long leadershipTermId, long logPosition, long timestamp, long termBaseLogPosition, TimeUnit timeUnit, int appVersion) {
    }

    /**
     * Start recording the log, or extend the last term's recording when one exists.
     * <p>
     * The recording log is asked for the last term's recording id; {@code -1} means a new recording is
     * started, otherwise that recording is extended. The resulting subscription id is stored.
     *
     * @param channel        channel to record.
     * @param streamId       stream id to record.
     * @param sourceLocation source location passed to the archive.
     * @throws ArchiveException always rethrown; error code {@code 11} is additionally reported and
     *                          triggers unexpected termination first.
     */
    private void startLogRecording(String channel, int streamId, SourceLocation sourceLocation) {
        try {
            final long lastTermRecordingId = this.recordingLog.findLastTermRecordingId();

            if (-1L == lastTermRecordingId) {
                this.logSubscriptionId = this.archive.startRecording(channel, streamId, sourceLocation, true);
            } else {
                this.logSubscriptionId = this.archive.extendRecording(
                    lastTermRecordingId, channel, streamId, sourceLocation, true);
            }
        }
        catch (ArchiveException ex) {
            if (11 == ex.errorCode()) {
                this.ctx.countedErrorHandler().onError((Throwable)ex);
                this.unexpectedTermination();
            }

            throw ex;
        }
    }

    /**
     * Prepare client sessions for a new leadership term.
     * <p>
     * On startup every OPEN session is moved to closing with a TIMEOUT reason. Otherwise sessions whose
     * close reason is TIMEOUT are reset and reopened, every OPEN session is connected, and then, using a
     * timestamp taken after all connects, every OPEN session has its last activity time refreshed and is
     * flagged as having a new-leader event pending.
     *
     * @param isStartup {@code true} when the term begins as part of leader startup.
     */
    private void prepareSessionsForNewTerm(boolean isStartup) {
        if (isStartup) {
            for (int i = 0, size = this.sessions.size(); i < size; ++i) {
                final ClusterSession session = this.sessions.get(i);
                if (ClusterSession.State.OPEN == session.state()) {
                    session.closing(CloseReason.TIMEOUT);
                }
            }

            return;
        }

        for (int i = 0, size = this.sessions.size(); i < size; ++i) {
            final ClusterSession session = this.sessions.get(i);

            if (CloseReason.TIMEOUT == session.closeReason()) {
                session.resetCloseReason();
                session.state(ClusterSession.State.OPEN);
            }

            if (ClusterSession.State.OPEN == session.state()) {
                session.connect((ErrorHandler)this.ctx.countedErrorHandler(), this.aeron);
            }
        }

        // The clock is sampled only after every session has been connected.
        final long nowNs = this.clusterClock.timeNanos();
        for (int i = 0, size = this.sessions.size(); i < size; ++i) {
            final ClusterSession session = this.sessions.get(i);

            if (ClusterSession.State.OPEN == session.state()) {
                session.timeOfLastActivityNs(nowNs);
                session.hasNewLeaderEventPending(true);
            }
        }
    }

    /**
     * Record the new leader, flag each active member as leader or not, and refresh the ingress endpoints.
     *
     * @param newLeader member that is now the leader.
     */
    private void updateMemberDetails(ClusterMember newLeader) {
        this.leaderMember = newLeader;

        for (ClusterMember member : this.activeMembers) {
            final boolean isLeader = member.id() == this.leaderMember.id();
            member.isLeader(isLeader);
        }

        this.ingressEndpoints = ClusterMember.ingressEndpoints(this.activeMembers);
    }

    /**
     * Perform the periodic, lower-frequency duties of the agent.
     * <p>
     * In order: invoke the Aeron client, fail if Aeron is closed, terminate if the module is CLOSED or
     * enter a required election, refresh the mark file activity timestamp when due, poll archive events,
     * and send pending redirects and rejections. When no election is in progress the leader additionally
     * checks the control toggle, services sessions and passive members and verifies it still has an
     * active quorum (or completes termination when TERMINATING), while a non-leader enters an election
     * if the leader heartbeat has timed out and no termination position is set.
     *
     * @param nowNs current time in nanoseconds.
     * @return the amount of work done.
     * @throws AgentTerminationException if the Aeron client has been closed.
     */
    private int slowTickWork(long nowNs) {
        int workCount = this.aeronClientInvoker.invoke();

        if (this.aeron.isClosed()) {
            throw new AgentTerminationException("unexpected Aeron close");
        }

        if (ConsensusModule.State.CLOSED == this.state) {
            this.unexpectedTermination();
        } else if (this.isElectionRequired) {
            if (null == this.election) {
                this.enterElection();
            }
            this.isElectionRequired = false;
        }

        if (nowNs >= this.markFileUpdateDeadlineNs) {
            this.markFileUpdateDeadlineNs = nowNs + ClusteredServiceContainer.Configuration.MARK_FILE_UPDATE_INTERVAL_NS;
            this.markFile.updateActivityTimestamp(this.clusterClock.timeMillis());
        }

        workCount += this.pollArchiveEvents();
        workCount += this.sendRedirects(this.redirectSessions, nowNs);
        workCount += this.sendRejections(this.rejectedSessions, nowNs);

        if (null != this.election) {
            return workCount;
        }

        if (Cluster.Role.LEADER == this.role) {
            workCount += this.checkControlToggle(nowNs);

            if (ConsensusModule.State.ACTIVE == this.state) {
                workCount += this.processPendingSessions(this.pendingSessions, nowNs);
                workCount += this.checkSessions(this.sessions, nowNs);
                workCount += this.processPassiveMembers(this.passiveMembers);

                if (!ClusterMember.hasActiveQuorum(this.activeMembers, nowNs, this.leaderHeartbeatTimeoutNs)) {
                    this.ctx.countedErrorHandler().onError((Throwable)((Object)new ClusterEvent("inactive follower quorum")));
                    this.enterElection();
                    workCount += 1;
                }
            } else if (ConsensusModule.State.TERMINATING == this.state
                && this.clusterTermination.canTerminate(this.activeMembers, this.terminationPosition, nowNs)) {
                this.recordingLog.commitLogPosition(this.leadershipTermId, this.terminationPosition);
                this.closeAndTerminate();
            }

            return workCount;
        }

        final boolean isFollowingLog =
            ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state;
        if (isFollowingLog
            && nowNs >= this.timeOfLastLogUpdateNs + this.leaderHeartbeatTimeoutNs
            && -1L == this.terminationPosition) {
            this.ctx.countedErrorHandler().onError((Throwable)((Object)new ClusterEvent("leader heartbeat timeout")));
            this.enterElection();
            workCount += 1;
        }

        return workCount;
    }

    /**
     * Perform the high-frequency consensus duties for the current role.
     * <p>
     * A leader in the ACTIVE state polls the timer service, the pending service message trackers and
     * ingress, and always updates the leader position. Any other role, while ACTIVE or SUSPENDED, either
     * moves to TERMINATING once the log adapter has reached a set termination position, or polls the log
     * up to the lower of the notified commit position and the appended/recorded position; it always
     * updates the follower position. The consensus module adapter is polled last in both cases.
     *
     * @param timestamp current cluster time passed to the timer service.
     * @param nowNs     current time in nanoseconds.
     * @return the amount of work done; exactly {@code 1} when an election was entered because the log
     *         image closed.
     */
    private int consensusWork(long timestamp, long nowNs) {
        int workCount = 0;

        if (Cluster.Role.LEADER != this.role) {
            if (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) {
                if (-1L != this.terminationPosition && this.logAdapter.position() >= this.terminationPosition) {
                    this.serviceProxy.terminationPosition(this.terminationPosition, (ErrorHandler)this.ctx.countedErrorHandler());
                    this.state(ConsensusModule.State.TERMINATING);
                } else {
                    final long appendLimit;
                    if (null == this.appendPosition) {
                        appendLimit = this.logRecordedPosition;
                    } else {
                        appendLimit = this.appendPosition.get();
                    }

                    final int fragments = this.logAdapter.poll(Math.min(this.notifiedCommitPosition, appendLimit));
                    if (0 == fragments && this.logAdapter.isImageClosed()) {
                        this.ctx.countedErrorHandler().onError((Throwable)((Object)new ClusterEvent("log disconnected from leader")));
                        this.enterElection();
                        return 1;
                    }

                    this.commitPosition.proposeMaxOrdered(this.logAdapter.position());
                    workCount += this.ingressAdapter.poll();
                    workCount += fragments;
                }
            }

            workCount += this.updateFollowerPosition(nowNs);
        } else {
            if (ConsensusModule.State.ACTIVE == this.state) {
                workCount += this.timerService.poll(timestamp);
                for (PendingServiceMessageTracker messageTracker : this.pendingServiceMessageTrackers) {
                    workCount += messageTracker.poll();
                }
                workCount += this.ingressAdapter.poll();
            }

            workCount += this.updateLeaderPosition(nowNs);
        }

        return workCount + this.consensusModuleAdapter.poll();
    }

    /**
     * Act on the current value of the cluster control toggle.
     * <p>
     * SUSPEND, RESUME and SNAPSHOT each append the matching cluster action to the log and change the
     * module state when the append succeeds; RESUME also resets the toggle. SHUTDOWN appends a SNAPSHOT
     * action, records the termination position with the other members and moves to the SNAPSHOT state.
     * ABORT records the termination position, notifies the services of it and moves straight to
     * TERMINATING without appending an action.
     *
     * @param nowNs current time in nanoseconds, used for the termination deadline.
     * @return {@code 1} for any of the toggle states above, even if its precondition was not met and
     *         nothing changed; {@code 0} for every other toggle state.
     */
    private int checkControlToggle(long nowNs) {
        switch (ClusterControl.ToggleState.get((AtomicCounter)this.controlToggle)) {
            case SUSPEND: {
                if (ConsensusModule.State.ACTIVE == this.state && this.appendAction(ClusterAction.SUSPEND)) {
                    this.state(ConsensusModule.State.SUSPENDED);
                }
                break;
            }
            case RESUME: {
                if (ConsensusModule.State.SUSPENDED == this.state && this.appendAction(ClusterAction.RESUME)) {
                    this.state(ConsensusModule.State.ACTIVE);
                    ClusterControl.ToggleState.reset((AtomicCounter)this.controlToggle);
                }
                break;
            }
            case SNAPSHOT: {
                if (ConsensusModule.State.ACTIVE == this.state && this.appendAction(ClusterAction.SNAPSHOT)) {
                    this.state(ConsensusModule.State.SNAPSHOT);
                }
                break;
            }
            case SHUTDOWN: {
                if (ConsensusModule.State.ACTIVE == this.state && this.appendAction(ClusterAction.SNAPSHOT)) {
                    final CountedErrorHandler countedErrorHandler = this.ctx.countedErrorHandler();
                    final long shutdownPosition = this.logPublisher.position();

                    this.clusterTermination = new ClusterTermination(nowNs + this.ctx.terminationTimeoutNs());
                    this.clusterTermination.terminationPosition(
                        (ErrorHandler)countedErrorHandler,
                        this.consensusPublisher,
                        this.activeMembers,
                        this.thisMember,
                        this.leadershipTermId,
                        shutdownPosition);
                    this.terminationPosition = shutdownPosition;
                    this.state(ConsensusModule.State.SNAPSHOT);
                }
                break;
            }
            case ABORT: {
                if (ConsensusModule.State.ACTIVE == this.state) {
                    final CountedErrorHandler countedErrorHandler = this.ctx.countedErrorHandler();
                    final long abortPosition = this.logPublisher.position();

                    this.clusterTermination = new ClusterTermination(nowNs + this.ctx.terminationTimeoutNs());
                    this.clusterTermination.terminationPosition(
                        (ErrorHandler)countedErrorHandler,
                        this.consensusPublisher,
                        this.activeMembers,
                        this.thisMember,
                        this.leadershipTermId,
                        abortPosition);
                    this.terminationPosition = abortPosition;
                    this.serviceProxy.terminationPosition(this.terminationPosition, (ErrorHandler)countedErrorHandler);
                    this.state(ConsensusModule.State.TERMINATING);
                }
                break;
            }
            default: {
                return 0;
            }
        }

        return 1;
    }

    /**
     * Append a cluster action to the log for the current leadership term, stamped with the cluster
     * clock's current time.
     *
     * @param action action to append.
     * @return the result reported by the log publisher.
     */
    private boolean appendAction(ClusterAction action) {
        final long termId = this.leadershipTermId;
        final long timestamp = this.clusterClock.time();

        return this.logPublisher.appendClusterAction(termId, timestamp, action);
    }

    /**
     * Advance every pending session through connection, authentication and hand-off.
     * <p>
     * The list is walked from the end so entries can be removed with an unordered swap-remove. INVALID
     * sessions and sessions that have timed out (unless still INIT) are removed and closed. Remaining
     * sessions are offered to the authenticator once their response publication is connected. An
     * AUTHENTICATED session is then handled by its action: CLIENT sessions are appended to the log and
     * added to the open sessions, BACKUP and HEARTBEAT sessions are sent their response and closed.
     * REJECTED sessions are moved to the rejected list.
     *
     * @param pendingSessions sessions awaiting processing; modified in place.
     * @param nowNs           current time in nanoseconds.
     * @return number of sessions completed via the CLIENT, BACKUP or HEARTBEAT paths.
     */
    private int processPendingSessions(ArrayList<ClusterSession> pendingSessions, long nowNs) {
        int workCount = 0;

        for (int lastIndex = pendingSessions.size() - 1, i = lastIndex; i >= 0; --i) {
            final ClusterSession session = pendingSessions.get(i);

            if (ClusterSession.State.INVALID == session.state()) {
                ArrayListUtil.fastUnorderedRemove(pendingSessions, (int)i, (int)lastIndex--);
                session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
            } else if (nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs
                && ClusterSession.State.INIT != session.state()) {
                ArrayListUtil.fastUnorderedRemove(pendingSessions, (int)i, (int)lastIndex--);
                session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
                this.ctx.timedOutClientCounter().incrementOrdered();
            } else {
                // The state is re-read before each step: an earlier step in this iteration may have changed it.
                if ((session.state() == ClusterSession.State.INIT
                    || session.state() == ClusterSession.State.CONNECTING
                    || session.state() == ClusterSession.State.CONNECTED)
                    && session.isResponsePublicationConnected(this.aeron, nowNs)) {
                    session.state(ClusterSession.State.CONNECTED);
                    this.authenticator.onConnectedSession(this.sessionProxy.session(session), this.clusterClock.timeMillis());
                }

                if (ClusterSession.State.CHALLENGED == session.state()
                    && session.isResponsePublicationConnected(this.aeron, nowNs)) {
                    this.authenticator.onChallengedSession(this.sessionProxy.session(session), this.clusterClock.timeMillis());
                }

                if (ClusterSession.State.AUTHENTICATED == session.state()) {
                    switch (session.action()) {
                        case CLIENT: {
                            if (session.responsePublication().isConnected() && this.appendSessionAndOpen(session, nowNs)) {
                                ArrayListUtil.fastUnorderedRemove(pendingSessions, (int)i, (int)lastIndex--);
                                this.addSession(session);
                                ++workCount;
                            }
                            break;
                        }
                        case BACKUP: {
                            final RecordingLog.Entry lastTerm = this.recordingLog.findLastTerm();
                            if (null != lastTerm && this.consensusPublisher.backupResponse(session, this.commitPosition.id(), this.leaderMember.id(), lastTerm, this.recoveryPlan, ClusterMember.encodeAsString(this.activeMembers))) {
                                ArrayListUtil.fastUnorderedRemove(pendingSessions, (int)i, (int)lastIndex--);
                                session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
                                ++workCount;
                            }
                            break;
                        }
                        case HEARTBEAT: {
                            if (this.consensusPublisher.heartbeatResponse(session)) {
                                ArrayListUtil.fastUnorderedRemove(pendingSessions, (int)i, (int)lastIndex--);
                                session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
                                ++workCount;
                            }
                            break;
                        }
                    }
                } else if (ClusterSession.State.REJECTED == session.state()) {
                    ArrayListUtil.fastUnorderedRemove(pendingSessions, (int)i, (int)lastIndex--);
                    this.rejectedSessions.add(session);
                }
            }
        }

        return workCount;
    }

    /**
     * Send the rejection event to each rejected session and drop sessions that are finished.
     * <p>
     * A session is removed and closed when its event was sent on a connected response publication, when
     * it has timed out (unless still INIT), or when it is INVALID. Other sessions stay for a later retry.
     *
     * @param rejectedSessions sessions awaiting a rejection event; modified in place.
     * @param nowNs            current time in nanoseconds.
     * @return number of sessions removed.
     */
    private int sendRejections(ArrayList<ClusterSession> rejectedSessions, long nowNs) {
        int workCount = 0;

        for (int lastIndex = rejectedSessions.size() - 1, i = lastIndex; i >= 0; --i) {
            final ClusterSession session = rejectedSessions.get(i);
            final String detail = session.responseDetail();
            final EventCode eventCode = session.eventCode();

            // Short-circuit order matters: the send is attempted first, the timeout and INVALID checks follow.
            final boolean isFinished =
                (session.isResponsePublicationConnected(this.aeron, nowNs)
                    && this.egressPublisher.sendEvent(session, this.leadershipTermId, this.leaderMember.id(), eventCode, detail))
                || (ClusterSession.State.INIT != session.state()
                    && nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs)
                || ClusterSession.State.INVALID == session.state();

            if (isFinished) {
                ArrayListUtil.fastUnorderedRemove(rejectedSessions, (int)i, (int)lastIndex--);
                session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
                ++workCount;
            }
        }

        return workCount;
    }

    /**
     * Send a REDIRECT event carrying the ingress endpoints to each redirected session and drop sessions
     * that are finished.
     * <p>
     * A session is removed and closed when its event was sent on a connected response publication, when
     * it has timed out (unless still INIT), or when it is INVALID. Other sessions stay for a later retry.
     *
     * @param redirectSessions sessions awaiting a redirect; modified in place.
     * @param nowNs            current time in nanoseconds.
     * @return number of sessions removed.
     */
    private int sendRedirects(ArrayList<ClusterSession> redirectSessions, long nowNs) {
        int workCount = 0;

        for (int lastIndex = redirectSessions.size() - 1, i = lastIndex; i >= 0; --i) {
            final ClusterSession session = redirectSessions.get(i);
            final EventCode redirectCode = EventCode.REDIRECT;
            final int leaderId = this.leaderMember.id();

            // Short-circuit order matters: the send is attempted first, the timeout and INVALID checks follow.
            final boolean isFinished =
                (session.isResponsePublicationConnected(this.aeron, nowNs)
                    && this.egressPublisher.sendEvent(session, this.leadershipTermId, leaderId, redirectCode, this.ingressEndpoints))
                || (ClusterSession.State.INIT != session.state()
                    && nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs)
                || ClusterSession.State.INVALID == session.state();

            if (isFinished) {
                ArrayListUtil.fastUnorderedRemove(redirectSessions, (int)i, (int)lastIndex--);
                session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
                ++workCount;
            }
        }

        return workCount;
    }

    /**
     * Service passive members: answer outstanding membership queries and promote a member that is ready.
     * <p>
     * A member with a correlation id other than {@code -1} is sent the current membership and has the id
     * cleared when the send succeeds. Otherwise, a member that has requested to join and whose log
     * position equals the log publisher's position is promoted by appending a JOIN membership change
     * event; on success the active and passive member arrays and the ranked positions array are
     * replaced and iteration stops.
     *
     * @param passiveMembers passive members to iterate over.
     * @return the amount of work done.
     */
    private int processPassiveMembers(ClusterMember[] passiveMembers) {
        int workCount = 0;

        for (ClusterMember member : passiveMembers) {
            if (-1L != member.correlationId()) {
                final boolean isSent = this.consensusPublisher.clusterMemberChange(
                    member.publication(),
                    member.correlationId(),
                    this.leaderMember.id(),
                    ClusterMember.encodeAsString(this.activeMembers),
                    ClusterMember.encodeAsString(passiveMembers));
                if (isSent) {
                    member.correlationId(-1L);
                    ++workCount;
                }
            } else if (member.hasRequestedJoin() && member.logPosition() == this.logPublisher.position()) {
                final ClusterMember[] membersAfterJoin = ClusterMember.addMember(this.activeMembers, member);
                final long timestamp = this.clusterClock.time();

                final long appendResult = this.logPublisher.appendMembershipChangeEvent(
                    this.leadershipTermId,
                    timestamp,
                    this.leaderMember.id(),
                    membersAfterJoin.length,
                    ChangeType.JOIN,
                    member.id(),
                    ClusterMember.encodeAsString(membersAfterJoin));

                if (appendResult > 0L) {
                    this.timeOfLastLogUpdateNs = this.clusterTimeUnit.toNanos(timestamp) - this.leaderHeartbeatIntervalNs;
                    this.passiveMembers = ClusterMember.removeMember(this.passiveMembers, member.id());
                    this.activeMembers = membersAfterJoin;
                    this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
                    member.hasRequestedJoin(false);
                    ++workCount;
                    // Only one member is promoted per call.
                    break;
                }
            }
        }

        return workCount;
    }

    /**
     * Time out inactive sessions and deliver pending open / new-leader events to the rest.
     * <p>
     * A timed-out OPEN session is first moved to closing with a TIMEOUT reason; OPEN and CLOSING
     * sessions are then removed only once their close has been appended to the log, after which a CLOSED
     * event is sent and the session is queued as an uncommitted closed session. Timed-out sessions in
     * any other state are removed and closed directly. Sessions that have not timed out are sent a
     * pending open event, or failing that a pending new-leader event.
     *
     * @param sessions open sessions to check; modified in place.
     * @param nowNs    current time in nanoseconds.
     * @return the amount of work done.
     */
    private int checkSessions(ArrayList<ClusterSession> sessions, long nowNs) {
        int workCount = 0;

        for (int i = sessions.size() - 1; i >= 0; --i) {
            final ClusterSession session = sessions.get(i);
            final boolean hasTimedOut = nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs;

            if (hasTimedOut) {
                if (ClusterSession.State.OPEN == session.state()) {
                    session.closing(CloseReason.TIMEOUT);

                    // If the append fails the session is left as-is and retried on a later pass.
                    if (this.logPublisher.appendSessionClose(this.memberId, session, this.leadershipTermId, this.clusterClock.time(), this.clusterClock.timeUnit())) {
                        final String reason = session.closeReason().name();
                        this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, reason);
                        session.closedLogPosition(this.logPublisher.position());
                        this.uncommittedClosedSessions.addLast(session);
                        sessions.remove(i);
                        this.sessionByIdMap.remove(session.id());
                        session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
                        this.ctx.timedOutClientCounter().incrementOrdered();
                        ++workCount;
                    }
                } else if (ClusterSession.State.CLOSING == session.state()) {
                    if (this.logPublisher.appendSessionClose(this.memberId, session, this.leadershipTermId, this.clusterClock.time(), this.clusterClock.timeUnit())) {
                        final String reason = session.closeReason().name();
                        this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, reason);
                        session.closedLogPosition(this.logPublisher.position());
                        this.uncommittedClosedSessions.addLast(session);
                        sessions.remove(i);
                        this.sessionByIdMap.remove(session.id());
                        session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
                        if (CloseReason.TIMEOUT == session.closeReason()) {
                            this.ctx.timedOutClientCounter().incrementOrdered();
                        }
                        ++workCount;
                    }
                } else {
                    sessions.remove(i);
                    this.sessionByIdMap.remove(session.id());
                    session.close(this.aeron, (ErrorHandler)this.ctx.countedErrorHandler());
                    ++workCount;
                }
            } else if (session.hasOpenEventPending()) {
                workCount += this.sendSessionOpenEvent(session);
            } else if (session.hasNewLeaderEventPending()) {
                workCount += this.sendNewLeaderEvent(session);
            }
        }

        return workCount;
    }

    /**
     * Queue an acknowledgement received from a service.
     *
     * @param logPosition log position carried by the ack.
     * @param ackId       id of the ack; {@code 0} marks the initial ack.
     * @param relevantId  relevant id carried by the ack.
     * @param serviceId   index of the service that sent the ack.
     * @throws ClusterException if an initial ack arrives for a service whose client id is already set
     *                          (not {@code -1}).
     */
    private void captureServiceAck(long logPosition, long ackId, long relevantId, int serviceId) {
        final boolean isDuplicateInitialAck = 0L == ackId && -1L != this.serviceClientIds[serviceId];
        if (isDuplicateInitialAck) {
            throw new ClusterException("initial ack already received from service: possible duplicate serviceId=" + serviceId);
        }

        final ServiceAck serviceAck = new ServiceAck(ackId, logPosition, relevantId);
        this.serviceAckQueues[serviceId].offerLast(serviceAck);
    }

    /**
     * Take the head ack from every service ack queue, requiring each to be for {@code logPosition}.
     *
     * @param logPosition log position every polled ack must carry.
     * @param serviceId   service id included in the error message.
     * @return one ack per service queue, indexed by queue position.
     * @throws ClusterException if any queue is empty or its head ack is for a different log position.
     */
    private ServiceAck[] pollServiceAcks(long logPosition, int serviceId) {
        final ServiceAck[] polledAcks = new ServiceAck[this.serviceAckQueues.length];

        for (int id = 0, length = this.serviceAckQueues.length; id < length; ++id) {
            final ServiceAck ack = this.serviceAckQueues[id].pollFirst();
            final boolean isValid = null != ack && ack.logPosition() == logPosition;

            if (!isValid) {
                // NOTE(review): the message reports the serviceId argument, not the queue index being
                // checked — confirm this is intended.
                throw new ClusterException("invalid ack for serviceId=" + serviceId + " logPosition=" + logPosition + " " + ack);
            }

            polledAcks[id] = ack;
        }

        return polledAcks;
    }

    /**
     * Send a new-leader event to the session and clear its pending flag when the send succeeds.
     *
     * @param session session to notify.
     * @return {@code 1} if the event was sent, otherwise {@code 0}.
     */
    private int sendNewLeaderEvent(ClusterSession session) {
        final boolean isSent = this.egressPublisher.newLeader(
            session, this.leadershipTermId, this.leaderMember.id(), this.ingressEndpoints);
        if (!isSent) {
            return 0;
        }

        session.hasNewLeaderEventPending(false);
        return 1;
    }

    /**
     * Send an OK event with an empty detail to the session and clear its pending open event when the
     * send succeeds.
     *
     * @param session session to notify.
     * @return {@code 1} if the event was sent, otherwise {@code 0}.
     */
    private int sendSessionOpenEvent(ClusterSession session) {
        final boolean isSent = this.egressPublisher.sendEvent(
            session, this.leadershipTermId, this.memberId, EventCode.OK, "");
        if (!isSent) {
            return 0;
        }

        session.clearOpenEventPending();
        return 1;
    }

    /**
     * Append a session-open entry to the log and, on success, open the session.
     * <p>
     * When the publisher returns a position greater than zero the session is opened at that position,
     * its last activity time is set to {@code nowNs} and one attempt is made to send the open event.
     *
     * @param session session to open.
     * @param nowNs   current time in nanoseconds.
     * @return {@code true} if the entry was appended, otherwise {@code false}.
     */
    private boolean appendSessionAndOpen(ClusterSession session, long nowNs) {
        final long openedPosition = this.logPublisher.appendSessionOpen(
            session, this.leadershipTermId, this.clusterClock.time());
        if (openedPosition <= 0L) {
            return false;
        }

        session.open(openedPosition);
        session.timeOfLastActivityNs(nowNs);
        // The result is ignored here; a failed send leaves the open event pending on the session.
        this.sendSessionOpenEvent(session);

        return true;
    }

    /**
     * Try to bind the append position to the recording position counter of the given log session.
     * <p>
     * The attempt fails if no counter is found for the session (id {@code -1}), if the counter's
     * registration id is {@code 0}, or if the counter's recording id is {@code -1}. On success the log
     * recording id is set, a readable counter is created for the append position and the recorded log
     * position is reset to {@code -1}.
     *
     * @param logSessionId session id of the log stream being recorded.
     * @return {@code true} if the append position was created, otherwise {@code false}.
     */
    private boolean tryCreateAppendPosition(int logSessionId) {
        final CountersReader countersReader = this.aeron.countersReader();
        final int counterId = RecordingPos.findCounterIdBySession((CountersReader)countersReader, (int)logSessionId);

        if (-1 != counterId) {
            final long registrationId = countersReader.getCounterRegistrationId(counterId);

            if (0L != registrationId) {
                final long recordingId = RecordingPos.getRecordingId((CountersReader)countersReader, (int)counterId);

                if (-1L != recordingId) {
                    this.logRecordingId(recordingId);
                    this.appendPosition = new ReadableCounter(countersReader, registrationId, counterId);
                    this.logRecordedPosition = -1L;
                    return true;
                }
            }
        }

        return false;
    }

    /**
     * Sends this follower's append position to the leader using the current leadership term and
     * flags of zero. The position is read from the append position counter when one exists,
     * otherwise {@code logRecordedPosition} is used.
     *
     * @param nowNs current time in nanoseconds.
     * @return 1 if a position message was sent, otherwise 0.
     */
    private int updateFollowerPosition(long nowNs) {
        final long recordedPosition;
        if (null == this.appendPosition) {
            recordedPosition = this.logRecordedPosition;
        } else {
            recordedPosition = this.appendPosition.get();
        }

        return this.updateFollowerPosition(
            this.leaderMember.publication(), nowNs, this.leadershipTermId, recordedPosition, (short)0);
    }

    /**
     * Publishes an append position message when the position has advanced past the last one sent, or
     * when at least one leader heartbeat interval has elapsed since the last send.
     * <p>
     * The position published never goes backwards: it is the maximum of the supplied position and the
     * last append position recorded by this method.
     *
     * @param publication      publication on which to send the message.
     * @param nowNs            current time in nanoseconds.
     * @param leadershipTermId leadership term to report.
     * @param appendPosition   candidate append position.
     * @param flags            flags to carry on the message.
     * @return 1 if the message was published, otherwise 0.
     */
    private int updateFollowerPosition(ExclusivePublication publication, long nowNs, long leadershipTermId, long appendPosition, short flags) {
        final long position = Math.max(appendPosition, this.lastAppendPosition);
        final boolean hasAdvanced = position > this.lastAppendPosition;

        if (!hasAdvanced && nowNs < this.timeOfLastAppendPositionSendNs + this.leaderHeartbeatIntervalNs) {
            return 0;
        }

        if (!this.consensusPublisher.appendPosition(publication, leadershipTermId, position, this.memberId, flags)) {
            return 0;
        }

        if (hasAdvanced) {
            this.lastAppendPosition = position;
            this.timeOfLastAppendPositionUpdateNs = nowNs;
        }
        this.timeOfLastAppendPositionSendNs = nowNs;

        return 1;
    }

    /**
     * Replays a snapshot recording from the archive and feeds it through a
     * {@code ConsensusModuleSnapshotAdapter} until the adapter reports it is done.
     * <p>
     * After the replay, every pending service message tracker is verified and reset, the timer
     * service time is set from the cluster clock, and the commit position, leadership term id and
     * expected ack position are taken from the snapshot.
     *
     * @param snapshot the snapshot entry to load.
     * @param archive  archive client used to start the replay.
     * @throws ClusterException if the replay image closes before the adapter is done.
     */
    private void loadSnapshot(RecordingLog.Snapshot snapshot, AeronArchive archive) {
        final String channel = this.ctx.replayChannel();
        final int streamId = this.ctx.replayStreamId();
        final int replaySessionId = (int)archive.startReplay(snapshot.recordingId, 0L, -1L, channel, streamId);
        final String sessionSpecificChannel = ChannelUri.addSessionId(channel, replaySessionId);

        try (Subscription subscription = this.aeron.addSubscription(sessionSpecificChannel, streamId)) {
            final Image image = this.awaitImage(replaySessionId, subscription);
            final ConsensusModuleSnapshotAdapter snapshotAdapter = new ConsensusModuleSnapshotAdapter(image, this);

            for (;;) {
                final int fragments = snapshotAdapter.poll();
                if (0 == fragments) {
                    if (snapshotAdapter.isDone()) {
                        break;
                    }

                    if (image.isClosed()) {
                        // Poll the archive before failing so any archive-side event is processed first.
                        this.pollArchiveEvents();
                        throw new ClusterException("snapshot ended unexpectedly: " + image);
                    }
                }

                this.idle(fragments);
            }

            for (final PendingServiceMessageTracker messageTracker : this.pendingServiceMessageTrackers) {
                messageTracker.verify();
                messageTracker.reset();
            }
        }

        this.timerService.currentTime(this.clusterClock.time());
        this.commitPosition.setOrdered(snapshot.logPosition);
        this.leadershipTermId(snapshot.leadershipTermId);
        this.expectedAckPosition = snapshot.logPosition;
    }

    /**
     * Idles until the subscription has an image for the given session id.
     *
     * @param sessionId    session id of the image to wait for.
     * @param subscription subscription on which the image is expected.
     * @return the image for the session; never null.
     */
    private Image awaitImage(int sessionId, Subscription subscription) {
        this.idleStrategy.reset();

        Image found = subscription.imageBySessionId(sessionId);
        while (null == found) {
            this.idle();
            found = subscription.imageBySessionId(sessionId);
        }

        return found;
    }

    /**
     * Allocates the recovery state counter from a recovery plan.
     * <p>
     * With no snapshots in the plan the counter is allocated with the current leadership term id,
     * zero log position, zero timestamp and an empty recording id array. Otherwise the first snapshot
     * supplies the term id, log position and timestamp, and each remaining snapshot's recording id is
     * stored at the index given by its {@code serviceId}.
     *
     * @param plan the recovery plan to describe.
     * @return the allocated counter.
     */
    private Counter addRecoveryStateCounter(RecordingLog.RecoveryPlan plan) {
        final int snapshotCount = plan.snapshots.size();
        if (snapshotCount <= 0) {
            return RecoveryState.allocate(this.aeron, this.leadershipTermId, 0L, 0L, this.ctx.clusterId(), new long[0]);
        }

        final long[] serviceRecordingIds = new long[snapshotCount - 1];
        final RecordingLog.Snapshot firstSnapshot = plan.snapshots.get(0);

        for (int index = 1; index < snapshotCount; index++) {
            final RecordingLog.Snapshot entry = plan.snapshots.get(index);
            serviceRecordingIds[entry.serviceId] = entry.recordingId;
        }

        return RecoveryState.allocate(
            this.aeron,
            firstSnapshot.leadershipTermId,
            firstSnapshot.logPosition,
            firstSnapshot.timestamp,
            this.ctx.clusterId(),
            serviceRecordingIds);
    }

    /**
     * Creates a {@code DynamicJoin} when there are no active members and cluster consensus
     * endpoints are configured on the context.
     *
     * @return a new {@code DynamicJoin}, or null when a dynamic join does not apply.
     */
    private DynamicJoin requiresDynamicJoin() {
        if (0 != this.activeMembers.length) {
            return null;
        }

        if (null == this.ctx.clusterConsensusEndpoints()) {
            return null;
        }

        return new DynamicJoin(
            this.ctx.clusterConsensusEndpoints(), this.archive, this.consensusPublisher, this.ctx, this);
    }

    /**
     * Removes the first ack from each service ack queue and stores its relevant id as that
     * service's client id.
     *
     * @throws NullPointerException if any service ack queue is empty.
     */
    private void captureServiceClientIds() {
        for (int serviceId = 0, count = this.serviceClientIds.length; serviceId < count; serviceId++) {
            final ServiceAck firstAck = this.serviceAckQueues[serviceId].pollFirst();
            Objects.requireNonNull(firstAck);
            this.serviceClientIds[serviceId] = firstAck.relevantId();
        }
    }

    /**
     * Removes every active member that has requested removal and whose removal position is at or
     * below the given commit position.
     * <p>
     * For each such member: if it is this member the module state becomes {@code QUITTING}; the member
     * is dropped from the active set and the id map, its publication is closed, its log endpoint is
     * removed as a log destination and the pending removal count is decremented. Finally the ranked
     * positions array is re-sized for the new membership.
     *
     * @param commitPosition the current commit position.
     */
    private void handleMemberRemovals(long commitPosition) {
        ClusterMember[] remainingMembers = this.activeMembers;

        for (final ClusterMember candidate : this.activeMembers) {
            if (candidate.hasRequestedRemove() && candidate.removalPosition() <= commitPosition) {
                if (candidate.id() == this.memberId) {
                    this.state(ConsensusModule.State.QUITTING);
                }

                remainingMembers = ClusterMember.removeMember(remainingMembers, candidate.id());
                this.clusterMemberByIdMap.remove(candidate.id());
                this.clusterMemberByIdMap.compact();

                candidate.closePublication(this.ctx.countedErrorHandler());
                this.logPublisher.removeDestination(this.ctx.isLogMdc(), candidate.logEndpoint());
                this.pendingMemberRemovals--;
            }
        }

        this.activeMembers = remainingMembers;
        this.rankedPositions = new long[ClusterMember.quorumThreshold(remainingMembers.length)];
    }

    /**
     * Updates the leader position from the append position counter, if one is available.
     *
     * @param nowNs current time in nanoseconds.
     * @return the result of the two-argument overload, or 0 when there is no append position counter.
     */
    private int updateLeaderPosition(long nowNs) {
        if (null == this.appendPosition) {
            return 0;
        }

        return this.updateLeaderPosition(nowNs, this.appendPosition.get());
    }

    /**
     * Computes the quorum position over the active members, using {@code rankedPositions} as the
     * working array passed to {@code ClusterMember.quorumPosition}.
     *
     * @return the quorum position.
     */
    long quorumPosition() {
        final ClusterMember[] members = this.activeMembers;
        final long[] ranked = this.rankedPositions;

        return ClusterMember.quorumPosition(members, ranked);
    }

    /**
     * Records this member's log position and, when the commit position has advanced or a leader
     * heartbeat interval has elapsed since the last log update, publishes and stores the new commit
     * position.
     * <p>
     * The commit position is the smaller of the quorum position and the supplied position. When it is
     * published, uncommitted entries up to it are swept and any pending member removals are handled.
     *
     * @param nowNs    current time in nanoseconds.
     * @param position this member's current log position.
     * @return 1 if the commit position was published, otherwise 0.
     */
    int updateLeaderPosition(long nowNs, long position) {
        this.thisMember.logPosition(position).timeOfLastAppendPositionNs(nowNs);

        final long commitPosition = Math.min(this.quorumPosition(), position);
        final boolean hasAdvanced = commitPosition > this.commitPosition.getWeak();
        if (!hasAdvanced && nowNs < this.timeOfLastLogUpdateNs + this.leaderHeartbeatIntervalNs) {
            return 0;
        }

        this.publishCommitPosition(commitPosition);
        this.commitPosition.setOrdered(commitPosition);
        this.timeOfLastLogUpdateNs = nowNs;
        this.sweepUncommittedEntriesTo(commitPosition);

        if (this.pendingMemberRemovals > 0) {
            this.handleMemberRemovals(commitPosition);
        }

        return 1;
    }

    /**
     * Sends the commit position to every other active member, and to every other passive member
     * that has requested to join. The result of each send is not checked.
     *
     * @param commitPosition the commit position to publish.
     */
    void publishCommitPosition(long commitPosition) {
        for (final ClusterMember active : this.activeMembers) {
            if (active.id() != this.memberId) {
                this.consensusPublisher.commitPosition(
                    active.publication(), this.leadershipTermId, commitPosition, this.memberId);
            }
        }

        for (final ClusterMember passive : this.passiveMembers) {
            if (passive.id() != this.memberId && passive.hasRequestedJoin()) {
                this.consensusPublisher.commitPosition(
                    passive.publication(), this.leadershipTermId, commitPosition, this.memberId);
            }
        }
    }

    /**
     * Creates a {@code LogReplication} that replicates the leader's recording into this member's
     * log recording, using channels and heartbeat settings from the context.
     *
     * @param leaderArchiveEndpoint endpoint combined with the leader archive control channel to form
     *                              the source archive destination.
     * @param leaderRecordingId     recording id on the leader's archive.
     * @param stopPosition          position passed through to the replication.
     * @param nowNs                 current time in nanoseconds.
     * @return the new log replication.
     */
    LogReplication newLogReplication(String leaderArchiveEndpoint, long leaderRecordingId, long stopPosition, long nowNs) {
        final String leaderArchiveDestination =
            ChannelUri.createDestinationUri(this.ctx.leaderArchiveControlChannel(), leaderArchiveEndpoint);

        return new LogReplication(
            this.archive,
            leaderRecordingId,
            this.logRecordingId,
            stopPosition,
            leaderArchiveDestination,
            this.ctx.replicationChannel(),
            this.ctx.leaderHeartbeatTimeoutNs(),
            this.ctx.leaderHeartbeatIntervalNs(),
            nowNs);
    }

    /**
     * Idles until no local socket address counters remain for the given registration id.
     *
     * @param registrationId registration id whose local socket addresses are awaited.
     */
    void awaitNoLocalSocketAddresses(long registrationId) {
        final CountersReader counters = this.aeron.countersReader();

        int remaining = LocalSocketAddressStatus.findNumberOfAddressesByRegistrationId(counters, registrationId);
        while (remaining > 0) {
            this.idle();
            remaining = LocalSocketAddressStatus.findNumberOfAddressesByRegistrationId(counters, registrationId);
        }
    }

    /**
     * Closes and removes every session opened after the given log position, and closes all pending
     * sessions. Each affected session is sent an {@code EventCode.CLOSED} event with the detail
     * "election" before being closed.
     *
     * @param logPosition sessions opened at or before this position are retained.
     */
    private void clearSessionsAfter(long logPosition) {
        // Walk backwards so removing by index does not disturb entries still to be visited.
        int index = this.sessions.size();
        while (--index >= 0) {
            final ClusterSession openSession = this.sessions.get(index);
            if (openSession.openedLogPosition() > logPosition) {
                this.sessions.remove(index);
                this.sessionByIdMap.remove(openSession.id());
                this.egressPublisher.sendEvent(
                    openSession, this.leadershipTermId, this.memberId, EventCode.CLOSED, "election");
                openSession.close(this.aeron, this.ctx.countedErrorHandler());
            }
        }

        for (final ClusterSession pendingSession : this.pendingSessions) {
            this.egressPublisher.sendEvent(
                pendingSession, this.leadershipTermId, this.memberId, EventCode.CLOSED, "election");
            pendingSession.close(this.aeron, this.ctx.countedErrorHandler());
        }

        this.pendingSessions.clear();
    }

    /**
     * Discards bookkeeping for entries that are now committed.
     * <p>
     * Leader messages are swept on every pending service message tracker; timer entries are removed
     * from the head of {@code uncommittedTimers} two values at a time while the head value is at or
     * below the commit position; closed sessions are removed from the head of
     * {@code uncommittedClosedSessions} while their closed log position is at or below it.
     *
     * @param commitPosition the new commit position.
     */
    private void sweepUncommittedEntriesTo(long commitPosition) {
        for (final PendingServiceMessageTracker messageTracker : this.pendingServiceMessageTrackers) {
            messageTracker.sweepLeaderMessages();
        }

        // NOTE(review): termination relies on peekLong() returning a value above commitPosition once the
        // queue is empty - confirm against the queue's configured null value.
        while (this.uncommittedTimers.peekLong() <= commitPosition) {
            this.uncommittedTimers.pollLong();
            this.uncommittedTimers.pollLong();
        }

        ClusterSession head = this.uncommittedClosedSessions.peekFirst();
        while (null != head && head.closedLogPosition() <= commitPosition) {
            this.uncommittedClosedSessions.pollFirst();
            head = this.uncommittedClosedSessions.peekFirst();
        }
    }

    /**
     * Restores state for entries that were appended beyond the commit position and therefore never
     * committed.
     * <p>
     * Timers recorded past the commit position are rescheduled with a deadline of 0, every pending
     * service message tracker restores its uncommitted messages, and sessions closed past the commit
     * position are re-added: those closed for {@code CloseReason.TIMEOUT} go back to {@code OPEN} with
     * the close reason reset, all others are put into {@code CLOSING}.
     *
     * @param commitPosition the commit position; entries at or below it are left alone.
     */
    private void restoreUncommittedEntries(long commitPosition) {
        // The queue stores (appendPosition, correlationId) pairs as consecutive values. The decompiler
        // had mistyped this iterator as PendingServiceMessageTracker[], which cannot compile.
        final LongArrayQueue.LongIterator timers = this.uncommittedTimers.iterator();
        while (timers.hasNext()) {
            final long appendPosition = timers.nextValue();
            final long correlationId = timers.nextValue();
            if (appendPosition > commitPosition) {
                this.timerService.scheduleTimerForCorrelationId(correlationId, 0L);
            }
        }
        this.uncommittedTimers.clear();

        for (final PendingServiceMessageTracker tracker : this.pendingServiceMessageTrackers) {
            tracker.restoreUncommittedMessages();
        }

        ClusterSession session;
        while (null != (session = this.uncommittedClosedSessions.pollFirst())) {
            if (session.closedLogPosition() <= commitPosition) {
                continue;
            }

            session.closedLogPosition(-1L);
            if (CloseReason.TIMEOUT == session.closeReason()) {
                session.resetCloseReason();
                session.state(ClusterSession.State.OPEN);
            } else {
                session.state(ClusterSession.State.CLOSING);
            }
            this.addSession(session);
        }
    }

    /**
     * Switches this member to the follower role and starts a new election, running its first duty
     * cycle immediately.
     * <p>
     * The term base log position comes from the recording log entry for the current leadership term,
     * falling back to the recovery plan when there is no such entry. The append position comes from
     * the append position counter, falling back to the recovery plan when there is no counter.
     *
     * @throws IllegalStateException if an election is already in progress.
     */
    private void enterElection() {
        if (null != this.election) {
            throw new IllegalStateException("election in progress");
        }

        this.role(Cluster.Role.FOLLOWER);

        final RecordingLog.Entry currentTermEntry = this.recordingLog.findTermEntry(this.leadershipTermId);
        final long termBaseLogPosition;
        if (null == currentTermEntry) {
            termBaseLogPosition = this.recoveryPlan.lastTermBaseLogPosition;
        } else {
            termBaseLogPosition = currentTermEntry.termBaseLogPosition;
        }

        final long committedPosition = this.commitPosition.getWeak();
        final long appendedPosition =
            null == this.appendPosition ? this.recoveryPlan.appendedLogPosition : this.appendPosition.get();

        this.election = new Election(
            false,
            this.leadershipTermId,
            termBaseLogPosition,
            committedPosition,
            appendedPosition,
            this.activeMembers,
            this.clusterMemberByIdMap,
            this.thisMember,
            this.consensusPublisher,
            this.ctx,
            this);

        this.election.doWork(this.clusterClock.timeNanos());
    }

    /**
     * Performs one idle step while waiting on an external condition: checks for thread interruption,
     * invokes the Aeron client, idles, then polls archive events.
     *
     * @throws AgentTerminationException if the thread is interrupted or the Aeron client is closed.
     */
    private void idle() {
        ConsensusModuleAgent.checkInterruptStatus();
        this.aeronClientInvoker.invoke();

        final boolean isAeronClosed = this.aeron.isClosed();
        if (isAeronClosed) {
            throw new AgentTerminationException("unexpected Aeron close");
        }

        this.idleStrategy.idle();
        this.pollArchiveEvents();
    }

    /**
     * Performs one idle step informed by the amount of work just done: checks for thread
     * interruption, invokes the Aeron client, idles with the work count, and polls archive events
     * only when no work was done.
     *
     * @param workCount amount of work done in the preceding step.
     * @throws AgentTerminationException if the thread is interrupted or the Aeron client is closed.
     */
    private void idle(int workCount) {
        ConsensusModuleAgent.checkInterruptStatus();
        this.aeronClientInvoker.invoke();

        final boolean isAeronClosed = this.aeron.isClosed();
        if (isAeronClosed) {
            throw new AgentTerminationException("unexpected Aeron close");
        }

        this.idleStrategy.idle(workCount);

        final boolean wasIdle = 0 == workCount;
        if (wasIdle) {
            this.pollArchiveEvents();
        }
    }

    /**
     * Fails fast when the current thread has been interrupted. The interrupt flag is only read,
     * not cleared.
     *
     * @throws AgentTerminationException if the current thread is interrupted.
     */
    private static void checkInterruptStatus() {
        final Thread current = Thread.currentThread();
        if (current.isInterrupted()) {
            throw new AgentTerminationException("interrupted");
        }
    }

    /**
     * Takes a snapshot of the consensus module state, records it in the archive and registers it,
     * together with the service snapshots, in the recording log.
     * <p>
     * Steps: open an exclusive publication on the snapshot channel, start a local recording of it,
     * wait for the recording counter, write the state, and wait for the recording to reach the
     * publication position. The service snapshot entries (one per ack, highest service id first) are
     * appended before the consensus module entry, which uses a service id of -1. The recording log is
     * then forced to disk, the recovery plan is rebuilt, the snapshot counter is incremented and every
     * session's last activity time is refreshed.
     *
     * @param timestamp   timestamp to record against the snapshot entries.
     * @param logPosition log position the snapshot represents.
     * @param serviceAcks acks from the services; each relevant id is used as that service's snapshot
     *                    recording id.
     * @throws ArchiveException if the archive fails; a {@code STORAGE_SPACE} error additionally
     *                          triggers {@code unexpectedTermination()}, which throws instead of
     *                          returning.
     */
    private void takeSnapshot(long timestamp, long logPosition, ServiceAck[] serviceAcks) {
        long recordingId;
        try (ExclusivePublication publication = this.aeron.addExclusivePublication(this.ctx.snapshotChannel(), this.ctx.snapshotStreamId())) {
            String channel = ChannelUri.addSessionId(this.ctx.snapshotChannel(), publication.sessionId());
            this.archive.startRecording(channel, this.ctx.snapshotStreamId(), SourceLocation.LOCAL, true);
            CountersReader counters = this.aeron.countersReader();
            int counterId = this.awaitRecordingCounter(counters, publication.sessionId());
            recordingId = RecordingPos.getRecordingId(counters, counterId);
            this.snapshotState(publication, logPosition, this.leadershipTermId);
            this.awaitRecordingComplete(recordingId, publication.position(), counters, counterId);
        }
        catch (ArchiveException ex) {
            // Named constant instead of the bare literal 11 left behind by constant inlining.
            if (ex.errorCode() == ArchiveException.STORAGE_SPACE) {
                this.ctx.countedErrorHandler().onError(ex);
                this.unexpectedTermination();
            }
            throw ex;
        }

        long termBaseLogPosition = this.recordingLog.getTermEntry(this.leadershipTermId).termBaseLogPosition;
        for (int serviceId = serviceAcks.length - 1; serviceId >= 0; --serviceId) {
            long snapshotId = serviceAcks[serviceId].relevantId();
            this.recordingLog.appendSnapshot(snapshotId, this.leadershipTermId, termBaseLogPosition, logPosition, timestamp, serviceId);
        }
        this.recordingLog.appendSnapshot(recordingId, this.leadershipTermId, termBaseLogPosition, logPosition, timestamp, -1);
        this.recordingLog.force(this.ctx.fileSyncLevel());
        this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.ctx.serviceCount(), -1L);
        this.ctx.snapshotCounter().incrementOrdered();

        // Refresh activity times after the snapshot has completed.
        // NOTE(review): presumably so sessions are not timed out for the time the snapshot took - confirm.
        long nowNs = this.clusterClock.timeNanos();
        int size = this.sessions.size();
        for (int i = 0; i < size; ++i) {
            this.sessions.get(i).timeOfLastActivityNs(nowNs);
        }
    }

    /**
     * Idles until the recording counter value reaches the given position.
     *
     * @param recordingId id of the recording being awaited.
     * @param position    position the counter must reach.
     * @param counters    counters reader holding the recording counter.
     * @param counterId   id of the recording position counter.
     * @throws ClusterException if the recording is no longer active before the position is reached.
     */
    private void awaitRecordingComplete(long recordingId, long position, CountersReader counters, int counterId) {
        this.idleStrategy.reset();

        while (counters.getCounterValue(counterId) < position) {
            this.idle();

            final boolean isStillRecording = RecordingPos.isActive(counters, counterId, recordingId);
            if (!isStillRecording) {
                throw new ClusterException("recording has stopped unexpectedly: " + recordingId);
            }
        }
    }

    /**
     * Idles until a recording position counter exists for the given session id.
     *
     * @param counters  counters reader to search.
     * @param sessionId session id of the stream being recorded.
     * @return the id of the recording position counter; never -1.
     */
    private int awaitRecordingCounter(CountersReader counters, int sessionId) {
        this.idleStrategy.reset();

        int foundCounterId;
        while (-1 == (foundCounterId = RecordingPos.findCounterIdBySession(counters, sessionId))) {
            this.idle();
        }

        return foundCounterId;
    }

    /**
     * Writes the consensus module state to the snapshot publication between a begin and an end
     * marker.
     * <p>
     * Written in order: consensus module state (next session id plus values from the first pending
     * service message tracker), cluster members, each session in the {@code OPEN} or {@code CLOSING}
     * state, the timer service, and every pending service message tracker.
     *
     * @param publication      publication the snapshot is written to.
     * @param logPosition      log position the snapshot represents.
     * @param leadershipTermId leadership term the snapshot belongs to.
     */
    private void snapshotState(ExclusivePublication publication, long logPosition, long leadershipTermId) {
        final ConsensusModuleSnapshotTaker taker =
            new ConsensusModuleSnapshotTaker(publication, this.idleStrategy, this.aeronClientInvoker);

        taker.markBegin(1L, logPosition, leadershipTermId, 0, this.clusterTimeUnit, this.ctx.appVersion());

        final PendingServiceMessageTracker firstTracker = this.pendingServiceMessageTrackers[0];
        taker.snapshotConsensusModuleState(
            this.nextSessionId,
            firstTracker.nextServiceSessionId(),
            firstTracker.logServiceSessionId(),
            firstTracker.size());
        taker.snapshotClusterMembers(this.memberId, this.highMemberId, ClusterMember.encodeAsString(this.activeMembers));

        for (int index = 0, count = this.sessions.size(); index < count; index++) {
            final ClusterSession candidate = this.sessions.get(index);
            final ClusterSession.State candidateState = candidate.state();
            if (ClusterSession.State.OPEN == candidateState || ClusterSession.State.CLOSING == candidateState) {
                taker.snapshotSession(candidate);
            }
        }

        this.timerService.snapshot(taker);

        for (final PendingServiceMessageTracker messageTracker : this.pendingServiceMessageTrackers) {
            taker.snapshot(messageTracker);
        }

        taker.markEnd(1L, logPosition, leadershipTermId, 0, this.clusterTimeUnit, this.ctx.appVersion());
    }

    /**
     * Handles a member joining the cluster. The high member id is always raised to at least the
     * joining id. If the member is present in {@code newMembers}, it is given a consensus publication
     * when it lacks one, added to the active members and the id map, and the ranked positions array
     * is re-sized.
     *
     * @param memberId   id of the member that joined.
     * @param newMembers membership in which to look up the joining member.
     */
    private void clusterMemberJoined(int memberId, ClusterMember[] newMembers) {
        this.highMemberId = Math.max(this.highMemberId, memberId);

        final ClusterMember joiningMember = ClusterMember.findMember(newMembers, memberId);
        if (null == joiningMember) {
            return;
        }

        if (null == joiningMember.publication()) {
            ClusterMember.addConsensusPublication(
                joiningMember,
                this.ctx.consensusChannel(),
                this.ctx.consensusStreamId(),
                this.aeron,
                this.ctx.countedErrorHandler());
        }

        this.activeMembers = ClusterMember.addMember(this.activeMembers, joiningMember);
        this.clusterMemberByIdMap.put(memberId, joiningMember);
        this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
    }

    /**
     * Handles a member quitting the cluster: removes it from the active members and the id map, and
     * re-sizes the ranked positions array for the reduced membership.
     *
     * @param memberId id of the member that quit.
     */
    private void clusterMemberQuit(int memberId) {
        final ClusterMember[] remainingMembers = ClusterMember.removeMember(this.activeMembers, memberId);
        this.activeMembers = remainingMembers;
        this.clusterMemberByIdMap.remove(memberId);

        final int quorumSize = ClusterMember.quorumThreshold(remainingMembers.length);
        this.rankedPositions = new long[quorumSize];
    }

    /**
     * Unavailable-image callback for the ingress subscription: asks the ingress adapter to free the
     * session buffer keyed by the image's session id.
     *
     * @param image the ingress image that became unavailable.
     */
    private void onUnavailableIngressImage(Image image) {
        final int imageSessionId = image.sessionId();
        this.ingressAdapter.freeSessionBuffer(imageSessionId);
    }

    /**
     * Unavailable-counter callback. Ignored while the module is {@code TERMINATING} or
     * {@code QUITTING}.
     * <p>
     * If the registration id matches one of the service client ids, an error is reported and the
     * module state becomes {@code CLOSED}. Otherwise, if it matches the append position counter, the
     * counter and log subscription id are cleared and the loss is routed to the election in progress,
     * or, with no election and a termination position of -1, reported and flagged as requiring an
     * election.
     *
     * @param counters       counters reader the counter belonged to (unused).
     * @param registrationId registration id of the counter that became unavailable.
     * @param counterId      id of the counter that became unavailable (unused).
     */
    private void onUnavailableCounter(CountersReader counters, long registrationId, int counterId) {
        if (ConsensusModule.State.TERMINATING == this.state || ConsensusModule.State.QUITTING == this.state) {
            return;
        }

        for (final long serviceClientId : this.serviceClientIds) {
            if (registrationId == serviceClientId) {
                this.ctx.countedErrorHandler().onError(new ClusterEvent("Aeron client in service closed unexpectedly"));
                this.state(ConsensusModule.State.CLOSED);
                return;
            }
        }

        if (null == this.appendPosition || this.appendPosition.registrationId() != registrationId) {
            return;
        }

        this.appendPosition = null;
        this.logSubscriptionId = -1L;

        if (null != this.election) {
            this.election.handleError(
                this.clusterClock.timeNanos(),
                new ClusterEvent("log recording ended unexpectedly (null != election)"));
        } else if (-1L == this.terminationPosition) {
            this.ctx.countedErrorHandler().onError(
                new ClusterEvent("log recording ended unexpectedly (NULL_POSITION == terminationPosition)"));
            this.isElectionRequired = true;
        }
    }

    /**
     * Stops the log recording, moves the module to {@code CLOSED} and terminates the agent by
     * throwing. This method never returns normally.
     *
     * @throws ClusterTerminationException always, constructed with {@code true}.
     */
    private void closeAndTerminate() {
        this.tryStopLogRecording();
        this.state(ConsensusModule.State.CLOSED);
        throw new ClusterTerminationException(true);
    }

    /**
     * Terminates the agent after an unexpected failure. This method never returns normally.
     * <p>
     * The unavailable-counter handler is removed first, then the services are sent a termination
     * position of 0, the log recording is stopped and the module moves to {@code CLOSED}.
     *
     * @throws ClusterTerminationException always, constructed with {@code false}.
     */
    private void unexpectedTermination() {
        // NOTE(review): presumably removed first so counters going away during shutdown are not
        // handled as fresh failures - confirm.
        this.aeron.removeUnavailableCounterHandler(this.unavailableCounterHandlerRegistrationId);
        this.serviceProxy.terminationPosition(0L, (ErrorHandler)this.ctx.countedErrorHandler());
        this.tryStopLogRecording();
        this.state(ConsensusModule.State.CLOSED);
        throw new ClusterTerminationException(false);
    }

    /**
     * Best-effort stop of the log recording. Clears the append position counter reference, then,
     * while the archive control publication is connected, tries to stop the recording by
     * subscription id (resetting that id to -1 afterwards) and by recording identity.
     * <p>
     * Failures from the archive are reported to the counted error handler as warnings and never
     * propagate.
     */
    private void tryStopLogRecording() {
        this.appendPosition = null;

        final boolean hasLogSubscription = -1L != this.logSubscriptionId;
        if (hasLogSubscription && this.archive.archiveProxy().publication().isConnected()) {
            try {
                this.archive.tryStopRecording(this.logSubscriptionId);
            }
            catch (Exception ex) {
                this.ctx.countedErrorHandler().onError(new ClusterException(ex, AeronException.Category.WARN));
            }

            this.logSubscriptionId = -1L;
        }

        final boolean hasLogRecording = -1L != this.logRecordingId;
        if (hasLogRecording && this.archive.archiveProxy().publication().isConnected()) {
            try {
                this.archive.tryStopRecordingByIdentity(this.logRecordingId);
            }
            catch (Exception ex) {
                this.ctx.countedErrorHandler().onError(new ClusterException(ex, AeronException.Category.WARN));
            }
        }
    }

    /**
     * Idles until the archive reports a stop position other than -1 for the log recording.
     *
     * @return the stop position of the log recording.
     */
    private long getLastAppendedPosition() {
        this.idleStrategy.reset();

        long stopPosition = this.archive.getStopPosition(this.logRecordingId);
        while (-1L == stopPosition) {
            this.idle();
            stopPosition = this.archive.getStopPosition(this.logRecordingId);
        }

        return stopPosition;
    }

    /**
     * Writes the snapshots collected during a dynamic join into the recording log, then clears the
     * collection. Does nothing when no snapshots were collected.
     * <p>
     * A term entry for the last snapshot's leadership term is appended if none exists. An existing
     * entry must match the log recording id and the snapshot's term base log position. Snapshots are
     * appended from last to first.
     *
     * @throws ClusterException if an existing term entry conflicts with the last snapshot.
     */
    private void appendDynamicJoinTermAndSnapshots() {
        if (this.dynamicJoinSnapshots.isEmpty()) {
            return;
        }

        final int lastIndex = this.dynamicJoinSnapshots.size() - 1;
        final RecordingLog.Snapshot lastSnapshot = this.dynamicJoinSnapshots.get(lastIndex);
        final RecordingLog.Entry existingTerm = this.recordingLog.findTermEntry(lastSnapshot.leadershipTermId);

        if (null == existingTerm) {
            this.recordingLog.appendTerm(
                this.logRecordingId,
                lastSnapshot.leadershipTermId,
                lastSnapshot.termBaseLogPosition,
                lastSnapshot.timestamp);
        } else {
            final boolean isSameRecording = existingTerm.recordingId == this.logRecordingId;
            final boolean isSameBasePosition = existingTerm.termBaseLogPosition == lastSnapshot.termBaseLogPosition;
            if (!isSameRecording || !isSameBasePosition) {
                throw new ClusterException("Unexpected termEntry found leadershipTermId=" + existingTerm.leadershipTermId + " recordingId=" + existingTerm.recordingId + " termBaseLogPosition=" + existingTerm.termBaseLogPosition + " expected leadershipTermId=" + lastSnapshot.leadershipTermId + " recordingId=" + this.logRecordingId + " termBaseLogPosition=" + lastSnapshot.termBaseLogPosition);
            }
        }

        for (int index = this.dynamicJoinSnapshots.size() - 1; index >= 0; index--) {
            final RecordingLog.Snapshot pending = this.dynamicJoinSnapshots.get(index);
            this.recordingLog.appendSnapshot(
                pending.recordingId,
                pending.leadershipTermId,
                pending.termBaseLogPosition,
                pending.logPosition,
                pending.timestamp,
                pending.serviceId);
        }

        this.dynamicJoinSnapshots.clear();
    }

    /**
     * Determines whether the ingress channel resolves to a multicast address. When the configured
     * ingress channel has no endpoint, this member's ingress endpoint is used for the check.
     *
     * @return true only if the destination address resolves and is a multicast address.
     */
    private boolean isIngressMulticast() {
        final ChannelUri ingressUri = ChannelUri.parse(this.ctx.ingressChannel());
        if (!ingressUri.containsKey("endpoint")) {
            ingressUri.put("endpoint", this.thisMember.ingressEndpoint());
        }

        final InetSocketAddress destination = UdpChannel.destinationAddress(ingressUri, DefaultNameResolver.INSTANCE);
        if (null == destination || null == destination.getAddress()) {
            return false;
        }

        return destination.getAddress().isMulticastAddress();
    }

    /**
     * Creates the ingress subscription and connects the ingress adapter to it.
     * <p>
     * Nothing is done when this member is not the leader and ingress is multicast. Otherwise the
     * subscription is created in manual control mode with the endpoint and interface stripped from
     * the channel; the network destination (configured endpoint, or this member's ingress endpoint
     * when none is configured) is then added, plus an IPC destination when IPC ingress is allowed and
     * this member is the leader.
     */
    private void connectIngress() {
        final ChannelUri subscriptionUri = ChannelUri.parse(this.ctx.ingressChannel());
        final boolean isMulticast = this.isIngressMulticast();
        if (isMulticast && Cluster.Role.LEADER != this.role) {
            return;
        }

        final String configuredEndpoint = subscriptionUri.get("endpoint");
        final String networkInterface = subscriptionUri.get("interface");
        final String networkEndpoint =
            null != configuredEndpoint ? configuredEndpoint : this.thisMember.ingressEndpoint();

        subscriptionUri.remove("endpoint");
        subscriptionUri.remove("interface");
        subscriptionUri.put("control-mode", "manual");

        final Subscription subscription = this.aeron.addSubscription(
            subscriptionUri.toString(), this.ctx.ingressStreamId(), null, this::onUnavailableIngressImage);

        final String networkDestination = new ChannelUriStringBuilder()
            .media("udp")
            .endpoint(networkEndpoint)
            .networkInterface(networkInterface)
            .build();
        subscription.addDestination(networkDestination);

        if (this.ctx.isIpcIngressAllowed() && Cluster.Role.LEADER == this.role) {
            subscription.addDestination("aeron:ipc");
        }

        this.ingressAdapter.connect(subscription);
    }

    /**
     * Sets the {@code init-term-id}, {@code term-id} and {@code term-offset} parameters on the
     * channel URI to "0".
     *
     * @param channelUri the channel URI to modify in place.
     */
    private void ensureConsistentInitialTermId(ChannelUri channelUri) {
        final String zero = "0";

        channelUri.put("init-term-id", zero);
        channelUri.put("term-id", zero);
        channelUri.put("term-offset", zero);
    }

    /**
     * Adds a consensus publication to a known follower that does not yet have one. Unknown member
     * ids and followers that already have a publication are left alone.
     *
     * @param followerMemberId id of the follower to check.
     */
    private void checkFollowerForConsensusPublication(int followerMemberId) {
        final ClusterMember knownFollower = (ClusterMember)this.clusterMemberByIdMap.get(followerMemberId);
        if (null == knownFollower || null != knownFollower.publication()) {
            return;
        }

        ClusterMember.addConsensusPublication(
            knownFollower,
            this.ctx.consensusChannel(),
            this.ctx.consensusStreamId(),
            this.aeron,
            this.ctx.countedErrorHandler());
    }

    /**
     * Runs the termination hook configured on the context. Any {@code Exception} raised while
     * obtaining or running the hook is passed to the counted error handler instead of propagating.
     */
    private void runTerminationHook() {
        try {
            final Runnable hook = this.ctx.terminationHook();
            hook.run();
        }
        catch (Exception ex) {
            this.ctx.countedErrorHandler().onError(ex);
        }
    }

    /**
     * Derives the channel to use for responses to a client, given the channel the client asked for.
     * <ul>
     * <li>No egress channel configured: the requested channel is returned unchanged.</li>
     * <li>The requested channel contains the text "endpoint": the configured egress channel is
     * returned with its endpoint replaced by the requested channel's endpoint.</li>
     * <li>IPC ingress is allowed and the requested channel starts with "aeron:ipc": the requested
     * channel is returned unchanged.</li>
     * <li>Otherwise: the configured egress channel is returned.</li>
     * </ul>
     *
     * @param responseChannel channel requested by the client.
     * @return the channel to use for the response.
     */
    private String refineResponseChannel(String responseChannel) {
        final String configuredEgress = this.ctx.egressChannel();
        if (null == configuredEgress) {
            return responseChannel;
        }

        if (responseChannel.contains("endpoint")) {
            final String requestedEndpoint = ChannelUri.parse(responseChannel).get("endpoint");
            final ChannelUri egressUri = ChannelUri.parse(configuredEgress);
            egressUri.put("endpoint", requestedEndpoint);

            return egressUri.toString();
        }

        final boolean isIpcResponse = this.ctx.isIpcIngressAllowed() && responseChannel.startsWith("aeron:ipc");

        return isIpcResponse ? responseChannel : configuredEgress;
    }

    /**
     * Requests the archive to stop a follower's catch-up replay, if it has one. The follower's
     * catch-up replay session id and correlation id are reset to -1 only when the stop request was
     * accepted by the archive proxy.
     *
     * @param follower the follower whose catch-up replay should be stopped.
     */
    private void stopExistingCatchupReplay(ClusterMember follower) {
        if (-1L == follower.catchupReplaySessionId()) {
            return;
        }

        final boolean isStopRequested = this.archive.archiveProxy().stopReplay(
            follower.catchupReplaySessionId(), this.aeron.nextCorrelationId(), this.archive.controlSessionId());

        if (isStopRequested) {
            follower.catchupReplaySessionId(-1L);
            follower.catchupReplayCorrelationId(-1L);
        }
    }

    /**
     * Tests whether the lowest bit of the append position flags is set.
     *
     * @param flags flags carried on an append position message.
     * @return true if bit 0 is set, otherwise false.
     */
    private static boolean isCatchupAppendPosition(short flags) {
        final int catchupBit = flags & 1;
        return catchupBit != 0;
    }

    /**
     * Registers a session in the id map and inserts it into {@code sessions} so the list stays
     * ordered by ascending session id.
     * <p>
     * The insertion point is found by scanning from the tail. A session whose id is smaller than
     * every existing id is inserted at the front. Previously it was appended at the tail, because the
     * insertion index was left at its initial value of {@code size} when no smaller id was found,
     * which broke the ordering.
     *
     * @param session the session to add.
     */
    private void addSession(ClusterSession session) {
        this.sessionByIdMap.put(session.id(), session);

        // Step back past every session whose id is not smaller than the new one; stopping at 0 means
        // the new session belongs at the front.
        int addIndex = this.sessions.size();
        while (addIndex > 0 && this.sessions.get(addIndex - 1).id() >= session.id()) {
            --addIndex;
        }

        // add(index, element) with index == size() appends, so one call covers every case.
        this.sessions.add(addIndex, session);
    }

    /**
     * Removes the first session in the list whose id equals the given cluster session id. The list
     * is left unchanged when no session matches.
     *
     * @param sessions         list to remove from.
     * @param clusterSessionId id of the session to remove.
     */
    private static void removeSession(ArrayList<ClusterSession> sessions, long clusterSessionId) {
        for (int index = 0, count = sessions.size(); index < count; index++) {
            if (clusterSessionId == sessions.get(index).id()) {
                sessions.remove(index);
                return;
            }
        }
    }

    /**
     * Returns a short description of this agent containing only the current election.
     *
     * @return a string of the form {@code ConsensusModuleAgent{election=...}}.
     */
    @Override
    public String toString() {
        return "ConsensusModuleAgent{election=" + this.election + '}';
    }
}

