/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.RecordingSignalPoller;
import io.aeron.archive.client.ReplicationParams;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.ClusterClientSession;
import io.aeron.cluster.ClusterControl;
import io.aeron.cluster.ClusterMember;
import io.aeron.cluster.ClusterSession;
import io.aeron.cluster.ClusterSessionProxy;
import io.aeron.cluster.ClusterTermination;
import io.aeron.cluster.ConsensusAdapter;
import io.aeron.cluster.ConsensusControlState;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.ConsensusModuleAdapter;
import io.aeron.cluster.ConsensusModuleControl;
import io.aeron.cluster.ConsensusModuleExtension;
import io.aeron.cluster.ConsensusModuleSnapshotAdapter;
import io.aeron.cluster.ConsensusModuleSnapshotListener;
import io.aeron.cluster.ConsensusModuleSnapshotTaker;
import io.aeron.cluster.ConsensusModuleStateExport;
import io.aeron.cluster.ConsensusPublisher;
import io.aeron.cluster.EgressPublisher;
import io.aeron.cluster.Election;
import io.aeron.cluster.IngressAdapter;
import io.aeron.cluster.LogAdapter;
import io.aeron.cluster.LogPublisher;
import io.aeron.cluster.LogReplay;
import io.aeron.cluster.NodeControl;
import io.aeron.cluster.PendingServiceMessageTracker;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.RecordingReplication;
import io.aeron.cluster.ServiceAck;
import io.aeron.cluster.ServiceProxy;
import io.aeron.cluster.StandbySnapshotEntry;
import io.aeron.cluster.StandbySnapshotReplicator;
import io.aeron.cluster.TimerService;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.ClusterEvent;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.AdminRequestType;
import io.aeron.cluster.codecs.AdminResponseCode;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ClusterAction;
import io.aeron.cluster.codecs.EventCode;
import io.aeron.cluster.codecs.MessageHeaderDecoder;
import io.aeron.cluster.codecs.SessionMessageHeaderDecoder;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusterClock;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.cluster.service.ClusterTerminationException;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.cluster.service.RecoveryState;
import io.aeron.cluster.service.SnapshotDurationTracker;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.driver.media.UdpChannel;
import io.aeron.exceptions.AeronException;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import io.aeron.security.Authenticator;
import io.aeron.security.AuthorisationService;
import io.aeron.status.LocalSocketAddressStatus;
import io.aeron.status.ReadableCounter;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableRingBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.SemanticVersion;
import org.agrona.Strings;
import org.agrona.collections.ArrayListUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongCounterMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongArrayQueue;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.status.CountersReader;

final class ConsensusModuleAgent
implements Agent,
IdleStrategy,
TimerService.TimerHandler,
ConsensusModuleSnapshotListener,
ConsensusModuleControl {
    static final long SLOW_TICK_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(10L);
    static final short APPEND_POSITION_FLAG_NONE = 0;
    static final short APPEND_POSITION_FLAG_CATCHUP = 1;
    private final long sessionTimeoutNs;
    private final long leaderHeartbeatIntervalNs;
    private final long leaderHeartbeatTimeoutNs;
    private long unavailableCounterHandlerRegistrationId;
    private long nextSessionId = 1L;
    private long leadershipTermId = -1L;
    private long expectedAckPosition = 0L;
    private long serviceAckId = 0L;
    private long terminationPosition = -1L;
    private long notifiedCommitPosition = 0L;
    private long lastAppendPosition = -1L;
    private long timeOfLastLogUpdateNs = 0L;
    private long timeOfLastAppendPositionUpdateNs = 0L;
    private long timeOfLastAppendPositionSendNs = 0L;
    private long timeOfLastLeaderUpdateNs;
    private long slowTickDeadlineNs = 0L;
    private long markFileUpdateDeadlineNs = 0L;
    private final ClusterMember[] activeMembers;
    private final ClusterMember thisMember;
    private final long[] rankedPositions;
    private final long[] serviceClientIds;
    private final int serviceCount;
    private final int memberId;
    private final Counter commitPosition;
    private long logPublicationChannelTag;
    private ReadableCounter appendPosition = null;
    private ConsensusModule.State state = ConsensusModule.State.INIT;
    private Cluster.Role role = Cluster.Role.FOLLOWER;
    private ClusterMember leaderMember;
    private final ArrayDeque<ServiceAck>[] serviceAckQueues;
    private final Counter clusterRoleCounter;
    private final ClusterMarkFile markFile;
    private final AgentInvoker aeronClientInvoker;
    private final ClusterClock clusterClock;
    private final LongConsumer clusterTimeConsumer;
    private final TimeUnit clusterTimeUnit;
    private final TimerService timerService;
    private final Counter moduleState;
    private final Counter controlToggle;
    private final Counter nodeControlToggle;
    private final ConsensusModuleAdapter consensusModuleAdapter;
    private final ServiceProxy serviceProxy;
    private final IngressAdapter ingressAdapter;
    private final EgressPublisher egressPublisher;
    private final LogPublisher logPublisher;
    private final LogAdapter logAdapter;
    private final ConsensusAdapter consensusAdapter;
    private final ConsensusPublisher consensusPublisher = new ConsensusPublisher();
    private final Long2ObjectHashMap<ClusterSession> sessionByIdMap = new Long2ObjectHashMap();
    private final ArrayList<ClusterSession> sessions = new ArrayList();
    private final ArrayList<ClusterSession> pendingUserSessions = new ArrayList();
    private final ArrayList<ClusterSession> rejectedUserSessions = new ArrayList();
    private final ArrayList<ClusterSession> redirectUserSessions = new ArrayList();
    private final ArrayList<ClusterSession> pendingBackupSessions = new ArrayList();
    private final ArrayList<ClusterSession> rejectedBackupSessions = new ArrayList();
    private final Int2ObjectHashMap<ClusterMember> clusterMemberByIdMap = new Int2ObjectHashMap();
    private final Long2LongCounterMap expiredTimerCountByCorrelationIdMap = new Long2LongCounterMap(0L);
    private final ArrayDeque<ClusterSession> uncommittedClosedSessions = new ArrayDeque();
    private final LongArrayQueue uncommittedTimers = new LongArrayQueue(Long.MAX_VALUE);
    private final PendingServiceMessageTracker[] pendingServiceMessageTrackers;
    private final ConsensusModuleExtension consensusModuleExtension;
    private final Authenticator authenticator;
    private final AuthorisationService authorisationService;
    private final ClusterSessionProxy sessionProxy;
    private final Aeron aeron;
    private final ConsensusModule.Context ctx;
    private final IdleStrategy idleStrategy;
    private final RecordingLog recordingLog;
    private final DutyCycleTracker dutyCycleTracker;
    private final SnapshotDurationTracker totalSnapshotDurationTracker;
    private final ChannelUri responseChannelTemplate;
    private RecordingLog.RecoveryPlan recoveryPlan;
    private AeronArchive archive;
    private AeronArchive extensionArchive;
    private RecordingSignalPoller recordingSignalPoller;
    private Election election;
    private ClusterTermination clusterTermination;
    private long logSubscriptionId = -1L;
    private long logRecordingId = -1L;
    private long logRecordingStopPosition = 0L;
    private String liveLogDestination;
    private String catchupLogDestination;
    private String ingressEndpoints;
    private StandbySnapshotReplicator standbySnapshotReplicator = null;
    private String localLogChannel;

    ConsensusModuleAgent(ConsensusModule.Context ctx) {
        this.ctx = ctx;
        this.aeron = ctx.aeron();
        this.clusterClock = ctx.clusterClock();
        this.clusterTimeUnit = this.clusterClock.timeUnit();
        this.clusterTimeConsumer = ctx.clusterTimeConsumerSupplier().apply(ctx);
        this.timerService = ctx.timerServiceSupplier().newInstance(this.clusterTimeUnit, this);
        this.sessionTimeoutNs = ctx.sessionTimeoutNs();
        this.leaderHeartbeatIntervalNs = ctx.leaderHeartbeatIntervalNs();
        this.leaderHeartbeatTimeoutNs = ctx.leaderHeartbeatTimeoutNs();
        this.egressPublisher = ctx.egressPublisher();
        this.moduleState = ctx.moduleStateCounter();
        this.commitPosition = ctx.commitPositionCounter();
        this.controlToggle = ctx.controlToggleCounter();
        this.nodeControlToggle = ctx.nodeControlToggleCounter();
        this.logPublisher = ctx.logPublisher();
        this.idleStrategy = ctx.idleStrategy();
        this.activeMembers = ClusterMember.parse(ctx.clusterMembers());
        this.sessionProxy = new ClusterSessionProxy(this.egressPublisher);
        this.memberId = ctx.clusterMemberId();
        this.clusterRoleCounter = ctx.clusterNodeRoleCounter();
        this.markFile = ctx.clusterMarkFile();
        this.recordingLog = ctx.recordingLog();
        this.serviceClientIds = new long[ctx.serviceCount()];
        Arrays.fill(this.serviceClientIds, -1L);
        this.serviceCount = ctx.serviceCount();
        this.serviceAckQueues = ServiceAck.newArrayOfQueues(this.serviceCount);
        this.dutyCycleTracker = ctx.dutyCycleTracker();
        this.totalSnapshotDurationTracker = ctx.totalSnapshotDurationTracker();
        this.aeronClientInvoker = this.aeron.conductorAgentInvoker();
        this.aeronClientInvoker.invoke();
        this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
        this.role(Cluster.Role.FOLLOWER);
        ClusterMember.addClusterMemberIds(this.activeMembers, this.clusterMemberByIdMap);
        this.leaderMember = this.thisMember = ClusterMember.determineMember(this.activeMembers, ctx.clusterMemberId(), ctx.memberEndpoints());
        ChannelUri consensusUri = ChannelUri.parse(ctx.consensusChannel());
        if (!consensusUri.containsKey("endpoint")) {
            consensusUri.put("endpoint", this.thisMember.consensusEndpoint());
        }
        this.consensusAdapter = new ConsensusAdapter(this.aeron.addSubscription(consensusUri.toString(), ctx.consensusStreamId()), this);
        this.ingressAdapter = new IngressAdapter(ctx.ingressFragmentLimit(), this);
        this.logAdapter = new LogAdapter(this, ctx.logFragmentLimit());
        this.consensusModuleAdapter = new ConsensusModuleAdapter(this.aeron.addSubscription(ctx.controlChannel(), ctx.consensusModuleStreamId()), this);
        this.serviceProxy = new ServiceProxy(this.aeron.addPublication(ctx.controlChannel(), ctx.serviceStreamId()));
        this.authenticator = (Authenticator)ctx.authenticatorSupplier().get();
        this.authorisationService = (AuthorisationService)ctx.authorisationServiceSupplier().get();
        this.pendingServiceMessageTrackers = new PendingServiceMessageTracker[ctx.serviceCount()];
        int size = ctx.serviceCount();
        for (int i = 0; i < size; ++i) {
            this.pendingServiceMessageTrackers[i] = new PendingServiceMessageTracker(i, this.commitPosition, this.logPublisher, this.clusterClock);
        }
        this.consensusModuleExtension = ctx.consensusModuleExtension();
        this.responseChannelTemplate = Strings.isEmpty(ctx.egressChannel()) ? null : ChannelUri.parse(ctx.egressChannel());
    }

    @Override
    public void onClose() {
        if (!this.aeron.isClosed()) {
            this.aeron.removeUnavailableCounterHandler(this.unavailableCounterHandlerRegistrationId);
            CloseHelper.close(this.consensusModuleExtension);
            CountedErrorHandler errorHandler = this.ctx.countedErrorHandler();
            this.logPublisher.disconnect(errorHandler);
            CloseHelper.close(this.logAdapter.subscription());
            this.tryStopLogRecording();
            if (!this.ctx.ownsAeronClient()) {
                ClusterMember.closeConsensusPublications(errorHandler, this.activeMembers);
                CloseHelper.close(errorHandler, this.ingressAdapter);
                CloseHelper.close(errorHandler, this.consensusAdapter);
                CloseHelper.close(errorHandler, this.serviceProxy);
                CloseHelper.close(errorHandler, this.consensusModuleAdapter);
                CloseHelper.close(errorHandler, this.archive);
                for (ClusterSession session : this.sessionByIdMap.values()) {
                    session.close(this.aeron, errorHandler);
                }
            }
            this.state(ConsensusModule.State.CLOSED);
        }
        this.markFile.updateActivityTimestamp(-1L);
        this.markFile.force();
        this.ctx.close();
    }

    @Override
    public void onStart() {
        this.archive = AeronArchive.connect(this.ctx.archiveContext().clone());
        this.recordingSignalPoller = new RecordingSignalPoller(this.archive.controlSessionId(), this.archive.controlResponsePoller().subscription());
        long lastTermRecordingId = this.recordingLog.findLastTermRecordingId();
        if (-1L != lastTermRecordingId) {
            this.archive.tryStopRecordingByIdentity(lastTermRecordingId);
        }
        if (null == this.ctx.boostrapState()) {
            this.replicateStandbySnapshotsForStartup();
            this.recoveryPlan = this.recoverFromSnapshotAndLog();
        } else {
            this.recoveryPlan = this.recoverFromBootstrapState();
        }
        ClusterMember.addConsensusPublications(this.activeMembers, this.thisMember, this.ctx.consensusChannel(), this.ctx.consensusStreamId(), this.aeron, this.ctx.countedErrorHandler());
        long lastLeadershipTermId = this.recoveryPlan.lastLeadershipTermId;
        long commitPosition = this.commitPosition.getWeak();
        long appendedPosition = this.recoveryPlan.appendedLogPosition;
        ConsensusModuleAgent.logNewElection(this.memberId, lastLeadershipTermId, commitPosition, appendedPosition, "node started");
        this.election = new Election(true, -1, lastLeadershipTermId, this.recoveryPlan.lastTermBaseLogPosition, commitPosition, appendedPosition, this.activeMembers, this.clusterMemberByIdMap, this.thisMember, this.consensusPublisher, this.ctx, this);
        this.election.doWork(this.clusterClock.timeNanos());
        this.state(ConsensusModule.State.ACTIVE);
        if (null != this.consensusModuleExtension) {
            this.extensionArchive = AeronArchive.connect(this.ctx.archiveContext().clone());
        }
        this.unavailableCounterHandlerRegistrationId = this.aeron.addUnavailableCounterHandler(this::onUnavailableCounter);
        this.dutyCycleTracker.update(this.clusterClock.timeNanos());
    }

    @Override
    public int doWork() {
        long timestamp = this.clusterClock.time();
        long nowNs = this.clusterTimeUnit.toNanos(timestamp);
        int workCount = 0;
        this.dutyCycleTracker.measureAndUpdate(nowNs);
        try {
            if (nowNs >= this.slowTickDeadlineNs) {
                int slowTickWorkCount = this.slowTickWork(nowNs);
                workCount += slowTickWorkCount;
                this.slowTickDeadlineNs = slowTickWorkCount > 0 ? nowNs + 1L : nowNs + SLOW_TICK_INTERVAL_NS;
            }
            workCount += this.consensusAdapter.poll();
            workCount = null != this.election ? (workCount += this.election.doWork(nowNs)) : (workCount += this.consensusWork(timestamp, nowNs));
        }
        catch (AgentTerminationException ex) {
            this.runTerminationHook();
            throw ex;
        }
        catch (Exception ex) {
            if (null != this.election) {
                this.election.handleError(nowNs, ex);
            }
            throw ex;
        }
        this.clusterTimeConsumer.accept(timestamp);
        return workCount;
    }

    @Override
    public ConsensusModule.Context context() {
        return this.ctx;
    }

    @Override
    public int memberId() {
        return this.memberId;
    }

    @Override
    public String roleName() {
        return this.ctx.agentRoleName();
    }

    @Override
    public long time() {
        return this.clusterClock.time();
    }

    @Override
    public TimeUnit timeUnit() {
        return this.clusterClock.timeUnit();
    }

    @Override
    public IdleStrategy idleStrategy() {
        return this;
    }

    @Override
    public void idle() {
        ConsensusModuleAgent.checkInterruptStatus();
        this.aeronClientInvoker.invoke();
        if (this.aeron.isClosed()) {
            throw new AgentTerminationException("unexpected Aeron close");
        }
        this.idleStrategy.idle();
        this.pollArchiveEvents();
    }

    @Override
    public void idle(int workCount) {
        ConsensusModuleAgent.checkInterruptStatus();
        this.aeronClientInvoker.invoke();
        if (this.aeron.isClosed()) {
            throw new AgentTerminationException("unexpected Aeron close");
        }
        this.idleStrategy.idle(workCount);
        if (0 == workCount) {
            this.pollArchiveEvents();
        }
    }

    @Override
    public void reset() {
        this.idleStrategy.reset();
    }

    @Override
    public Aeron aeron() {
        return this.aeron;
    }

    @Override
    public AeronArchive archive() {
        return this.extensionArchive;
    }

    @Override
    public AuthorisationService authorisationService() {
        return this.authorisationService;
    }

    @Override
    public ClusterClientSession getClientSession(long clusterSessionId) {
        return this.sessionByIdMap.get(clusterSessionId);
    }

    @Override
    public void closeClusterSession(long clusterSessionId) {
        this.onServiceCloseSession(clusterSessionId);
    }

    @Override
    public void onLoadBeginSnapshot(int appVersion, TimeUnit timeUnit, DirectBuffer buffer, int offset, int length) {
        if (!this.ctx.appVersionValidator().isVersionCompatible(this.ctx.appVersion(), appVersion)) {
            throw new ClusterException("incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) + " snapshot=" + SemanticVersion.toString(appVersion), AeronException.Category.FATAL);
        }
        if (timeUnit != this.clusterTimeUnit) {
            throw new ClusterException("incompatible time unit: " + this.clusterTimeUnit + " snapshot=" + timeUnit, AeronException.Category.FATAL);
        }
    }

    public ControlledFragmentHandler.Action onExtensionMessage(int actingBlockLength, int templateId, int schemaId, int actingVersion, DirectBuffer buffer, int offset, int length, Header header) {
        if (null != this.consensusModuleExtension) {
            return this.consensusModuleExtension.onIngressExtensionMessage(actingBlockLength, templateId, schemaId, actingVersion, buffer, offset, length, header);
        }
        this.ctx.countedErrorHandler().onError(new ClusterEvent("expected schemaId=111, actual=" + schemaId));
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    @Override
    public void onLoadEndSnapshot(DirectBuffer buffer, int offset, int length) {
    }

    @Override
    public void onLoadClusterSession(long clusterSessionId, long correlationId, long openedPosition, long timeOfLastActivity, CloseReason closeReason, int responseStreamId, String responseChannel, DirectBuffer buffer, int offset, int length) {
        ClusterSession session = new ClusterSession(clusterSessionId, responseStreamId, this.refineResponseChannel(responseChannel));
        session.loadSnapshotState(correlationId, openedPosition, timeOfLastActivity, closeReason);
        this.addSession(session);
        if (clusterSessionId >= this.nextSessionId) {
            this.nextSessionId = clusterSessionId + 1L;
        }
    }

    @Override
    public void onLoadConsensusModuleState(long nextSessionId, long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, DirectBuffer buffer, int offset, int length) {
        this.nextSessionId = nextSessionId;
        this.pendingServiceMessageTrackers[0].loadState(nextServiceSessionId, logServiceSessionId, pendingMessageCapacity);
    }

    @Override
    public void onLoadPendingMessageTracker(long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, int serviceId, DirectBuffer buffer, int offset, int length) {
        if (serviceId < 0 || serviceId >= this.pendingServiceMessageTrackers.length) {
            throw new ClusterException("serviceId=" + serviceId + " invalid for serviceCount=" + this.pendingServiceMessageTrackers.length);
        }
        this.pendingServiceMessageTrackers[serviceId].loadState(nextServiceSessionId, logServiceSessionId, pendingMessageCapacity);
    }

    @Override
    public void onLoadPendingMessage(long clusterSessionId, DirectBuffer buffer, int offset, int length) {
        int index = PendingServiceMessageTracker.serviceId(clusterSessionId);
        this.pendingServiceMessageTrackers[index].appendMessage(buffer, offset, length);
    }

    @Override
    public void onLoadTimer(long correlationId, long deadline, DirectBuffer buffer, int offset, int length) {
        this.onScheduleTimer(correlationId, deadline);
    }

    public void onSessionConnect(long correlationId, int responseStreamId, int version, String responseChannel, byte[] encodedCredentials) {
        long l;
        if (Cluster.Role.LEADER == this.role) {
            long l2 = this.nextSessionId;
            l = l2;
            this.nextSessionId = l2 + 1L;
        } else {
            l = -1L;
        }
        long clusterSessionId = l;
        ClusterSession session = new ClusterSession(clusterSessionId, responseStreamId, this.refineResponseChannel(responseChannel));
        session.asyncConnect(this.aeron);
        long now = this.clusterClock.time();
        session.lastActivityNs(this.clusterTimeUnit.toNanos(now), correlationId);
        if (Cluster.Role.LEADER != this.role) {
            this.redirectUserSessions.add(session);
        } else if (0 != SemanticVersion.major(version)) {
            String detail = "invalid client version " + SemanticVersion.toString(version) + ", cluster is " + SemanticVersion.toString(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION);
            session.reject(EventCode.ERROR, detail, this.ctx.errorLog(), ConsensusModule.Configuration.clusterMemberId());
            this.rejectedUserSessions.add(session);
        } else if (this.pendingUserSessions.size() + this.sessions.size() >= this.ctx.maxConcurrentSessions()) {
            session.reject(EventCode.ERROR, "concurrent session limit", this.ctx.errorLog(), ConsensusModule.Configuration.clusterMemberId());
            this.rejectedUserSessions.add(session);
        } else {
            this.authenticator.onConnectRequest(session.id(), encodedCredentials, this.clusterTimeUnit.toMillis(now));
            this.pendingUserSessions.add(session);
        }
    }

    void onSessionClose(long leadershipTermId, long clusterSessionId) {
        ClusterSession session;
        if (leadershipTermId == this.leadershipTermId && Cluster.Role.LEADER == this.role && null != (session = this.sessionByIdMap.get(clusterSessionId)) && session.isOpen()) {
            session.closing(CloseReason.CLIENT_ACTION);
            session.disconnect(this.aeron, this.ctx.countedErrorHandler());
            if (this.logPublisher.appendSessionClose(this.memberId, session, leadershipTermId, this.clusterClock.time(), this.clusterClock.timeUnit())) {
                session.closedLogPosition(this.logPublisher.position());
                this.uncommittedClosedSessions.addLast(session);
                this.closeSession(session);
            }
        }
    }

    void onAdminRequest(long leadershipTermId, long clusterSessionId, long correlationId, AdminRequestType requestType, DirectBuffer payload, int payloadOffset, int payloadLength) {
        if (Cluster.Role.LEADER != this.role || leadershipTermId != this.leadershipTermId) {
            return;
        }
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null == session || session.state() != ClusterSession.State.OPEN) {
            return;
        }
        if (!this.authorisationService.isAuthorised(111, 26, (Object)requestType, session.encodedPrincipal())) {
            String msg = "Execution of the " + requestType + " request was not authorised";
            this.egressPublisher.sendAdminResponse(session, correlationId, requestType, AdminResponseCode.UNAUTHORISED_ACCESS, msg);
            return;
        }
        if (AdminRequestType.SNAPSHOT == requestType) {
            if (ClusterControl.ToggleState.SNAPSHOT.toggle(this.controlToggle)) {
                this.egressPublisher.sendAdminResponse(session, correlationId, requestType, AdminResponseCode.OK, "");
            } else {
                String msg = "Failed to switch ClusterControl to the ToggleState.SNAPSHOT state";
                this.egressPublisher.sendAdminResponse(session, correlationId, requestType, AdminResponseCode.ERROR, "Failed to switch ClusterControl to the ToggleState.SNAPSHOT state");
            }
        } else {
            this.egressPublisher.sendAdminResponse(session, correlationId, requestType, AdminResponseCode.ERROR, "Unknown request type: " + requestType);
        }
    }

    ControlledFragmentHandler.Action onIngressMessage(long leadershipTermId, long clusterSessionId, DirectBuffer buffer, int offset, int length) {
        ClusterSession session;
        if (leadershipTermId == this.leadershipTermId && Cluster.Role.LEADER == this.role && null != (session = this.sessionByIdMap.get(clusterSessionId)) && session.isOpen()) {
            long timestamp = this.clusterClock.time();
            if (this.logPublisher.appendMessage(leadershipTermId, clusterSessionId, timestamp, buffer, offset, length) > 0L) {
                session.timeOfLastActivityNs(this.clusterTimeUnit.toNanos(timestamp));
            } else {
                return ControlledFragmentHandler.Action.ABORT;
            }
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    void onSessionKeepAlive(long leadershipTermId, long clusterSessionId) {
        ClusterSession session;
        if (leadershipTermId == this.leadershipTermId && Cluster.Role.LEADER == this.role && null != (session = this.sessionByIdMap.get(clusterSessionId)) && session.state() == ClusterSession.State.OPEN) {
            session.timeOfLastActivityNs(this.clusterClock.timeNanos());
        }
    }

    void onIngressChallengeResponse(long correlationId, long clusterSessionId, byte[] encodedCredentials) {
        if (Cluster.Role.LEADER == this.role) {
            this.onChallengeResponseForSession(this.pendingUserSessions, correlationId, clusterSessionId, encodedCredentials);
        } else {
            this.consensusPublisher.challengeResponse(this.leaderMember.publication(), correlationId, clusterSessionId, encodedCredentials);
        }
    }

    void onConsensusChallengeResponse(long correlationId, long clusterSessionId, byte[] encodedCredentials) {
        this.onChallengeResponseForSession(this.pendingBackupSessions, correlationId, clusterSessionId, encodedCredentials);
    }

    private void onChallengeResponseForSession(ArrayList<ClusterSession> pendingSessions, long correlationId, long clusterSessionId, byte[] encodedCredentials) {
        int lastIndex;
        for (int i = lastIndex = pendingSessions.size() - 1; i >= 0; --i) {
            ClusterSession session = pendingSessions.get(i);
            if (session.id() != clusterSessionId || session.state() != ClusterSession.State.CHALLENGED) continue;
            long timestamp = this.clusterClock.time();
            long nowMs = this.clusterTimeUnit.toMillis(timestamp);
            session.lastActivityNs(this.clusterTimeUnit.toNanos(timestamp), correlationId);
            this.authenticator.onChallengeResponse(clusterSessionId, encodedCredentials, nowMs);
            break;
        }
    }

    @Override
    public boolean onTimerEvent(long correlationId) {
        long appendPosition = this.logPublisher.appendTimer(correlationId, this.leadershipTermId, this.clusterClock.time());
        if (appendPosition > 0L) {
            this.uncommittedTimers.offerLong(appendPosition);
            this.uncommittedTimers.offerLong(correlationId);
            return true;
        }
        return false;
    }

    void onCanvassPosition(long logLeadershipTermId, long logPosition, long leadershipTermId, int followerMemberId, int protocolVersion) {
        ClusterMember follower;
        ConsensusModuleAgent.logOnCanvassPosition(this.memberId, logLeadershipTermId, logPosition, leadershipTermId, followerMemberId, protocolVersion);
        this.checkFollowerForConsensusPublication(followerMemberId);
        if (null != this.election) {
            this.election.onCanvassPosition(logLeadershipTermId, logPosition, leadershipTermId, followerMemberId, protocolVersion);
        } else if (Cluster.Role.LEADER == this.role && null != (follower = this.clusterMemberByIdMap.get(followerMemberId)) && logLeadershipTermId <= this.leadershipTermId) {
            this.stopExistingCatchupReplay(follower);
            RecordingLog.Entry currentTermEntry = this.recordingLog.getTermEntry(this.leadershipTermId);
            long termBaseLogPosition = currentTermEntry.termBaseLogPosition;
            long nextLogLeadershipTermId = -1L;
            long nextTermBaseLogPosition = -1L;
            long nextLogPosition = -1L;
            if (logLeadershipTermId < this.leadershipTermId) {
                RecordingLog.Entry nextLogEntry = this.recordingLog.findTermEntry(logLeadershipTermId + 1L);
                nextLogLeadershipTermId = null != nextLogEntry ? nextLogEntry.leadershipTermId : this.leadershipTermId;
                nextTermBaseLogPosition = null != nextLogEntry ? nextLogEntry.termBaseLogPosition : termBaseLogPosition;
                nextLogPosition = null != nextLogEntry ? nextLogEntry.logPosition : -1L;
            }
            this.consensusPublisher.newLeadershipTerm(follower.publication(), logLeadershipTermId, nextLogLeadershipTermId, nextTermBaseLogPosition, nextLogPosition, this.leadershipTermId, termBaseLogPosition, this.logPublisher.position(), this.logRecordingId, this.clusterClock.time(), this.memberId, this.logPublisher.sessionId(), false);
        }
    }

    void onRequestVote(long logLeadershipTermId, long logPosition, long candidateTermId, int candidateId, int protocolVersion) {
        ConsensusModuleAgent.logOnRequestVote(this.memberId, logLeadershipTermId, logPosition, candidateTermId, candidateId, protocolVersion);
        if (null != this.election) {
            this.election.onRequestVote(logLeadershipTermId, logPosition, candidateTermId, candidateId, protocolVersion);
        } else if (candidateTermId > this.leadershipTermId) {
            this.enterElection(false, "unexpected vote request");
        }
    }

    void onVote(long candidateTermId, long logLeadershipTermId, long logPosition, int candidateMemberId, int followerMemberId, boolean vote) {
        if (null != this.election) {
            this.election.onVote(candidateTermId, logLeadershipTermId, logPosition, candidateMemberId, followerMemberId, vote);
        }
    }

    void onNewLeadershipTerm(long logLeadershipTermId, long nextLeadershipTermId, long nextTermBaseLogPosition, long nextLogPosition, long leadershipTermId, long termBaseLogPosition, long logPosition, long leaderRecordingId, long timestamp, int leaderId, int logSessionId, int appVersion, boolean isStartup) {
        ConsensusModuleAgent.logOnNewLeadershipTerm(this.memberId, logLeadershipTermId, nextLeadershipTermId, nextTermBaseLogPosition, nextLogPosition, leadershipTermId, termBaseLogPosition, logPosition, leaderRecordingId, timestamp, leaderId, logSessionId, appVersion, isStartup);
        if (!this.ctx.appVersionValidator().isVersionCompatible(this.ctx.appVersion(), appVersion)) {
            this.ctx.countedErrorHandler().onError(new ClusterException("incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) + " log=" + SemanticVersion.toString(appVersion)));
            this.unexpectedTermination();
        }
        long nowNs = this.clusterClock.timeNanos();
        if (leadershipTermId >= this.leadershipTermId) {
            this.timeOfLastLeaderUpdateNs = nowNs;
        }
        if (null != this.election) {
            this.election.onNewLeadershipTerm(logLeadershipTermId, nextLeadershipTermId, nextTermBaseLogPosition, nextLogPosition, leadershipTermId, termBaseLogPosition, logPosition, leaderRecordingId, timestamp, leaderId, logSessionId, isStartup);
        } else if (Cluster.Role.FOLLOWER == this.role && leadershipTermId == this.leadershipTermId && leaderId == this.leaderMember.id()) {
            this.notifiedCommitPosition = Math.max(this.notifiedCommitPosition, logPosition);
            this.timeOfLastLogUpdateNs = nowNs;
        } else if (leadershipTermId > this.leadershipTermId) {
            this.enterElection(false, "unexpected new leadership term event");
        }
    }

    void onAppendPosition(long leadershipTermId, long logPosition, int followerMemberId, short flags) {
        ClusterMember follower;
        ConsensusModuleAgent.logOnAppendPosition(this.memberId, leadershipTermId, logPosition, followerMemberId, flags);
        if (null != this.election) {
            this.election.onAppendPosition(leadershipTermId, logPosition, followerMemberId, flags);
        } else if (leadershipTermId <= this.leadershipTermId && Cluster.Role.LEADER == this.role && null != (follower = this.clusterMemberByIdMap.get(followerMemberId))) {
            follower.logPosition(logPosition).timeOfLastAppendPositionNs(this.clusterClock.timeNanos());
            this.trackCatchupCompletion(follower, leadershipTermId, flags);
        }
    }

    void onCommitPosition(long leadershipTermId, long logPosition, int leaderMemberId) {
        ConsensusModuleAgent.logOnCommitPosition(this.memberId, leadershipTermId, logPosition, leaderMemberId);
        long nowNs = this.clusterClock.timeNanos();
        if (leadershipTermId >= this.leadershipTermId) {
            this.timeOfLastLeaderUpdateNs = nowNs;
        }
        if (null != this.election) {
            this.election.onCommitPosition(leadershipTermId, logPosition, leaderMemberId);
        } else if (leadershipTermId == this.leadershipTermId && leaderMemberId == this.leaderMember.id() && Cluster.Role.FOLLOWER == this.role) {
            this.notifiedCommitPosition = logPosition;
            this.timeOfLastLogUpdateNs = nowNs;
        } else if (leadershipTermId > this.leadershipTermId) {
            this.enterElection(false, "unexpected commit position from new leader");
        }
    }

    void onCatchupPosition(long leadershipTermId, long logPosition, int followerMemberId, String catchupEndpoint) {
        ClusterMember follower;
        ConsensusModuleAgent.logOnCatchupPosition(this.memberId, leadershipTermId, logPosition, followerMemberId, catchupEndpoint);
        if (leadershipTermId <= this.leadershipTermId && Cluster.Role.LEADER == this.role && null != (follower = this.clusterMemberByIdMap.get(followerMemberId)) && follower.catchupReplaySessionId() == -1L) {
            ChannelUri channel = ChannelUri.parse(this.ctx.followerCatchupChannel());
            channel.put("endpoint", catchupEndpoint);
            channel.put("session-id", Integer.toString(this.logPublisher.sessionId()));
            channel.put("linger", "0");
            channel.put("eos", "false");
            follower.catchupReplaySessionId(this.archive.startReplay(this.logRecordingId, logPosition, Long.MAX_VALUE, channel.toString(), this.ctx.logStreamId()));
            follower.catchupReplayCorrelationId(this.archive.lastCorrelationId());
        }
    }

    void onStopCatchup(long leadershipTermId, int followerMemberId) {
        ConsensusModuleAgent.logOnStopCatchup(this.memberId, leadershipTermId, followerMemberId);
        if (leadershipTermId == this.leadershipTermId && followerMemberId == this.memberId && null != this.catchupLogDestination) {
            this.logAdapter.asyncRemoveDestination(this.catchupLogDestination);
            this.catchupLogDestination = null;
        }
    }

    void onTerminationPosition(long leadershipTermId, long logPosition) {
        ConsensusModuleAgent.logOnTerminationPosition(this.memberId, leadershipTermId, logPosition);
        if (leadershipTermId == this.leadershipTermId && Cluster.Role.FOLLOWER == this.role) {
            this.terminationPosition = logPosition;
            this.timeOfLastLogUpdateNs = this.clusterClock.timeNanos();
        }
    }

    void onTerminationAck(long leadershipTermId, long logPosition, int memberId) {
        ClusterMember member;
        ConsensusModuleAgent.logOnTerminationAck(this.memberId, leadershipTermId, logPosition, memberId);
        if (leadershipTermId == this.leadershipTermId && logPosition >= this.terminationPosition && Cluster.Role.LEADER == this.role && null != (member = this.clusterMemberByIdMap.get(memberId))) {
            member.hasTerminated(true);
            if (this.clusterTermination.canTerminate(this.activeMembers, this.clusterClock.timeNanos())) {
                this.recordingLog.commitLogPosition(leadershipTermId, this.terminationPosition);
                this.closeAndTerminate();
            }
        }
    }

    void onBackupQuery(long correlationId, int responseStreamId, int version, String responseChannel, byte[] encodedCredentials) {
        if (null == this.election && (this.state == ConsensusModule.State.ACTIVE || this.state == ConsensusModule.State.SUSPENDED)) {
            ClusterSession session = new ClusterSession(-1L, responseStreamId, this.refineResponseChannel(responseChannel));
            long timestamp = this.clusterClock.time();
            session.action(ClusterSession.Action.BACKUP);
            session.asyncConnect(this.aeron);
            session.lastActivityNs(this.clusterTimeUnit.toNanos(timestamp), correlationId);
            if (0 == SemanticVersion.major(version)) {
                long timestampMs = this.clusterTimeUnit.toMillis(timestamp);
                this.authenticator.onConnectRequest(session.id(), encodedCredentials, timestampMs);
                this.pendingBackupSessions.add(session);
            } else {
                String detail = "invalid client version " + SemanticVersion.toString(version) + ", cluster=" + SemanticVersion.toString(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION);
                session.reject(EventCode.ERROR, detail, this.ctx.errorLog(), ConsensusModule.Configuration.clusterMemberId());
                this.rejectedBackupSessions.add(session);
            }
        }
    }

    public void onHeartbeatRequest(long correlationId, int responseStreamId, String responseChannel, byte[] encodedCredentials) {
        if (null == this.election && (this.state == ConsensusModule.State.ACTIVE || this.state == ConsensusModule.State.SUSPENDED)) {
            ClusterSession session = new ClusterSession(-1L, responseStreamId, this.refineResponseChannel(responseChannel));
            session.action(ClusterSession.Action.HEARTBEAT);
            session.asyncConnect(this.aeron);
            long timestamp = this.clusterClock.time();
            long timestampNs = this.clusterTimeUnit.toNanos(timestamp);
            long timestampMs = this.clusterTimeUnit.toMillis(timestamp);
            session.lastActivityNs(timestampNs, correlationId);
            this.authenticator.onConnectRequest(session.id(), encodedCredentials, timestampMs);
            this.pendingBackupSessions.add(session);
        }
    }

    void onClusterMembersQuery(long correlationId, boolean isExtendedRequest) {
        if (isExtendedRequest) {
            this.serviceProxy.clusterMembersExtendedResponse(correlationId, this.clusterClock.timeNanos(), this.leaderMember.id(), this.memberId, this.activeMembers);
        } else {
            this.serviceProxy.clusterMembersResponse(correlationId, this.leaderMember.id(), ClusterMember.encodeAsString(this.activeMembers));
        }
    }

    void onStandbySnapshot(long correlationId, int version, List<StandbySnapshotEntry> standbySnapshotEntries, int responseStreamId, String responseChannel, byte[] encodedCredentials) {
        if (null == this.election && (this.state == ConsensusModule.State.ACTIVE || this.state == ConsensusModule.State.SUSPENDED)) {
            ClusterSession session = new ClusterSession(-1L, responseStreamId, this.refineResponseChannel(responseChannel));
            long timestamp = this.clusterClock.time();
            session.action(ClusterSession.Action.STANDBY_SNAPSHOT);
            session.asyncConnect(this.aeron);
            session.lastActivityNs(this.clusterTimeUnit.toNanos(timestamp), correlationId);
            session.requestInput(standbySnapshotEntries);
            if (0 == SemanticVersion.major(version)) {
                long timestampMs = this.clusterTimeUnit.toMillis(timestamp);
                this.authenticator.onConnectRequest(session.id(), encodedCredentials, timestampMs);
                this.pendingBackupSessions.add(session);
            } else {
                String detail = "invalid client version " + SemanticVersion.toString(version) + ", cluster=" + SemanticVersion.toString(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION);
                session.reject(EventCode.ERROR, detail, this.ctx.errorLog(), ConsensusModule.Configuration.clusterMemberId());
                this.rejectedBackupSessions.add(session);
            }
        }
    }

    void state(ConsensusModule.State newState) {
        if (newState != this.state) {
            this.logStateChange(this.memberId, this.state, newState);
            this.state = newState;
            if (!this.moduleState.isClosed()) {
                this.moduleState.set(newState.code());
            }
        }
    }

    ConsensusModule.State state() {
        return this.state;
    }

    private void logStateChange(int memberId, ConsensusModule.State oldState, ConsensusModule.State newState) {
    }

    void role(Cluster.Role newRole) {
        if (newRole != this.role) {
            this.logRoleChange(this.memberId, this.role, newRole);
            this.role = newRole;
            if (!this.clusterRoleCounter.isClosed()) {
                this.clusterRoleCounter.set(newRole.code());
            }
        }
    }

    private void logRoleChange(int memberId, Cluster.Role oldRole, Cluster.Role newRole) {
    }

    Cluster.Role role() {
        return this.role;
    }

    long prepareForNewLeadership(long logPosition, long nowNs) {
        this.role(Cluster.Role.FOLLOWER);
        CloseHelper.close(this.ctx.countedErrorHandler(), this.ingressAdapter);
        if (null != this.catchupLogDestination) {
            this.logAdapter.asyncRemoveDestination(this.catchupLogDestination);
            this.catchupLogDestination = null;
        }
        if (null != this.liveLogDestination) {
            this.logAdapter.asyncRemoveDestination(this.liveLogDestination);
            this.liveLogDestination = null;
        }
        long logSubscriptionRegistrationId = this.logAdapter.disconnect(this.ctx.countedErrorHandler());
        this.logPublisher.disconnect(this.ctx.countedErrorHandler());
        ClusterControl.ToggleState.deactivate(this.controlToggle);
        this.tryStopLogRecording();
        if (-1L != this.logRecordingId) {
            this.lastAppendPosition = this.getLastAppendedPosition();
            this.timeOfLastAppendPositionUpdateNs = nowNs;
            this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.serviceCount, this.logRecordingId);
            this.clearSessionsAfter(logPosition);
            int size = this.sessions.size();
            for (int i = 0; i < size; ++i) {
                this.sessions.get(i).disconnect(this.aeron, this.ctx.countedErrorHandler());
            }
            this.commitPosition.setOrdered(logPosition);
            this.restoreUncommittedEntries(logPosition);
            CountersReader counters = this.ctx.aeron().countersReader();
            long archiveId = this.archive.archiveId();
            while (-1 != RecordingPos.findCounterIdByRecording(counters, this.logRecordingId, archiveId)) {
                this.idle();
            }
        }
        if (-1L != logSubscriptionRegistrationId) {
            this.awaitLocalSocketsClosed(logSubscriptionRegistrationId);
        }
        if (null != this.consensusModuleExtension) {
            this.consensusModuleExtension.onPrepareForNewLeadership();
        }
        return this.lastAppendPosition;
    }

    void onServiceCloseSession(long clusterSessionId) {
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null != session) {
            session.closing(CloseReason.SERVICE_ACTION);
            if (Cluster.Role.LEADER == this.role && ConsensusModule.State.ACTIVE == this.state && this.logPublisher.appendSessionClose(this.memberId, session, this.leadershipTermId, this.clusterClock.time(), this.clusterClock.timeUnit())) {
                String msg = CloseReason.SERVICE_ACTION.name();
                this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, msg);
                session.closedLogPosition(this.logPublisher.position());
                this.uncommittedClosedSessions.addLast(session);
                this.closeSession(session);
            }
        }
    }

    void onServiceMessage(long clusterSessionId, DirectBuffer buffer, int offset, int length) {
        int i = PendingServiceMessageTracker.serviceId(clusterSessionId);
        this.pendingServiceMessageTrackers[i].enqueueMessage((MutableDirectBuffer)buffer, offset, length);
    }

    void onScheduleTimer(long correlationId, long deadline) {
        if (this.expiredTimerCountByCorrelationIdMap.get(correlationId) == 0L) {
            this.timerService.scheduleTimerForCorrelationId(correlationId, deadline);
        } else {
            this.expiredTimerCountByCorrelationIdMap.decrementAndGet(correlationId);
        }
    }

    void onCancelTimer(long correlationId) {
        this.timerService.cancelTimerByCorrelationId(correlationId);
    }

    void onServiceAck(long logPosition, long timestamp, long ackId, long relevantId, int serviceId) {
        ConsensusModuleAgent.logOnServiceAck(this.memberId, logPosition, timestamp, this.clusterClock.timeUnit(), ackId, relevantId, serviceId);
        this.captureServiceAck(logPosition, ackId, relevantId, serviceId);
        if (ServiceAck.hasReached(logPosition, this.serviceAckId, this.serviceAckQueues)) {
            switch (this.state) {
                case SNAPSHOT: {
                    ++this.serviceAckId;
                    this.snapshotOnServiceAck(logPosition, timestamp, this.pollServiceAcks(logPosition, serviceId));
                    break;
                }
                case QUITTING: {
                    this.closeAndTerminate();
                    break;
                }
                case TERMINATING: {
                    this.terminateOnServiceAck(logPosition);
                    break;
                }
            }
        }
    }

    void onReplaySessionMessage(long clusterSessionId, long timestamp) {
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null != session) {
            session.timeOfLastActivityNs(this.clusterTimeUnit.toNanos(timestamp));
        } else if (clusterSessionId < 0L) {
            int i = PendingServiceMessageTracker.serviceId(clusterSessionId);
            this.pendingServiceMessageTrackers[i].sweepFollowerMessages(clusterSessionId);
        }
    }

    public ControlledFragmentHandler.Action onReplayExtensionMessage(int actingBlockLength, int templateId, int schemaId, int actingVersion, DirectBuffer buffer, int offset, int length, Header header) {
        if (null != this.consensusModuleExtension) {
            return this.consensusModuleExtension.onLogExtensionMessage(actingBlockLength, templateId, schemaId, actingVersion, buffer, offset, length, header);
        }
        throw new ClusterException("expected schemaId=111, actual=" + schemaId);
    }

    void onReplayTimerEvent(long correlationId) {
        if (!this.timerService.cancelTimerByCorrelationId(correlationId)) {
            this.expiredTimerCountByCorrelationIdMap.getAndIncrement(correlationId);
        }
    }

    void onReplaySessionOpen(long logPosition, long correlationId, long clusterSessionId, long timestamp, int responseStreamId, String responseChannel) {
        ClusterSession session = new ClusterSession(clusterSessionId, responseStreamId, this.refineResponseChannel(responseChannel));
        session.open(logPosition);
        session.lastActivityNs(this.clusterTimeUnit.toNanos(timestamp), correlationId);
        this.addSession(session);
        if (clusterSessionId >= this.nextSessionId) {
            this.nextSessionId = clusterSessionId + 1L;
        }
        if (null != this.consensusModuleExtension) {
            this.consensusModuleExtension.onSessionOpened(clusterSessionId);
        }
    }

    void onReplaySessionClose(long clusterSessionId, CloseReason closeReason) {
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null != session) {
            session.closing(closeReason);
            this.closeSession(session);
        }
    }

    void onReplayClusterAction(long leadershipTermId, long logPosition, long timestamp, ClusterAction action, int flags) {
        if (leadershipTermId == this.leadershipTermId) {
            if (ClusterAction.SUSPEND == action) {
                this.state(ConsensusModule.State.SUSPENDED);
            } else if (ClusterAction.RESUME == action) {
                this.state(ConsensusModule.State.ACTIVE);
            } else if (ClusterAction.SNAPSHOT == action && 0 == flags) {
                this.state(ConsensusModule.State.SNAPSHOT);
                this.totalSnapshotDurationTracker.onSnapshotBegin(this.clusterClock.timeNanos());
                if (0 == this.serviceCount) {
                    this.snapshotOnServiceAck(logPosition, timestamp, ServiceAck.EMPTY_SERVICE_ACKS);
                }
            }
        }
    }

    void onReplayNewLeadershipTermEvent(long leadershipTermId, long logPosition, long timestamp, long termBaseLogPosition, TimeUnit timeUnit, int appVersion) {
        ConsensusModuleAgent.logOnReplayNewLeadershipTermEvent(this.memberId, null != this.election, leadershipTermId, logPosition, timestamp, termBaseLogPosition, timeUnit, appVersion);
        if (timeUnit != this.clusterTimeUnit) {
            this.ctx.countedErrorHandler().onError(new ClusterException("incompatible timestamp units: " + this.clusterTimeUnit + " log=" + timeUnit, AeronException.Category.FATAL));
            this.unexpectedTermination();
        }
        if (!this.ctx.appVersionValidator().isVersionCompatible(this.ctx.appVersion(), appVersion)) {
            this.ctx.countedErrorHandler().onError(new ClusterException("incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) + " log=" + SemanticVersion.toString(appVersion), AeronException.Category.FATAL));
            this.unexpectedTermination();
        }
        this.leadershipTermId(leadershipTermId);
        if (null != this.election) {
            this.election.onReplayNewLeadershipTermEvent(leadershipTermId, logPosition, timestamp, termBaseLogPosition);
        }
        if (null != this.consensusModuleExtension) {
            this.consensusModuleExtension.onNewLeadershipTerm(new ConsensusControlState(null, this.logRecordingId, leadershipTermId, null));
        }
    }

    int addLogPublication(long appendPosition) {
        RecordingLog.Log clusterLog;
        long logPublicationTag = this.aeron.nextCorrelationId();
        this.logPublicationChannelTag = this.aeron.nextCorrelationId();
        ChannelUri channelUri = ChannelUri.parse(this.ctx.logChannel());
        channelUri.put("alias", "log");
        channelUri.put("tags", this.logPublicationChannelTag + "," + logPublicationTag);
        if (channelUri.isUdp()) {
            if (this.ctx.isLogMdc()) {
                channelUri.put("control-mode", "manual");
            }
            channelUri.put("ssc", Boolean.toString(this.activeMembers.length == 1));
        }
        if (null != (clusterLog = this.recoveryPlan.log)) {
            channelUri.initialPosition(appendPosition, clusterLog.initialTermId, clusterLog.termBufferLength);
            channelUri.put("mtu", Integer.toString(clusterLog.mtuLength));
        } else {
            this.ensureConsistentInitialTermId(channelUri);
        }
        String channel = channelUri.toString();
        ExclusivePublication publication = this.aeron.addExclusivePublication(channel, this.ctx.logStreamId());
        this.logPublisher.publication(publication);
        if (this.ctx.isLogMdc()) {
            for (ClusterMember member : this.activeMembers) {
                if (member.id() == this.memberId) continue;
                this.logPublisher.addDestination(member.logEndpoint());
            }
        }
        return publication.sessionId();
    }

    void joinLogAsLeader(long leadershipTermId, long logPosition, int logSessionId, boolean isStartup) {
        boolean isIpc = this.ctx.logChannel().startsWith("aeron:ipc");
        String channel = (isIpc ? "aeron:ipc" : "aeron:udp") + "?tags=" + this.logPublicationChannelTag + "|session-id=" + logSessionId + "|alias=log";
        this.leadershipTermId(leadershipTermId);
        this.startLogRecording(channel, this.ctx.logStreamId(), SourceLocation.LOCAL);
        while (!this.tryCreateAppendPosition(logSessionId)) {
            this.idle();
        }
        this.localLogChannel = isIpc ? channel : "aeron-spy:" + channel;
        this.awaitServicesReady(this.localLogChannel, this.ctx.logStreamId(), logSessionId, logPosition, Long.MAX_VALUE, isStartup, Cluster.Role.LEADER);
    }

    void liveLogDestination(String liveLogDestination) {
        this.liveLogDestination = liveLogDestination;
    }

    String liveLogDestination() {
        return this.liveLogDestination;
    }

    void catchupLogDestination(String catchupLogDestination) {
        this.catchupLogDestination = catchupLogDestination;
    }

    String catchupLogDestination() {
        return this.catchupLogDestination;
    }

    boolean tryJoinLogAsFollower(Image image, boolean isLeaderStartup, long nowNs) {
        Subscription logSubscription = image.subscription();
        if (-1L == this.logSubscriptionId) {
            this.startLogRecording(logSubscription.channel(), logSubscription.streamId(), SourceLocation.REMOTE);
        }
        if (this.tryCreateAppendPosition(image.sessionId())) {
            this.logAdapter.image(image);
            this.lastAppendPosition = image.joinPosition();
            this.timeOfLastAppendPositionUpdateNs = nowNs;
            this.awaitServicesReady(logSubscription.channel(), logSubscription.streamId(), image.sessionId(), image.joinPosition(), Long.MAX_VALUE, isLeaderStartup, Cluster.Role.FOLLOWER);
            return true;
        }
        return false;
    }

    void awaitServicesReady(String logChannel, int streamId, int logSessionId, long logPosition, long maxLogPosition, boolean isStartup, Cluster.Role role) {
        if (this.serviceCount > 0) {
            this.serviceProxy.joinLog(logPosition, maxLogPosition, this.memberId, logSessionId, streamId, isStartup, role, logChannel);
            this.expectedAckPosition = logPosition;
            while (!ServiceAck.hasReached(logPosition, this.serviceAckId, this.serviceAckQueues)) {
                this.idle(this.consensusModuleAdapter.poll());
            }
            ServiceAck.removeHead(this.serviceAckQueues);
            ++this.serviceAckId;
        }
    }

    LogReplay newLogReplay(long logPosition, long appendPosition) {
        return new LogReplay(this.archive, this.logRecordingId, logPosition, appendPosition, this.logAdapter, this.ctx);
    }

    int replayLogPoll(LogAdapter logAdapter, long stopPosition) {
        int workCount = 0;
        if (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) {
            logAdapter.poll(stopPosition);
            long position = logAdapter.position();
            if (this.commitPosition.getWeak() < position) {
                this.commitPosition.setOrdered(position);
                ++workCount;
            } else if (logAdapter.isImageClosed() && position < stopPosition) {
                throw new ClusterEvent("unexpected image close when replaying log: position=" + position);
            }
        }
        return workCount += this.consensusModuleAdapter.poll();
    }

    long logRecordingId() {
        return this.logRecordingId;
    }

    void logRecordingId(long recordingId) {
        if (-1L != recordingId) {
            this.logRecordingId = recordingId;
        }
    }

    void truncateLogEntry(long leadershipTermId, long logPosition) {
        this.archive.stopAllReplays(this.logRecordingId);
        this.archive.truncateRecording(this.logRecordingId, logPosition);
        if (-1L != leadershipTermId) {
            this.recordingLog.commitLogPosition(leadershipTermId, logPosition);
        }
        this.logAdapter.disconnect(this.ctx.countedErrorHandler(), logPosition);
        this.logRecordingStopPosition = logPosition;
    }

    boolean appendNewLeadershipTermEvent(long nowNs) {
        return this.logPublisher.appendNewLeadershipTermEvent(this.leadershipTermId, this.clusterClock.timeUnit().convert(nowNs, TimeUnit.NANOSECONDS), this.election.logPosition(), this.memberId, this.logPublisher.sessionId(), this.clusterTimeUnit, this.ctx.appVersion());
    }

    void electionComplete(long nowNs) {
        long logPosition;
        this.leadershipTermId(this.election.leadershipTermId());
        if (Cluster.Role.LEADER == this.role) {
            this.timeOfLastLogUpdateNs = nowNs - this.leaderHeartbeatIntervalNs;
            this.timerService.currentTime(this.clusterClock.timeUnit().convert(nowNs, TimeUnit.NANOSECONDS));
            ClusterControl.ToggleState.activate(this.controlToggle);
            this.prepareSessionsForNewTerm(this.election.isLeaderStartup());
        } else {
            this.timeOfLastLogUpdateNs = nowNs;
            this.timeOfLastAppendPositionUpdateNs = nowNs;
            this.timeOfLastAppendPositionSendNs = nowNs;
            this.localLogChannel = null;
        }
        NodeControl.ToggleState.activate(this.nodeControlToggle);
        this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.serviceCount, this.logRecordingId);
        this.notifiedCommitPosition = logPosition = this.election.logPosition();
        this.commitPosition.setOrdered(logPosition);
        this.updateMemberDetails(this.election.leader());
        this.connectIngress();
        if (null != this.consensusModuleExtension) {
            this.consensusModuleExtension.onElectionComplete(new ConsensusControlState(this.logPublisher.publication(), this.logRecordingId, this.leadershipTermId, this.localLogChannel));
        }
        this.election = null;
    }

    void trackCatchupCompletion(ClusterMember follower, long leadershipTermId, short appendPositionFlags) {
        if ((-1L != follower.catchupReplaySessionId() || ConsensusModuleAgent.isCatchupAppendPosition(appendPositionFlags)) && follower.logPosition() >= this.logPublisher.position()) {
            if (-1L != follower.catchupReplayCorrelationId() && this.archive.archiveProxy().stopReplay(follower.catchupReplaySessionId(), this.aeron.nextCorrelationId(), this.archive.controlSessionId())) {
                follower.catchupReplayCorrelationId(-1L);
            }
            if (this.consensusPublisher.stopCatchup(follower.publication(), leadershipTermId, follower.id())) {
                follower.catchupReplaySessionId(-1L);
            }
        }
    }

    void catchupInitiated(long nowNs) {
        this.timeOfLastAppendPositionUpdateNs = nowNs;
        this.timeOfLastAppendPositionSendNs = nowNs;
    }

    int catchupPoll(long limitPosition, long nowNs) {
        int workCount = 0;
        if (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) {
            int fragments = this.logAdapter.poll(Math.min(this.appendPosition.get(), limitPosition));
            workCount += fragments;
            if (fragments == 0 && this.logAdapter.image().isClosed()) {
                throw new ClusterEvent("unexpected image close during catchup: position=" + this.logAdapter.image().position());
            }
            ExclusivePublication publication = this.election.leader().publication();
            workCount += this.updateFollowerPosition(publication, nowNs, this.leadershipTermId, this.appendPosition.get(), (short)1);
            this.commitPosition.proposeMaxOrdered(this.logAdapter.position());
        }
        if (nowNs > this.timeOfLastAppendPositionUpdateNs + this.leaderHeartbeatTimeoutNs && ConsensusModule.State.ACTIVE == this.state) {
            throw new ClusterEvent("no catchup progress commitPosition=" + this.commitPosition.getWeak() + " limitPosition=" + limitPosition + " lastAppendPosition=" + this.lastAppendPosition + " appendPosition=" + (null != this.appendPosition ? this.appendPosition.get() : -1L) + " logPosition=" + this.election.logPosition());
        }
        return workCount += this.consensusModuleAdapter.poll();
    }

    boolean isCatchupNearLive(long position) {
        Image image = this.logAdapter.image();
        if (null != image) {
            long window;
            long localPosition = image.position();
            return localPosition >= position - (window = (long)Math.min(image.termBufferLength() >> 2, 0x2000000));
        }
        return false;
    }

    void stopAllCatchups() {
        for (ClusterMember member : this.activeMembers) {
            if (member.catchupReplaySessionId() == -1L) continue;
            if (member.catchupReplayCorrelationId() != -1L) {
                try {
                    this.archive.stopReplay(member.catchupReplaySessionId());
                }
                catch (Exception ex) {
                    this.ctx.countedErrorHandler().onError(new ClusterEvent("replay already stopped for catchup"));
                }
            }
            member.catchupReplaySessionId(-1L);
            member.catchupReplayCorrelationId(-1L);
        }
    }

    int pollArchiveEvents() {
        int workCount = 0;
        if (null != this.archive) {
            RecordingSignalPoller poller = this.recordingSignalPoller;
            workCount += poller.poll();
            if (poller.isPollComplete()) {
                int templateId = poller.templateId();
                if (1 == templateId && poller.code() == ControlResponseCode.ERROR) {
                    for (ClusterMember member : this.activeMembers) {
                        if (member.catchupReplayCorrelationId() != poller.correlationId()) continue;
                        member.catchupReplaySessionId(-1L);
                        member.catchupReplayCorrelationId(-1L);
                        String message = "catchup replay failed - " + poller.errorMessage();
                        this.ctx.countedErrorHandler().onError(new ClusterEvent(message));
                        return workCount;
                    }
                    if (6L == poller.relevantId()) {
                        String message = "replay no longer relevant - " + poller.errorMessage();
                        this.ctx.countedErrorHandler().onError(new ClusterEvent(message));
                        return workCount;
                    }
                    ArchiveException ex = new ArchiveException(poller.errorMessage(), (int)poller.relevantId(), poller.correlationId());
                    if (ex.errorCode() == 11) {
                        this.ctx.countedErrorHandler().onError(ex);
                        this.unexpectedTermination();
                    }
                    if (null != this.election) {
                        this.election.handleError(this.clusterClock.timeNanos(), ex);
                    }
                } else if (24 == templateId) {
                    long recordingId = poller.recordingId();
                    long position = poller.recordingPosition();
                    RecordingSignal signal = poller.recordingSignal();
                    if (RecordingSignal.STOP == signal && recordingId == this.logRecordingId) {
                        this.logRecordingStopPosition = position;
                        if (null == this.election && ConsensusModule.State.ACTIVE == this.state) {
                            this.enterElection(this.logAdapter.isLogEndOfStreamAt(position), "log recording stopped");
                            return workCount;
                        }
                    }
                    if (null != this.election) {
                        this.election.onRecordingSignal(poller.correlationId(), recordingId, position, signal);
                    }
                }
            } else if (0 == workCount && !poller.subscription().isConnected()) {
                this.ctx.countedErrorHandler().onError(new ClusterEvent("local archive is not connected"));
                this.unexpectedTermination();
            }
        }
        return workCount;
    }

    void leadershipTermId(long leadershipTermId) {
        this.leadershipTermId = leadershipTermId;
        this.ctx.leadershipTermIdCounter().setOrdered(leadershipTermId);
        for (PendingServiceMessageTracker tracker : this.pendingServiceMessageTrackers) {
            tracker.leadershipTermId(leadershipTermId);
        }
    }

    private static void logOnNewLeadershipTerm(int memberId, long logLeadershipTermId, long nextLeadershipTermId, long nextTermBaseLogPosition, long nextLogPosition, long leadershipTermId, long termBaseLogPosition, long logPosition, long leaderRecordingId, long timestamp, int leaderId, int logSessionId, int appVersion, boolean isStartup) {
    }

    private static void logOnCommitPosition(int memberId, long leadershipTermId, long logPosition, int leaderMemberId) {
    }

    private static void logOnAddPassiveMember(int memberId, long correlationId, String memberEndpoints) {
    }

    private static void logOnReplayNewLeadershipTermEvent(int memberId, boolean isInElection, long leadershipTermId, long logPosition, long timestamp, long termBaseLogPosition, TimeUnit timeUnit, int appVersion) {
    }

    private static void logOnRequestVote(int memberId, long logLeadershipTermId, long logPosition, long candidateTermId, int candidateId, int protocolVersion) {
    }

    private static void logOnAppendPosition(int memberId, long leadershipTermId, long logPosition, int followerMemberId, short flags) {
    }

    private static void logOnCanvassPosition(int memberId, long logLeadershipTermId, long logPosition, long leadershipTermId, int followerMemberId, int protocolVersion) {
    }

    private void logStandbySnapshotNotification(int memberId, long recordingId, long leadershipTermId, long termBaseLogPosition, long logPosition, long timestamp, TimeUnit timeUnit, int serviceId, String archiveEndpoint) {
    }

    private static void logOnStopCatchup(int memberId, long leadershipTermId, int followerMemberId) {
    }

    private static void logOnCatchupPosition(int memberId, long leadershipTermId, long logPosition, int followerMemberId, String catchupEndpoint) {
    }

    private static void logOnTerminationPosition(int memberId, long logLeadershipTermId, long logPosition) {
    }

    private static void logOnTerminationAck(int memberId, long logLeadershipTermId, long logPosition, int senderMemberId) {
    }

    private static void logOnServiceAck(int memberId, long logPosition, long timestamp, TimeUnit timeUnit, long ackId, long relevantId, int serviceId) {
    }

    private static void logNewElection(int memberId, long logLeadershipTermId, long logPosition, long appendedPosition, String reason) {
    }

    static void logReplicationEnded(int memberId, String purpose, String controlUri, long srcRecordingId, long dstRecordingId, long position, boolean hasSynced) {
    }

    private void startLogRecording(String channel, int streamId, SourceLocation sourceLocation) {
        try {
            long logRecordingId = this.recordingLog.findLastTermRecordingId();
            this.logSubscriptionId = -1L == logRecordingId ? this.archive.startRecording(channel, streamId, sourceLocation, true) : this.archive.extendRecording(logRecordingId, channel, streamId, sourceLocation, true);
        }
        catch (ArchiveException ex) {
            if (ex.errorCode() == 11) {
                this.ctx.countedErrorHandler().onError(ex);
                this.unexpectedTermination();
            }
            throw ex;
        }
    }

    private void prepareSessionsForNewTerm(boolean isStartup) {
        if (isStartup) {
            int size = this.sessions.size();
            for (int i = 0; i < size; ++i) {
                ClusterSession session = this.sessions.get(i);
                if (session.state() != ClusterSession.State.OPEN) continue;
                session.closing(CloseReason.TIMEOUT);
            }
        } else {
            int size = this.sessions.size();
            for (int i = 0; i < size; ++i) {
                ClusterSession session = this.sessions.get(i);
                if (session.state() != ClusterSession.State.OPEN) continue;
                session.connect(this.ctx.countedErrorHandler(), this.aeron);
            }
            long nowNs = this.clusterClock.timeNanos();
            int size2 = this.sessions.size();
            for (int i = 0; i < size2; ++i) {
                ClusterSession session = this.sessions.get(i);
                if (session.state() != ClusterSession.State.OPEN) continue;
                session.timeOfLastActivityNs(nowNs);
                session.hasNewLeaderEventPending(true);
            }
        }
    }

    private void updateMemberDetails(ClusterMember newLeader) {
        this.leaderMember = newLeader;
        for (ClusterMember clusterMember : this.activeMembers) {
            clusterMember.isLeader(clusterMember.id() == this.leaderMember.id());
        }
        this.ingressEndpoints = ClusterMember.ingressEndpoints(this.activeMembers);
    }

    private int slowTickWork(long nowNs) {
        int workCount = this.aeronClientInvoker.invoke();
        if (this.aeron.isClosed()) {
            throw new AgentTerminationException("unexpected Aeron close");
        }
        if (ConsensusModule.State.CLOSED == this.state) {
            this.unexpectedTermination();
        }
        if (nowNs >= this.markFileUpdateDeadlineNs) {
            this.markFileUpdateDeadlineNs = nowNs + ClusteredServiceContainer.Configuration.MARK_FILE_UPDATE_INTERVAL_NS;
            this.markFile.updateActivityTimestamp(this.clusterClock.timeMillis());
        }
        workCount += this.pollArchiveEvents();
        workCount += this.sendRedirects(this.redirectUserSessions, nowNs);
        workCount += this.sendRejections(this.rejectedUserSessions, nowNs);
        workCount += this.sendRejections(this.rejectedBackupSessions, nowNs);
        if (null == this.election) {
            if (Cluster.Role.LEADER == this.role) {
                workCount += this.checkClusterControlToggle(nowNs);
                if (ConsensusModule.State.ACTIVE == this.state) {
                    workCount += this.processPendingSessions(this.pendingUserSessions, this.rejectedUserSessions, nowNs);
                    workCount += this.processPendingSessions(this.pendingBackupSessions, this.rejectedBackupSessions, nowNs);
                    workCount += this.checkSessions(this.sessions, nowNs);
                    if (!ClusterMember.hasActiveQuorum(this.activeMembers, nowNs, this.leaderHeartbeatTimeoutNs)) {
                        this.enterElection(false, "inactive follower quorum");
                        ++workCount;
                    }
                } else if (ConsensusModule.State.TERMINATING == this.state && this.clusterTermination.canTerminate(this.activeMembers, nowNs)) {
                    this.recordingLog.commitLogPosition(this.leadershipTermId, this.terminationPosition);
                    this.closeAndTerminate();
                }
            } else {
                if (Cluster.Role.FOLLOWER == this.role && ConsensusModule.State.ACTIVE == this.state) {
                    workCount += this.processPendingSessions(this.pendingBackupSessions, this.rejectedBackupSessions, nowNs);
                }
                if ((ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) && nowNs >= this.timeOfLastLogUpdateNs + this.leaderHeartbeatTimeoutNs && -1L == this.terminationPosition) {
                    this.enterElection(false, "leader heartbeat timeout");
                    ++workCount;
                }
            }
            if (ConsensusModule.State.ACTIVE == this.state) {
                workCount += this.checkNodeControlToggle();
            }
        }
        return workCount;
    }

    private int consensusWork(long timestamp, long nowNs) {
        int workCount = 0;
        if (Cluster.Role.LEADER == this.role) {
            if (ConsensusModule.State.ACTIVE == this.state) {
                workCount += this.timerService.poll(timestamp);
                for (PendingServiceMessageTracker tracker : this.pendingServiceMessageTrackers) {
                    workCount += tracker.poll();
                }
                workCount += this.ingressAdapter.poll();
            }
            workCount += this.updateLeaderPosition(nowNs);
        } else {
            if (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) {
                if (-1L != this.terminationPosition && this.logAdapter.position() >= this.terminationPosition) {
                    this.state(ConsensusModule.State.TERMINATING);
                    if (this.serviceCount > 0) {
                        this.serviceProxy.terminationPosition(this.terminationPosition, this.ctx.countedErrorHandler());
                    } else {
                        this.terminateOnServiceAck(this.logAdapter.position());
                    }
                } else {
                    long limit = null != this.appendPosition ? this.appendPosition.get() : this.logRecordingStopPosition;
                    int count = this.logAdapter.poll(Math.min(this.notifiedCommitPosition, limit));
                    if (0 == count && this.logAdapter.isImageClosed()) {
                        boolean isEos = this.logAdapter.isLogEndOfStream();
                        this.enterElection(isEos, "log disconnected from leader: eos=" + isEos);
                        return 1;
                    }
                    this.commitPosition.proposeMaxOrdered(this.logAdapter.position());
                    workCount += this.ingressAdapter.poll();
                    workCount += count;
                }
            }
            workCount += this.updateFollowerPosition(nowNs);
        }
        workCount += this.consensusModuleAdapter.poll();
        workCount += this.pollStandbySnapshotReplication(nowNs);
        if (null != this.consensusModuleExtension) {
            workCount += this.consensusModuleExtension.doWork(nowNs);
        }
        return workCount;
    }

    private int checkClusterControlToggle(long nowNs) {
        if (ConsensusModule.State.ACTIVE == this.state) {
            switch (ClusterControl.ToggleState.get(this.controlToggle)) {
                case SUSPEND: {
                    long timestamp = this.clusterClock.time();
                    if (!this.appendAction(ClusterAction.SUSPEND, timestamp, 0)) break;
                    this.state(ConsensusModule.State.SUSPENDED);
                    break;
                }
                case SNAPSHOT: {
                    long timestamp = this.clusterClock.time();
                    if (!this.appendAction(ClusterAction.SNAPSHOT, timestamp, 0)) break;
                    this.state(ConsensusModule.State.SNAPSHOT);
                    this.totalSnapshotDurationTracker.onSnapshotBegin(nowNs);
                    if (0 != this.serviceCount) break;
                    this.snapshotOnServiceAck(this.logPublisher.position(), timestamp, ServiceAck.EMPTY_SERVICE_ACKS);
                    break;
                }
                case STANDBY_SNAPSHOT: {
                    long timestamp = this.clusterClock.time();
                    if (!this.appendAction(ClusterAction.SNAPSHOT, timestamp, 1)) break;
                    ClusterControl.ToggleState.reset(this.controlToggle);
                    break;
                }
                case SHUTDOWN: {
                    long timestamp = this.clusterClock.time();
                    if (!this.appendAction(ClusterAction.SNAPSHOT, timestamp, 0)) break;
                    long position = this.logPublisher.position();
                    this.clusterTermination = new ClusterTermination(nowNs + this.ctx.terminationTimeoutNs(), this.serviceCount);
                    this.clusterTermination.terminationPosition(this.ctx.countedErrorHandler(), this.consensusPublisher, this.activeMembers, this.thisMember, this.leadershipTermId, position);
                    this.terminationPosition = position;
                    this.state(ConsensusModule.State.SNAPSHOT);
                    this.totalSnapshotDurationTracker.onSnapshotBegin(nowNs);
                    if (0 != this.serviceCount) break;
                    this.snapshotOnServiceAck(position, timestamp, ServiceAck.EMPTY_SERVICE_ACKS);
                    break;
                }
                case ABORT: {
                    CountedErrorHandler errorHandler = this.ctx.countedErrorHandler();
                    long position = this.logPublisher.position();
                    this.clusterTermination = new ClusterTermination(nowNs + this.ctx.terminationTimeoutNs(), this.serviceCount);
                    this.clusterTermination.terminationPosition(errorHandler, this.consensusPublisher, this.activeMembers, this.thisMember, this.leadershipTermId, position);
                    this.terminationPosition = position;
                    this.serviceProxy.terminationPosition(this.terminationPosition, errorHandler);
                    this.state(ConsensusModule.State.TERMINATING);
                    break;
                }
                default: {
                    return 0;
                }
            }
            return 1;
        }
        if (ConsensusModule.State.SUSPENDED == this.state && ClusterControl.ToggleState.RESUME == ClusterControl.ToggleState.get(this.controlToggle)) {
            long timestamp = this.clusterClock.time();
            if (this.appendAction(ClusterAction.RESUME, timestamp, 0)) {
                this.state(ConsensusModule.State.ACTIVE);
                ClusterControl.ToggleState.reset(this.controlToggle);
            }
            return 1;
        }
        return 0;
    }

    private int checkNodeControlToggle() {
        if (NodeControl.ToggleState.REPLICATE_STANDBY_SNAPSHOT == NodeControl.ToggleState.get(this.nodeControlToggle)) {
            if (null == this.standbySnapshotReplicator) {
                this.standbySnapshotReplicator = StandbySnapshotReplicator.newInstance(this.ctx.clusterMemberId(), this.ctx.archiveContext(), this.recordingLog, this.serviceCount, this.ctx.leaderArchiveControlChannel(), this.ctx.archiveContext().controlRequestStreamId(), this.ctx.replicationChannel());
            }
            NodeControl.ToggleState.reset(this.nodeControlToggle);
            return 1;
        }
        return 0;
    }

    private boolean appendAction(ClusterAction action, long timestamp, int flags) {
        return this.logPublisher.appendClusterAction(this.leadershipTermId, timestamp, action, flags);
    }

    private int processPendingSessions(ArrayList<ClusterSession> pendingSessions, ArrayList<ClusterSession> rejectedSessions, long nowNs) {
        int lastIndex;
        int workCount = 0;
        for (int i = lastIndex = pendingSessions.size() - 1; i >= 0; --i) {
            ClusterSession session = pendingSessions.get(i);
            if (session.state() == ClusterSession.State.INVALID) {
                ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                session.close(this.aeron, this.ctx.countedErrorHandler());
                continue;
            }
            if (nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs && session.state() != ClusterSession.State.INIT) {
                ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                session.close(this.aeron, this.ctx.countedErrorHandler());
                this.ctx.timedOutClientCounter().incrementOrdered();
                continue;
            }
            if ((session.state() == ClusterSession.State.INIT || session.state() == ClusterSession.State.CONNECTING || session.state() == ClusterSession.State.CONNECTED) && session.isResponsePublicationConnected(this.aeron, nowNs)) {
                session.state(ClusterSession.State.CONNECTED);
                this.authenticator.onConnectedSession(this.sessionProxy.session(session), this.clusterClock.timeMillis());
            }
            if (session.state() == ClusterSession.State.CHALLENGED && session.isResponsePublicationConnected(this.aeron, nowNs)) {
                this.authenticator.onChallengedSession(this.sessionProxy.session(session), this.clusterClock.timeMillis());
            }
            if (session.state() == ClusterSession.State.AUTHENTICATED) {
                switch (session.action()) {
                    case CLIENT: {
                        if (!session.appendSessionToLogAndSendOpen(this.logPublisher, this.egressPublisher, this.leadershipTermId, this.memberId, nowNs, this.clusterClock.time())) break;
                        ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                        this.addSession(session);
                        ++workCount;
                        if (null == this.consensusModuleExtension) break;
                        this.consensusModuleExtension.onSessionOpened(session.id());
                        break;
                    }
                    case BACKUP: {
                        if (!this.authorisationService.isAuthorised(111, 77, null, session.encodedPrincipal())) {
                            session.reject(EventCode.AUTHENTICATION_REJECTED, "Not authorised for BackupQuery", this.ctx.errorLog(), ConsensusModule.Configuration.clusterMemberId());
                            break;
                        }
                        RecordingLog.Entry entry = this.recordingLog.findLastTerm();
                        if (null == entry || !this.consensusPublisher.backupResponse(session, this.commitPosition.id(), this.leaderMember.id(), this.thisMember.id(), entry, this.recoveryPlan, ClusterMember.encodeAsString(this.activeMembers))) break;
                        ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                        session.close(this.aeron, this.ctx.countedErrorHandler());
                        ++workCount;
                        break;
                    }
                    case HEARTBEAT: {
                        if (!this.authorisationService.isAuthorised(111, 79, null, session.encodedPrincipal())) {
                            session.reject(EventCode.AUTHENTICATION_REJECTED, "Not authorised for Heartbeat", this.ctx.errorLog(), ConsensusModule.Configuration.clusterMemberId());
                            break;
                        }
                        if (!this.consensusPublisher.heartbeatResponse(session)) break;
                        ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                        session.close(this.aeron, this.ctx.countedErrorHandler());
                        ++workCount;
                        break;
                    }
                    case STANDBY_SNAPSHOT: {
                        if (!this.authorisationService.isAuthorised(111, 81, null, session.encodedPrincipal())) {
                            session.reject(EventCode.AUTHENTICATION_REJECTED, "Not authorised for StandbySnapshot", this.ctx.errorLog(), ConsensusModule.Configuration.clusterMemberId());
                            break;
                        }
                        List standbySnapshotEntries = (List)session.requestInput();
                        for (StandbySnapshotEntry standbySnapshotEntry : standbySnapshotEntries) {
                            this.logStandbySnapshotNotification(this.memberId, standbySnapshotEntry.recordingId(), standbySnapshotEntry.leadershipTermId(), standbySnapshotEntry.termBaseLogPosition(), standbySnapshotEntry.logPosition(), standbySnapshotEntry.timestamp(), this.ctx.clusterClock().timeUnit(), standbySnapshotEntry.serviceId(), standbySnapshotEntry.archiveEndpoint());
                            this.recordingLog.appendStandbySnapshot(standbySnapshotEntry.recordingId(), standbySnapshotEntry.leadershipTermId(), standbySnapshotEntry.termBaseLogPosition(), standbySnapshotEntry.logPosition(), standbySnapshotEntry.timestamp(), standbySnapshotEntry.serviceId(), standbySnapshotEntry.archiveEndpoint());
                        }
                        this.ctx.standbySnapshotCounter().increment();
                        ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                        session.close(this.aeron, this.ctx.countedErrorHandler());
                        ++workCount;
                    }
                }
                continue;
            }
            if (session.state() != ClusterSession.State.REJECTED) continue;
            ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
            rejectedSessions.add(session);
        }
        return workCount;
    }

    private int sendRejections(ArrayList<ClusterSession> rejectedSessions, long nowNs) {
        int lastIndex;
        int workCount = 0;
        for (int i = lastIndex = rejectedSessions.size() - 1; i >= 0; --i) {
            ClusterSession session = rejectedSessions.get(i);
            String detail = session.responseDetail();
            EventCode eventCode = session.eventCode();
            if (!(session.isResponsePublicationConnected(this.aeron, nowNs) && this.egressPublisher.sendEvent(session, this.leadershipTermId, this.leaderMember.id(), eventCode, detail) || session.state() != ClusterSession.State.INIT && nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs) && session.state() != ClusterSession.State.INVALID) continue;
            ArrayListUtil.fastUnorderedRemove(rejectedSessions, i, lastIndex--);
            session.close(this.aeron, this.ctx.countedErrorHandler());
            ++workCount;
        }
        return workCount;
    }

    private int sendRedirects(ArrayList<ClusterSession> redirectSessions, long nowNs) {
        int lastIndex;
        int workCount = 0;
        for (int i = lastIndex = redirectSessions.size() - 1; i >= 0; --i) {
            ClusterSession session = redirectSessions.get(i);
            EventCode eventCode = EventCode.REDIRECT;
            int leaderId = this.leaderMember.id();
            if (!(session.isResponsePublicationConnected(this.aeron, nowNs) && this.egressPublisher.sendEvent(session, this.leadershipTermId, leaderId, eventCode, this.ingressEndpoints) || session.state() != ClusterSession.State.INIT && nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs) && session.state() != ClusterSession.State.INVALID) continue;
            ArrayListUtil.fastUnorderedRemove(redirectSessions, i, lastIndex--);
            session.close(this.aeron, this.ctx.countedErrorHandler());
            ++workCount;
        }
        return workCount;
    }

    private int checkSessions(ArrayList<ClusterSession> sessions, long nowNs) {
        int workCount = 0;
        for (int i = sessions.size() - 1; i >= 0; --i) {
            ClusterSession session = sessions.get(i);
            if (nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs) {
                switch (session.state()) {
                    case OPEN: {
                        session.closing(CloseReason.TIMEOUT);
                        if (!this.logPublisher.appendSessionClose(this.memberId, session, this.leadershipTermId, this.clusterClock.time(), this.clusterClock.timeUnit())) break;
                        String msg = session.closeReason().name();
                        this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, msg);
                        session.closedLogPosition(this.logPublisher.position());
                        this.uncommittedClosedSessions.addLast(session);
                        this.ctx.timedOutClientCounter().incrementOrdered();
                        this.closeSession(session);
                        break;
                    }
                    case CLOSING: {
                        if (!this.logPublisher.appendSessionClose(this.memberId, session, this.leadershipTermId, this.clusterClock.time(), this.clusterClock.timeUnit())) break;
                        String msg = session.closeReason().name();
                        this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, msg);
                        session.closedLogPosition(this.logPublisher.position());
                        this.uncommittedClosedSessions.addLast(session);
                        if (session.closeReason() == CloseReason.TIMEOUT) {
                            this.ctx.timedOutClientCounter().incrementOrdered();
                        }
                        this.closeSession(session);
                        break;
                    }
                    default: {
                        this.closeSession(session);
                    }
                }
                ++workCount;
                continue;
            }
            if (session.hasOpenEventPending()) {
                workCount += session.sendSessionOpenEvent(this.egressPublisher, this.leadershipTermId, this.memberId);
                continue;
            }
            if (!session.hasNewLeaderEventPending()) continue;
            workCount += this.sendNewLeaderEvent(session);
        }
        return workCount;
    }

    private void captureServiceAck(long logPosition, long ackId, long relevantId, int serviceId) {
        if (0L == ackId && -1L != this.serviceClientIds[serviceId]) {
            throw new ClusterException("initial ack already received from service: possible duplicate serviceId=" + serviceId);
        }
        this.serviceAckQueues[serviceId].offerLast(new ServiceAck(ackId, logPosition, relevantId));
    }

    private ServiceAck[] pollServiceAcks(long logPosition, int serviceId) {
        ServiceAck[] serviceAcks = new ServiceAck[this.serviceAckQueues.length];
        int length = this.serviceAckQueues.length;
        for (int id = 0; id < length; ++id) {
            ServiceAck serviceAck = this.serviceAckQueues[id].pollFirst();
            if (null == serviceAck || serviceAck.logPosition() != logPosition) {
                throw new ClusterException("invalid ack for serviceId=" + serviceId + " logPosition=" + logPosition + " " + serviceAck);
            }
            serviceAcks[id] = serviceAck;
        }
        return serviceAcks;
    }

    private int sendNewLeaderEvent(ClusterSession session) {
        if (this.egressPublisher.newLeader(session, this.leadershipTermId, this.leaderMember.id(), this.ingressEndpoints)) {
            session.hasNewLeaderEventPending(false);
            return 1;
        }
        return 0;
    }

    private boolean tryCreateAppendPosition(int logSessionId) {
        CountersReader counters = this.aeron.countersReader();
        int counterId = RecordingPos.findCounterIdBySession(counters, logSessionId, this.archive.archiveId());
        if (-1 == counterId) {
            return false;
        }
        long registrationId = counters.getCounterRegistrationId(counterId);
        if (0L == registrationId) {
            return false;
        }
        long recordingId = RecordingPos.getRecordingId(counters, counterId);
        if (-1L == recordingId) {
            return false;
        }
        this.logRecordingId(recordingId);
        this.appendPosition = new ReadableCounter(counters, registrationId, counterId);
        return true;
    }

    private int updateFollowerPosition(long nowNs) {
        long recordedPosition = null != this.appendPosition ? this.appendPosition.get() : this.logRecordingStopPosition;
        return this.updateFollowerPosition(this.leaderMember.publication(), nowNs, this.leadershipTermId, recordedPosition, (short)0);
    }

    private int updateFollowerPosition(ExclusivePublication publication, long nowNs, long leadershipTermId, long appendPosition, short flags) {
        long position = Math.max(appendPosition, this.lastAppendPosition);
        if ((position > this.lastAppendPosition || nowNs >= this.timeOfLastAppendPositionSendNs + this.leaderHeartbeatIntervalNs) && this.consensusPublisher.appendPosition(publication, leadershipTermId, position, this.memberId, flags)) {
            if (position > this.lastAppendPosition) {
                this.lastAppendPosition = position;
                this.timeOfLastAppendPositionUpdateNs = nowNs;
            }
            this.timeOfLastAppendPositionSendNs = nowNs;
            return 1;
        }
        return 0;
    }

    private void loadSnapshot(RecordingLog.Snapshot snapshot, AeronArchive archive) {
        String channel = this.ctx.replayChannel();
        int streamId = this.ctx.replayStreamId();
        int sessionId = (int)archive.startReplay(snapshot.recordingId, 0L, -1L, channel, streamId);
        String replayChannel = ChannelUri.addSessionId(channel, sessionId);
        try (Subscription subscription = this.aeron.addSubscription(replayChannel, streamId);){
            Image image = this.awaitImage(sessionId, subscription);
            ConsensusModuleSnapshotAdapter adapter = new ConsensusModuleSnapshotAdapter(image, this);
            while (true) {
                int fragments;
                if (0 == (fragments = adapter.poll())) {
                    if (adapter.isDone()) break;
                    if (image.isClosed()) {
                        this.pollArchiveEvents();
                        throw new ClusterException("snapshot ended unexpectedly: " + image);
                    }
                }
                this.idle(fragments);
            }
            for (PendingServiceMessageTracker tracker : this.pendingServiceMessageTrackers) {
                tracker.verify();
                tracker.reset();
            }
            if (null != this.consensusModuleExtension) {
                this.consensusModuleExtension.onStart(this, image);
            }
        }
        this.timerService.currentTime(this.clusterClock.time());
        this.commitPosition.setOrdered(snapshot.logPosition);
        this.leadershipTermId(snapshot.leadershipTermId);
        this.expectedAckPosition = snapshot.logPosition;
    }

    private Image awaitImage(int sessionId, Subscription subscription) {
        Image image;
        this.idleStrategy.reset();
        while ((image = subscription.imageBySessionId(sessionId)) == null) {
            this.idle();
        }
        return image;
    }

    private Counter addRecoveryStateCounter(RecordingLog.RecoveryPlan plan) {
        int snapshotsCount = plan.snapshots.size();
        if (snapshotsCount > 0) {
            long[] serviceSnapshotRecordingIds = new long[snapshotsCount - 1];
            RecordingLog.Snapshot snapshot = plan.snapshots.get(0);
            for (int i = 1; i < snapshotsCount; ++i) {
                RecordingLog.Snapshot serviceSnapshot = plan.snapshots.get(i);
                serviceSnapshotRecordingIds[serviceSnapshot.serviceId] = serviceSnapshot.recordingId;
            }
            return RecoveryState.allocate(this.aeron, snapshot.leadershipTermId, snapshot.logPosition, snapshot.timestamp, this.ctx.clusterId(), serviceSnapshotRecordingIds);
        }
        return RecoveryState.allocate(this.aeron, this.leadershipTermId, 0L, 0L, this.ctx.clusterId(), new long[0]);
    }

    private void captureServiceClientIds() {
        int length = this.serviceClientIds.length;
        for (int i = 0; i < length; ++i) {
            ServiceAck serviceAck = this.serviceAckQueues[i].pollFirst();
            this.serviceClientIds[i] = Objects.requireNonNull(serviceAck).relevantId();
        }
    }

    private int updateLeaderPosition(long nowNs) {
        if (null != this.appendPosition) {
            return this.updateLeaderPosition(nowNs, this.appendPosition.get());
        }
        return 0;
    }

    long quorumPosition() {
        return ClusterMember.quorumPosition(this.activeMembers, this.rankedPositions);
    }

    long timeOfLastLeaderUpdateNs() {
        return this.timeOfLastLeaderUpdateNs;
    }

    int updateLeaderPosition(long nowNs, long position) {
        this.thisMember.logPosition(position).timeOfLastAppendPositionNs(nowNs);
        long commitPosition = Math.min(this.quorumPosition(), position);
        if (commitPosition > this.commitPosition.getWeak() || nowNs >= this.timeOfLastLogUpdateNs + this.leaderHeartbeatIntervalNs) {
            this.publishCommitPosition(commitPosition);
            this.commitPosition.setOrdered(commitPosition);
            this.timeOfLastLogUpdateNs = nowNs;
            this.sweepUncommittedEntriesTo(commitPosition);
            return 1;
        }
        return 0;
    }

    void publishCommitPosition(long commitPosition) {
        for (ClusterMember member : this.activeMembers) {
            if (member.id() == this.memberId) continue;
            this.consensusPublisher.commitPosition(member.publication(), this.leadershipTermId, commitPosition, this.memberId);
        }
    }

    RecordingReplication newLogReplication(String leaderArchiveEndpoint, String responseArchiveEndpoint, long leaderRecordingId, long stopPosition, long nowNs) {
        String replicationChannel = this.ctx.replicationChannel();
        ReplicationParams replicationParams = new ReplicationParams().dstRecordingId(this.logRecordingId).stopPosition(stopPosition).replicationSessionId((int)this.aeron.nextCorrelationId());
        if (null != responseArchiveEndpoint) {
            ChannelUri channelUri = ChannelUri.parse(replicationChannel);
            channelUri.remove("endpoint");
            channelUri.put("control", responseArchiveEndpoint);
            channelUri.put("control-mode", "response");
            replicationChannel = channelUri.toString();
            replicationParams.srcResponseChannel(replicationChannel);
        }
        replicationParams.replicationChannel(replicationChannel);
        return new RecordingReplication(this.archive, leaderRecordingId, ChannelUri.createDestinationUri(this.ctx.leaderArchiveControlChannel(), leaderArchiveEndpoint), this.archive.context().controlRequestStreamId(), replicationParams, this.ctx.leaderHeartbeatTimeoutNs(), this.ctx.leaderHeartbeatIntervalNs(), nowNs);
    }

    void awaitLocalSocketsClosed(long registrationId) {
        CountersReader countersReader = this.aeron.countersReader();
        while (LocalSocketAddressStatus.findNumberOfAddressesByRegistrationId(countersReader, registrationId) > 0) {
            this.idle();
        }
    }

    private void clearSessionsAfter(long logPosition) {
        for (int i = this.sessions.size() - 1; i >= 0; --i) {
            ClusterSession session = this.sessions.get(i);
            if (session.openedLogPosition() <= logPosition) continue;
            this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, "election");
            this.closeSession(session);
        }
        for (ClusterSession session : this.pendingUserSessions) {
            this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, "election");
            session.close(this.aeron, this.ctx.countedErrorHandler());
        }
        this.pendingUserSessions.clear();
    }

    private void sweepUncommittedEntriesTo(long commitPosition) {
        ClusterSession clusterSession;
        for (PendingServiceMessageTracker tracker : this.pendingServiceMessageTrackers) {
            tracker.sweepLeaderMessages();
        }
        while (this.uncommittedTimers.peekLong() <= commitPosition) {
            this.uncommittedTimers.pollLong();
            this.uncommittedTimers.pollLong();
        }
        while (null != (clusterSession = this.uncommittedClosedSessions.peekFirst()) && clusterSession.closedLogPosition() <= commitPosition) {
            this.uncommittedClosedSessions.pollFirst();
        }
    }

    private void restoreUncommittedEntries(long commitPosition) {
        ClusterSession session;
        PendingServiceMessageTracker[] i = this.uncommittedTimers.iterator();
        while (i.hasNext()) {
            long appendPosition = i.nextValue();
            long correlationId = i.nextValue();
            if (appendPosition <= commitPosition) continue;
            this.timerService.scheduleTimerForCorrelationId(correlationId, 0L);
        }
        this.uncommittedTimers.clear();
        for (PendingServiceMessageTracker tracker : this.pendingServiceMessageTrackers) {
            tracker.restoreUncommittedMessages();
        }
        while (null != (session = this.uncommittedClosedSessions.pollFirst())) {
            if (session.closedLogPosition() <= commitPosition) continue;
            session.closedLogPosition(-1L);
            session.state(ClusterSession.State.CLOSING);
            this.addSession(session);
        }
    }

    private void enterElection(boolean isLogEndOfStream, String reason) {
        if (null != this.election) {
            throw new IllegalStateException("election in progress");
        }
        this.role(Cluster.Role.FOLLOWER);
        long leadershipTermId = this.leadershipTermId;
        RecordingLog.Entry termEntry = this.recordingLog.findTermEntry(leadershipTermId);
        long termBaseLogPosition = null != termEntry ? termEntry.termBaseLogPosition : this.recoveryPlan.lastTermBaseLogPosition;
        long appendedPosition = null != this.appendPosition ? this.appendPosition.get() : Math.max(this.recoveryPlan.appendedLogPosition, this.logRecordingStopPosition);
        long commitPosition = this.commitPosition.getWeak();
        ConsensusModuleAgent.logNewElection(this.memberId, leadershipTermId, commitPosition, appendedPosition, reason);
        this.ctx.countedErrorHandler().onError(new ClusterEvent(reason));
        this.election = new Election(false, isLogEndOfStream ? this.leaderMember.id() : -1, leadershipTermId, termBaseLogPosition, commitPosition, appendedPosition, this.activeMembers, this.clusterMemberByIdMap, this.thisMember, this.consensusPublisher, this.ctx, this);
        this.election.doWork(this.clusterClock.timeNanos());
    }

    private static void checkInterruptStatus() {
        if (Thread.currentThread().isInterrupted()) {
            throw new AgentTerminationException("interrupted");
        }
    }

    private void snapshotOnServiceAck(long logPosition, long timestamp, ServiceAck[] serviceAcks) {
        if (this.isSnapshotSetComplete(serviceAcks)) {
            this.takeSnapshot(timestamp, logPosition, serviceAcks);
        }
        long nowNs = this.clusterClock.timeNanos();
        int size = this.sessions.size();
        for (int i = 0; i < size; ++i) {
            this.sessions.get(i).timeOfLastActivityNs(nowNs);
        }
        if (null != this.clusterTermination) {
            this.serviceProxy.terminationPosition(this.terminationPosition, this.ctx.countedErrorHandler());
            this.clusterTermination.deadlineNs(this.clusterClock.timeNanos() + this.ctx.terminationTimeoutNs());
            this.state(ConsensusModule.State.TERMINATING);
        } else {
            this.state(ConsensusModule.State.ACTIVE);
            if (Cluster.Role.LEADER == this.role) {
                ClusterControl.ToggleState.reset(this.controlToggle);
            }
        }
    }

    private boolean isSnapshotSetComplete(ServiceAck[] serviceAcks) {
        boolean isSetComplete = true;
        for (int serviceId = serviceAcks.length - 1; serviceId >= 0; --serviceId) {
            long snapshotId = serviceAcks[serviceId].relevantId();
            if (-1L != snapshotId) continue;
            this.ctx.errorLog().record(new ClusterEvent("service=" + serviceId + " failed to take snapshot"));
            isSetComplete = false;
        }
        return isSetComplete;
    }

    private void takeSnapshot(long timestamp, long logPosition, ServiceAck[] serviceAcks) {
        long recordingId;
        try (ExclusivePublication publication = this.aeron.addExclusivePublication(this.ctx.snapshotChannel(), this.ctx.snapshotStreamId());){
            String channel = ChannelUri.addSessionId(this.ctx.snapshotChannel(), publication.sessionId());
            this.archive.startRecording(channel, this.ctx.snapshotStreamId(), SourceLocation.LOCAL, true);
            CountersReader counters = this.aeron.countersReader();
            int counterId = this.awaitRecordingCounter(counters, publication.sessionId(), this.archive.archiveId());
            recordingId = RecordingPos.getRecordingId(counters, counterId);
            this.snapshotState(publication, logPosition, this.leadershipTermId);
            if (null != this.consensusModuleExtension) {
                this.consensusModuleExtension.onTakeSnapshot(publication);
            }
            this.awaitRecordingComplete(recordingId, publication.position(), counters, counterId);
        }
        catch (ArchiveException ex) {
            if (ex.errorCode() == 11) {
                this.ctx.countedErrorHandler().onError(ex);
                this.unexpectedTermination();
            }
            throw ex;
        }
        long termBaseLogPosition = this.recordingLog.getTermEntry((long)this.leadershipTermId).termBaseLogPosition;
        for (int serviceId = serviceAcks.length - 1; serviceId >= 0; --serviceId) {
            long snapshotId = serviceAcks[serviceId].relevantId();
            this.recordingLog.appendSnapshot(snapshotId, this.leadershipTermId, termBaseLogPosition, logPosition, timestamp, serviceId);
        }
        this.recordingLog.appendSnapshot(recordingId, this.leadershipTermId, termBaseLogPosition, logPosition, timestamp, -1);
        this.recordingLog.force(this.ctx.fileSyncLevel());
        this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.serviceCount, -1L);
        this.ctx.snapshotCounter().incrementOrdered();
        this.totalSnapshotDurationTracker.onSnapshotEnd(this.clusterClock.timeNanos());
    }

    private void awaitRecordingComplete(long recordingId, long position, CountersReader counters, int counterId) {
        this.idleStrategy.reset();
        while (counters.getCounterValue(counterId) < position) {
            this.idle();
            if (RecordingPos.isActive(counters, counterId, recordingId)) continue;
            throw new ClusterException("recording has stopped unexpectedly: " + recordingId);
        }
    }

    private int awaitRecordingCounter(CountersReader counters, int sessionId, long archiveId) {
        this.idleStrategy.reset();
        int counterId = RecordingPos.findCounterIdBySession(counters, sessionId, archiveId);
        while (-1 == counterId) {
            this.idle();
            counterId = RecordingPos.findCounterIdBySession(counters, sessionId, archiveId);
        }
        return counterId;
    }

    private void snapshotState(ExclusivePublication publication, long logPosition, long leadershipTermId) {
        ConsensusModuleSnapshotTaker snapshotTaker = new ConsensusModuleSnapshotTaker(publication, this.idleStrategy, this.aeronClientInvoker);
        snapshotTaker.markBegin(1L, logPosition, leadershipTermId, 0, this.clusterTimeUnit, this.ctx.appVersion());
        if (this.pendingServiceMessageTrackers.length > 0) {
            PendingServiceMessageTracker trackerOne = this.pendingServiceMessageTrackers[0];
            snapshotTaker.snapshotConsensusModuleState(this.nextSessionId, trackerOne.nextServiceSessionId(), trackerOne.logServiceSessionId(), trackerOne.size());
        } else {
            snapshotTaker.snapshotConsensusModuleState(this.nextSessionId, 0L, 0L, 0);
        }
        int size = this.sessions.size();
        for (int i = 0; i < size; ++i) {
            ClusterSession session = this.sessions.get(i);
            ClusterSession.State sessionState = session.state();
            if (sessionState != ClusterSession.State.OPEN && sessionState != ClusterSession.State.CLOSING) continue;
            snapshotTaker.snapshotSession(session);
        }
        this.timerService.snapshot(snapshotTaker);
        for (PendingServiceMessageTracker tracker : this.pendingServiceMessageTrackers) {
            snapshotTaker.snapshot(tracker, this.ctx.errorHandler());
        }
        snapshotTaker.markEnd(1L, logPosition, leadershipTermId, 0, this.clusterTimeUnit, this.ctx.appVersion());
    }

    private void onUnavailableIngressImage(Image image) {
        this.ingressAdapter.freeSessionBuffer(image.sessionId());
    }

    private void onUnavailableCounter(CountersReader counters, long registrationId, int counterId) {
        if (ConsensusModule.State.TERMINATING != this.state && ConsensusModule.State.QUITTING != this.state) {
            for (long clientId : this.serviceClientIds) {
                if (registrationId != clientId) continue;
                this.ctx.countedErrorHandler().onError(new ClusterEvent("Aeron client in service closed unexpectedly"));
                this.state(ConsensusModule.State.CLOSED);
                return;
            }
            if (null != this.appendPosition && this.appendPosition.registrationId() == registrationId) {
                this.appendPosition.close();
                this.appendPosition = null;
                this.logSubscriptionId = -1L;
            }
        }
    }

    private void closeAndTerminate() {
        this.tryStopLogRecording();
        this.state(ConsensusModule.State.CLOSED);
        throw new ClusterTerminationException(true);
    }

    private void unexpectedTermination() {
        this.aeron.removeUnavailableCounterHandler(this.unavailableCounterHandlerRegistrationId);
        if (this.serviceCount > 0) {
            this.serviceProxy.terminationPosition(0L, this.ctx.countedErrorHandler());
        }
        this.tryStopLogRecording();
        this.state(ConsensusModule.State.CLOSED);
        throw new ClusterTerminationException(false);
    }

    private void terminateOnServiceAck(long logPosition) {
        if (null == this.clusterTermination) {
            this.consensusPublisher.terminationAck(this.leaderMember.publication(), this.leadershipTermId, logPosition, this.memberId);
            this.recordingLog.commitLogPosition(this.leadershipTermId, logPosition);
            this.closeAndTerminate();
        } else {
            this.clusterTermination.onServicesTerminated();
            if (this.clusterTermination.canTerminate(this.activeMembers, this.clusterClock.timeNanos())) {
                this.recordingLog.commitLogPosition(this.leadershipTermId, logPosition);
                this.closeAndTerminate();
            }
        }
    }

    private void tryStopLogRecording() {
        this.appendPosition = null;
        if (-1L != this.logSubscriptionId && this.archive.archiveProxy().publication().isConnected()) {
            try {
                this.archive.tryStopRecording(this.logSubscriptionId);
            }
            catch (Exception ex) {
                this.ctx.countedErrorHandler().onError(new ClusterException(ex, AeronException.Category.WARN));
            }
            this.logSubscriptionId = -1L;
        } else if (-1L != this.logRecordingId && this.archive.archiveProxy().publication().isConnected()) {
            try {
                this.archive.tryStopRecordingByIdentity(this.logRecordingId);
            }
            catch (Exception ex) {
                this.ctx.countedErrorHandler().onError(new ClusterException(ex, AeronException.Category.WARN));
            }
        }
    }

    private long getLastAppendedPosition() {
        this.idleStrategy.reset();
        long appendPosition;
        while (-1L == (appendPosition = this.archive.getStopPosition(this.logRecordingId))) {
            this.idle();
        }
        return appendPosition;
    }

    private void connectIngress() {
        ChannelUri ingressUri = ChannelUri.parse(this.ctx.ingressChannel());
        if (!ingressUri.containsKey("endpoint")) {
            ingressUri.put("endpoint", this.thisMember.ingressEndpoint());
        }
        if (Cluster.Role.LEADER != this.role && UdpChannel.isMulticastDestinationAddress(ingressUri)) {
            return;
        }
        Subscription subscription = this.aeron.addSubscription(ingressUri.toString(), this.ctx.ingressStreamId(), null, this::onUnavailableIngressImage);
        Subscription ipcSubscription = null;
        if (Cluster.Role.LEADER == this.role && this.ctx.isIpcIngressAllowed()) {
            ipcSubscription = this.aeron.addSubscription("aeron:ipc", this.ctx.ingressStreamId(), null, this::onUnavailableIngressImage);
        }
        this.ingressAdapter.connect(subscription, ipcSubscription);
    }

    private void ensureConsistentInitialTermId(ChannelUri channelUri) {
        channelUri.put("init-term-id", "0");
        channelUri.put("term-id", "0");
        channelUri.put("term-offset", "0");
    }

    private void checkFollowerForConsensusPublication(int followerMemberId) {
        ClusterMember follower = this.clusterMemberByIdMap.get(followerMemberId);
        if (null != follower && null == follower.publication()) {
            ClusterMember.addConsensusPublication(follower, this.ctx.consensusChannel(), this.ctx.consensusStreamId(), this.aeron, this.ctx.countedErrorHandler());
        }
    }

    private void runTerminationHook() {
        try {
            this.ctx.terminationHook().run();
        }
        catch (Exception ex) {
            this.ctx.countedErrorHandler().onError(ex);
        }
    }

    private String refineResponseChannel(String responseChannel) {
        if (null == this.responseChannelTemplate) {
            return responseChannel;
        }
        if (responseChannel.startsWith("aeron:ipc")) {
            return this.ctx.isIpcIngressAllowed() ? responseChannel : this.ctx.egressChannel();
        }
        ChannelUri channelUri = ChannelUri.parse(responseChannel);
        this.responseChannelTemplate.forEachParameter(channelUri::put);
        return channelUri.toString();
    }

    private void stopExistingCatchupReplay(ClusterMember follower) {
        if (-1L != follower.catchupReplaySessionId() && this.archive.archiveProxy().stopReplay(follower.catchupReplaySessionId(), this.aeron.nextCorrelationId(), this.archive.controlSessionId())) {
            follower.catchupReplaySessionId(-1L);
            follower.catchupReplayCorrelationId(-1L);
        }
    }

    private static boolean isCatchupAppendPosition(short flags) {
        return 0 != (1 & flags);
    }

    private void addSession(ClusterSession session) {
        int size;
        this.sessionByIdMap.put(session.id(), session);
        int addIndex = size = this.sessions.size();
        for (int i = size - 1; i >= 0; --i) {
            if (this.sessions.get(i).id() >= session.id()) continue;
            addIndex = i + 1;
            break;
        }
        if (size == addIndex) {
            this.sessions.add(session);
        } else {
            this.sessions.add(addIndex, session);
        }
    }

    private void closeSession(ClusterSession session) {
        long sessionId = session.id();
        this.sessionByIdMap.remove(sessionId);
        for (int i = this.sessions.size() - 1; i >= 0; --i) {
            if (this.sessions.get(i).id() != sessionId) continue;
            this.sessions.remove(i);
            break;
        }
        session.close(this.aeron, this.ctx.countedErrorHandler());
        if (null != this.consensusModuleExtension && null != session.closeReason()) {
            this.consensusModuleExtension.onSessionClosed(sessionId);
        }
    }

    private RecordingLog.RecoveryPlan recoverFromSnapshotAndLog() {
        RecordingLog.RecoveryPlan recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.serviceCount, this.logRecordingId);
        if (null != recoveryPlan.log) {
            this.logRecordingId(recoveryPlan.log.recordingId);
        }
        try (Counter ignore = this.addRecoveryStateCounter(recoveryPlan);){
            if (!recoveryPlan.snapshots.isEmpty()) {
                this.loadSnapshot(recoveryPlan.snapshots.get(0), this.archive);
            } else if (null != this.consensusModuleExtension) {
                this.consensusModuleExtension.onStart(this, null);
            }
            while (!ServiceAck.hasReached(this.expectedAckPosition, this.serviceAckId, this.serviceAckQueues)) {
                this.idle(this.consensusModuleAdapter.poll());
            }
            this.captureServiceClientIds();
            ++this.serviceAckId;
        }
        return recoveryPlan;
    }

    private RecordingLog.RecoveryPlan recoverFromBootstrapState() {
        ConsensusModuleStateExport boostrapState = this.ctx.boostrapState();
        this.logRecordingId(boostrapState.logRecordingId);
        RecordingLog.RecoveryPlan recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.serviceCount, this.logRecordingId);
        this.expectedAckPosition = boostrapState.expectedAckPosition;
        this.serviceAckId = boostrapState.serviceAckId;
        this.leadershipTermId = boostrapState.leadershipTermId;
        this.nextSessionId = boostrapState.nextSessionId;
        for (ConsensusModuleStateExport.TimerStateExport timer : boostrapState.timers) {
            this.onLoadTimer(timer.correlationId, timer.deadline, null, 0, 0);
        }
        for (ConsensusModuleStateExport.ClusterSessionStateExport sessionExport : boostrapState.sessions) {
            this.onLoadClusterSession(sessionExport.id, sessionExport.correlationId, sessionExport.openedLogPosition, sessionExport.timeOfLastActivityNs, sessionExport.closeReason, sessionExport.responseStreamId, sessionExport.responseChannel, null, 0, 0);
        }
        MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
        SessionMessageHeaderDecoder sessionMessageHeaderDecoder = new SessionMessageHeaderDecoder();
        ExpandableRingBuffer.MessageConsumer consumer = (buffer, offset, length, headOffset) -> {
            sessionMessageHeaderDecoder.wrap(buffer, offset + 8, messageHeaderDecoder.blockLength(), messageHeaderDecoder.version());
            this.onLoadPendingMessage(sessionMessageHeaderDecoder.clusterSessionId(), buffer, offset, length);
            return true;
        };
        for (ConsensusModuleStateExport.PendingServiceMessageTrackerStateExport tracker : boostrapState.pendingMessageTrackers) {
            this.onLoadPendingMessageTracker(tracker.nextServiceSessionId, tracker.logServiceSessionId, tracker.capacity, tracker.serviceId, null, 0, 0);
            tracker.pendingMessages.forEach(consumer, Integer.MAX_VALUE);
        }
        this.serviceProxy.requestServiceAck(this.expectedAckPosition);
        while (!ServiceAck.hasReached(this.expectedAckPosition, this.serviceAckId, this.serviceAckQueues)) {
            this.idle(this.consensusModuleAdapter.poll());
        }
        this.captureServiceClientIds();
        ++this.serviceAckId;
        return recoveryPlan;
    }

    private void replicateStandbySnapshotsForStartup() {
        try (StandbySnapshotReplicator standbySnapshotReplicator = StandbySnapshotReplicator.newInstance(this.ctx.clusterMemberId(), this.ctx.archiveContext(), this.recordingLog, this.serviceCount, this.ctx.leaderArchiveControlChannel(), this.ctx.archiveContext().controlRequestStreamId(), this.ctx.replicationChannel());){
            while (!standbySnapshotReplicator.isComplete()) {
                try {
                    this.ctx.idleStrategy().idle(standbySnapshotReplicator.poll(this.ctx.clusterClock().timeNanos()));
                }
                catch (ClusterException ex) {
                    this.ctx.errorHandler().onError(ex);
                    break;
                }
                ConsensusModuleAgent.checkInterruptStatus();
                this.aeronClientInvoker.invoke();
                if (!this.aeron.isClosed()) continue;
                throw new AgentTerminationException("unexpected Aeron close");
            }
        }
    }

    private int pollStandbySnapshotReplication(long nowNs) {
        int workCount = 0;
        if (null != this.standbySnapshotReplicator) {
            workCount += this.standbySnapshotReplicator.poll(nowNs);
            if (this.standbySnapshotReplicator.isComplete()) {
                this.ctx.snapshotCounter().increment();
                CloseHelper.quietClose(this.standbySnapshotReplicator);
                this.standbySnapshotReplicator = null;
            }
        }
        return workCount;
    }

    public String toString() {
        return "ConsensusModuleAgent{memberId=" + this.memberId + ", election=" + this.election + "}";
    }
}

