/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.ControlResponsePoller;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.ClusterControl;
import io.aeron.cluster.ClusterMember;
import io.aeron.cluster.ClusterSession;
import io.aeron.cluster.ClusterSessionProxy;
import io.aeron.cluster.ClusterTermination;
import io.aeron.cluster.ConsensusAdapter;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.ConsensusModuleAdapter;
import io.aeron.cluster.ConsensusModuleSnapshotLoader;
import io.aeron.cluster.ConsensusModuleSnapshotTaker;
import io.aeron.cluster.ConsensusPublisher;
import io.aeron.cluster.DynamicJoin;
import io.aeron.cluster.EgressPublisher;
import io.aeron.cluster.Election;
import io.aeron.cluster.IngressAdapter;
import io.aeron.cluster.LogAdapter;
import io.aeron.cluster.LogPublisher;
import io.aeron.cluster.LogReplay;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.ServiceAck;
import io.aeron.cluster.ServiceProxy;
import io.aeron.cluster.TimerService;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.ChangeType;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ClusterAction;
import io.aeron.cluster.codecs.EventCode;
import io.aeron.cluster.codecs.SessionMessageHeaderDecoder;
import io.aeron.cluster.codecs.SessionMessageHeaderEncoder;
import io.aeron.cluster.codecs.SnapshotRecordingsDecoder;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusterClock;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.cluster.service.RecoveryState;
import io.aeron.exceptions.AeronException;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.security.Authenticator;
import io.aeron.status.ReadableCounter;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.agrona.BitUtil;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableRingBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.SemanticVersion;
import org.agrona.collections.ArrayListUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongCounterMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongArrayQueue;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.status.CountersReader;

final class ConsensusModuleAgent
implements Agent {
    // Minimum spacing between slow-tick passes in doWork(): 10 ms expressed in nanoseconds.
    static final long SLOW_TICK_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(10L);
    private static final int SERVICE_MESSAGE_LIMIT = 20;
    // Timing configuration copied from the context in the constructor.
    private final long sessionTimeoutNs;
    private final long leaderHeartbeatIntervalNs;
    private final long leaderHeartbeatTimeoutNs;
    // Assigned in onStart() and used in onClose() to deregister the unavailable-counter handler.
    private long unavailableCounterHandlerRegistrationId;
    // Post-incremented in onSessionConnect() each time a session id is allocated while leader.
    private long nextSessionId = 1L;
    // Starts at Long.MIN_VALUE + 1, i.e. one above the initial logServiceSessionId.
    private long nextServiceSessionId = -9223372036854775807L;
    private long logServiceSessionId = Long.MIN_VALUE;
    private long leadershipTermId = -1L;
    private long replayLeadershipTermId = -1L;
    private long expectedAckPosition = 0L;
    private long serviceAckId = 0L;
    private long terminationPosition = -1L;
    private long notifiedCommitPosition = 0L;
    private long lastAppendPosition = 0L;
    private long timeOfLastLogUpdateNs = 0L;
    private long timeOfLastAppendPositionNs = 0L;
    private long timeOfLastMarkFileUpdateNs;
    private long timeOfLastSlowTickNs;
    private int pendingServiceMessageHeadOffset = 0;
    private int uncommittedServiceMessages = 0;
    private int memberId;
    private int highMemberId;
    private int pendingMemberRemovals = 0;
    private long logPublicationChannelTag;
    private ReadableCounter appendPosition;
    private final Counter commitPosition;
    // Current module state and cluster role; changed only through state(...) and role(...).
    private ConsensusModule.State state = ConsensusModule.State.INIT;
    private Cluster.Role role = Cluster.Role.FOLLOWER;
    // Membership view: active members are parsed from the context in the constructor.
    private ClusterMember[] activeMembers;
    private ClusterMember[] passiveMembers = ClusterMember.EMPTY_MEMBERS;
    private ClusterMember leaderMember;
    private ClusterMember thisMember;
    private long[] rankedPositions;
    // One slot per service, filled with -1 in the constructor.
    private final long[] serviceClientIds;
    private final ArrayDeque<ServiceAck>[] serviceAckQueues;
    private final Counter clusterRoleCounter;
    private final ClusterMarkFile markFile;
    private final AgentInvoker aeronClientInvoker;
    private final ClusterClock clusterClock;
    private final TimeUnit clusterTimeUnit;
    private final Counter moduleState;
    private final Counter controlToggle;
    private final TimerService timerService;
    private final ConsensusModuleAdapter consensusModuleAdapter;
    private final ServiceProxy serviceProxy;
    private final IngressAdapter ingressAdapter;
    private final EgressPublisher egressPublisher;
    private final LogPublisher logPublisher;
    private final LogAdapter logAdapter;
    private final ConsensusAdapter consensusAdapter;
    private final ConsensusPublisher consensusPublisher = new ConsensusPublisher();
    // Session bookkeeping: onSessionConnect() places each new session in exactly one of the three lists below.
    private final Long2ObjectHashMap<ClusterSession> sessionByIdMap = new Long2ObjectHashMap();
    private final ArrayList<ClusterSession> pendingSessions = new ArrayList();
    private final ArrayList<ClusterSession> rejectedSessions = new ArrayList();
    private final ArrayList<ClusterSession> redirectSessions = new ArrayList();
    private final Int2ObjectHashMap<ClusterMember> clusterMemberByIdMap = new Int2ObjectHashMap();
    private final Long2LongCounterMap expiredTimerCountByCorrelationIdMap = new Long2LongCounterMap(0L);
    // Sessions whose close event was appended by onSessionClose().
    private final ArrayDeque<ClusterSession> uncommittedClosedSessions = new ArrayDeque();
    // onTimerEvent() offers two longs per appended timer: the append position, then the correlation id.
    private final LongArrayQueue uncommittedTimers = new LongArrayQueue(Long.MAX_VALUE);
    private final ExpandableRingBuffer pendingServiceMessages = new ExpandableRingBuffer();
    // Method references bound once here and held in fields.
    private final ExpandableRingBuffer.MessageConsumer serviceSessionMessageAppender = this::serviceSessionMessageAppender;
    private final ExpandableRingBuffer.MessageConsumer leaderServiceSessionMessageSweeper = this::leaderServiceSessionMessageSweeper;
    private final ExpandableRingBuffer.MessageConsumer followerServiceSessionMessageSweeper = this::followerServiceSessionMessageSweeper;
    private final Authenticator authenticator;
    private final ClusterSessionProxy sessionProxy;
    private final Aeron aeron;
    private final ConsensusModule.Context ctx;
    private final IdleStrategy idleStrategy;
    private final RecordingLog recordingLog;
    private final ArrayList<RecordingLog.Snapshot> dynamicJoinSnapshots = new ArrayList();
    private RecordingLog.RecoveryPlan recoveryPlan;
    // Connected in onStart().
    private AeronArchive archive;
    // While non-null, the consensus message handlers delegate to the election instead of acting directly.
    private Election election;
    // While non-null, doWork() drives the dynamic join in preference to election or consensus work.
    private DynamicJoin dynamicJoin;
    private ClusterTermination clusterTermination;
    private long logSubscriptionId = -1L;
    private String liveLogDestination;
    private String catchupLogDestination;
    private String ingressEndpoints;

    /**
     * Builds the agent from a module context: caches configuration and shared components,
     * parses the cluster membership, and creates the consensus and control subscriptions
     * and publications on the context's Aeron client.
     *
     * @param ctx context supplying configuration, counters, the Aeron client and collaborators.
     */
    ConsensusModuleAgent(ConsensusModule.Context ctx) {
        this.ctx = ctx;
        this.aeron = ctx.aeron();
        this.clusterClock = ctx.clusterClock();
        this.clusterTimeUnit = this.clusterClock.timeUnit();
        this.sessionTimeoutNs = ctx.sessionTimeoutNs();
        this.leaderHeartbeatIntervalNs = ctx.leaderHeartbeatIntervalNs();
        this.leaderHeartbeatTimeoutNs = ctx.leaderHeartbeatTimeoutNs();
        this.egressPublisher = ctx.egressPublisher();
        this.moduleState = ctx.moduleStateCounter();
        this.commitPosition = ctx.commitPositionCounter();
        this.controlToggle = ctx.controlToggleCounter();
        this.logPublisher = ctx.logPublisher();
        this.idleStrategy = ctx.idleStrategy();
        // The configured wheel tick resolution is converted from nanoseconds into the cluster
        // time unit and rounded up to a power of two before being handed to the timer service.
        this.timerService = new TimerService(this, this.clusterTimeUnit, 0L, BitUtil.findNextPositivePowerOfTwo(this.clusterTimeUnit.convert(ctx.wheelTickResolutionNs(), TimeUnit.NANOSECONDS)), ctx.ticksPerWheel());
        this.activeMembers = ClusterMember.parse(ctx.clusterMembers());
        this.sessionProxy = new ClusterSessionProxy(this.egressPublisher);
        this.memberId = ctx.clusterMemberId();
        this.clusterRoleCounter = ctx.clusterNodeRoleCounter();
        this.markFile = ctx.clusterMarkFile();
        this.recordingLog = ctx.recordingLog();
        this.serviceClientIds = new long[ctx.serviceCount()];
        // -1 marks every service client id slot as not yet captured.
        Arrays.fill(this.serviceClientIds, -1L);
        this.serviceAckQueues = ServiceAck.newArray(ctx.serviceCount());
        this.highMemberId = ClusterMember.highMemberId(this.activeMembers);
        // NOTE(review): conductorAgentInvoker() is dereferenced without a null check - confirm the
        // context always configures the Aeron client with a conductor agent invoker.
        this.aeronClientInvoker = this.aeron.conductorAgentInvoker();
        this.aeronClientInvoker.invoke();
        this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
        // The role field is already initialised to FOLLOWER, so this call takes the no-change path.
        this.role(Cluster.Role.FOLLOWER);
        ClusterMember.addClusterMemberIds(this.activeMembers, this.clusterMemberByIdMap);
        // Until told otherwise this member is also recorded as the leader member.
        this.leaderMember = this.thisMember = ClusterMember.determineMember(this.activeMembers, ctx.clusterMemberId(), ctx.memberEndpoints());
        ChannelUri consensusUri = ChannelUri.parse(ctx.consensusChannel());
        // Default the consensus channel endpoint to this member's own consensus endpoint.
        if (!consensusUri.containsKey("endpoint")) {
            consensusUri.put("endpoint", this.thisMember.consensusEndpoint());
        }
        int statusStreamId = ctx.consensusStreamId();
        this.consensusAdapter = new ConsensusAdapter(this.aeron.addSubscription(consensusUri.toString(), statusStreamId), this);
        ClusterMember.addConsensusPublications(this.activeMembers, this.thisMember, consensusUri, statusStreamId, this.aeron);
        this.ingressAdapter = new IngressAdapter(ctx.ingressFragmentLimit(), this);
        this.logAdapter = new LogAdapter(this, ctx.logFragmentLimit());
        this.consensusModuleAdapter = new ConsensusModuleAdapter(this.aeron.addSubscription(ctx.controlChannel(), ctx.consensusModuleStreamId()), this);
        this.serviceProxy = new ServiceProxy(this.aeron.addPublication(ctx.controlChannel(), ctx.serviceStreamId()));
        this.authenticator = (Authenticator)ctx.authenticatorSupplier().get();
    }

    /**
     * Shuts the agent down.
     * <p>
     * When the Aeron client is still open the unavailable-counter handler is removed and log
     * recording is stopped. If the context does not own the Aeron client, the log publisher and
     * adapter are disconnected and the sessions, adapters, consensus publications, service proxy
     * and archive client are closed explicitly. The module state is then set to CLOSED. The
     * context is closed in every case.
     */
    @Override
    public void onClose() {
        if (this.aeron.isClosed()) {
            this.ctx.close();
            return;
        }

        this.aeron.removeUnavailableCounterHandler(this.unavailableCounterHandlerRegistrationId);
        this.tryStopLogRecording();

        if (!this.ctx.ownsAeronClient()) {
            final CountedErrorHandler onError = this.ctx.countedErrorHandler();

            this.logPublisher.disconnect(onError);
            this.logAdapter.disconnect(onError);

            for (final ClusterSession openSession : this.sessionByIdMap.values()) {
                openSession.close(onError);
            }

            CloseHelper.close(onError, this.ingressAdapter);
            ClusterMember.closeConsensusPublications(onError, this.activeMembers);
            CloseHelper.close(onError, this.consensusAdapter);
            CloseHelper.close(onError, this.serviceProxy);
            CloseHelper.close(onError, this.consensusModuleAdapter);
            CloseHelper.close(onError, this.archive);
        }

        this.state(ConsensusModule.State.CLOSED);
        this.ctx.close();
    }

    /**
     * Connects to the archive and prepares the module for work.
     * <p>
     * If {@code requiresDynamicJoin()} returns {@code null}, any recording identified as the last
     * term recording is stopped, a recovery plan is created, the most recent snapshot (if any) is
     * loaded, and the agent polls the consensus module adapter until the service acks reach the
     * expected position. The module then becomes ACTIVE and a startup election is created.
     * In every case the unavailable-counter handler is registered last.
     */
    @Override
    public void onStart() {
        this.archive = AeronArchive.connect(this.ctx.archiveContext().clone());

        final DynamicJoin pendingJoin = this.requiresDynamicJoin();
        this.dynamicJoin = pendingJoin;

        if (null == pendingJoin) {
            final long priorTermRecordingId = this.recordingLog.findLastTermRecordingId();
            if (-1L != priorTermRecordingId) {
                this.archive.tryStopRecordingByIdentity(priorTermRecordingId);
            }

            this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.ctx.serviceCount());

            // The recovery state counter only needs to exist while services are recovering.
            try (Counter ignore = this.addRecoveryStateCounter(this.recoveryPlan)) {
                if (!this.recoveryPlan.snapshots.isEmpty()) {
                    this.loadSnapshot(this.recoveryPlan.snapshots.get(0), this.archive);
                }

                while (true) {
                    if (ServiceAck.hasReached(this.expectedAckPosition, this.serviceAckId, this.serviceAckQueues)) {
                        break;
                    }
                    this.idle(this.consensusModuleAdapter.poll());
                }

                this.captureServiceClientIds();
                ++this.serviceAckId;
            }

            this.state(ConsensusModule.State.ACTIVE);
            this.election = new Election(
                true,
                this.recoveryPlan.lastLeadershipTermId,
                this.commitPosition.getWeak(),
                this.recoveryPlan.appendedLogPosition,
                this.activeMembers,
                this.clusterMemberByIdMap,
                this.thisMember,
                this.consensusPublisher,
                this.ctx,
                this);
        }

        this.unavailableCounterHandlerRegistrationId = this.aeron.addUnavailableCounterHandler(this::onUnavailableCounter);
    }

    /**
     * Performs one duty cycle.
     * <p>
     * Slow-tick work runs when at least {@link #SLOW_TICK_INTERVAL_NS} has elapsed since the
     * previous slow tick. The consensus adapter is then polled, after which exactly one of the
     * dynamic join, the election, or regular consensus work is driven, in that priority order.
     *
     * @return the accumulated work count for this cycle.
     */
    @Override
    public int doWork() {
        final long now = this.clusterClock.time();
        final long nowNs = this.clusterTimeUnit.toNanos(now);
        int workCount = 0;

        if (nowNs >= this.timeOfLastSlowTickNs + SLOW_TICK_INTERVAL_NS) {
            this.timeOfLastSlowTickNs = nowNs;
            workCount += this.slowTickWork(this.clusterTimeUnit.toMillis(now), nowNs);
        }

        workCount += this.consensusAdapter.poll();

        if (null != this.dynamicJoin) {
            workCount += this.dynamicJoin.doWork(nowNs);
        } else if (null != this.election) {
            workCount += this.election.doWork(nowNs);
        } else {
            workCount += this.consensusWork(now, nowNs);
        }

        return workCount;
    }

    /**
     * Name reported for this agent.
     *
     * @return the constant {@code "consensus-module"}.
     */
    @Override
    public String roleName() {
        return "consensus-module";
    }

    /**
     * Handles a client connect request.
     * <p>
     * A session is always created and connected. When this member is not the leader the session
     * gets id -1 and is queued for redirect. On the leader the session is rejected if the client
     * major version is not 0 or the concurrent session limit has been reached; otherwise the
     * authenticator is notified and the session is queued as pending.
     *
     * @param correlationId      correlation id of the request, recorded on the session.
     * @param responseStreamId   stream id for responses to the client.
     * @param version            semantic version reported by the client.
     * @param responseChannel    channel for responses to the client.
     * @param encodedCredentials credentials passed to the authenticator.
     */
    void onSessionConnect(long correlationId, int responseStreamId, int version, String responseChannel, byte[] encodedCredentials) {
        final boolean isLeader = Cluster.Role.LEADER == this.role;
        final long clusterSessionId = isLeader ? this.nextSessionId++ : -1L;

        final ClusterSession session = new ClusterSession(clusterSessionId, responseStreamId, responseChannel);
        final long now = this.clusterClock.time();
        session.lastActivityNs(this.clusterTimeUnit.toNanos(now), correlationId);
        session.connect(this.aeron);

        if (!isLeader) {
            this.redirectSessions.add(session);
            return;
        }

        if (0 != SemanticVersion.major(version)) {
            final String detail = "invalid client version " + SemanticVersion.toString(version) + ", cluster is " + SemanticVersion.toString(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION);
            session.reject(EventCode.ERROR, detail);
            this.rejectedSessions.add(session);
            return;
        }

        final int sessionCount = this.pendingSessions.size() + this.sessionByIdMap.size();
        if (sessionCount >= this.ctx.maxConcurrentSessions()) {
            session.reject(EventCode.ERROR, "concurrent session limit");
            this.rejectedSessions.add(session);
            return;
        }

        this.authenticator.onConnectRequest(session.id(), encodedCredentials, this.clusterTimeUnit.toMillis(now));
        this.pendingSessions.add(session);
    }

    /**
     * Handles a client request to close its session.
     * <p>
     * Acts only when this member is leader, the leadership term matches and the session is known.
     * The session is marked as closing and disconnected; if the close event is appended to the
     * log the session is recorded as an uncommitted closed session, removed from the session map
     * and closed. If the append fails the session remains in the map in its closing state.
     * <p>
     * Errors raised while disconnecting or closing are routed through the counted error handler,
     * as on the other close and disconnect paths of this agent, so that they are counted as well
     * as reported.
     *
     * @param leadershipTermId term the client believes is current.
     * @param clusterSessionId id of the session to close.
     */
    void onSessionClose(long leadershipTermId, long clusterSessionId) {
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (leadershipTermId == this.leadershipTermId && null != session && Cluster.Role.LEADER == this.role) {
            CountedErrorHandler errorHandler = this.ctx.countedErrorHandler();
            session.closing(CloseReason.CLIENT_ACTION);
            session.disconnect(errorHandler);
            if (this.logPublisher.appendSessionClose(session, leadershipTermId, this.clusterClock.time())) {
                session.closedLogPosition(this.logPublisher.position());
                this.uncommittedClosedSessions.addLast(session);
                this.sessionByIdMap.remove(clusterSessionId);
                session.close(errorHandler);
            }
        }
    }

    /**
     * Handles an ingress message from a client session.
     * <p>
     * Messages for another term, received while not leader, or for an unknown or CLOSED session
     * are skipped with CONTINUE. For an OPEN session the message is appended to the log; a
     * positive append result refreshes the session activity time and yields CONTINUE. In all
     * other cases ABORT is returned.
     *
     * @param leadershipTermId term the client believes is current.
     * @param clusterSessionId id of the sending session.
     * @param buffer           buffer containing the message.
     * @param offset           offset of the message in the buffer.
     * @param length           length of the message.
     * @return CONTINUE when the message was appended or ignored, otherwise ABORT.
     */
    ControlledFragmentHandler.Action onIngressMessage(long leadershipTermId, long clusterSessionId, DirectBuffer buffer, int offset, int length) {
        if (leadershipTermId != this.leadershipTermId || Cluster.Role.LEADER != this.role) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        final ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null == session || ClusterSession.State.CLOSED == session.state()) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        if (ClusterSession.State.OPEN == session.state()) {
            final long now = this.clusterClock.time();
            final long result = this.logPublisher.appendMessage(leadershipTermId, clusterSessionId, now, buffer, offset, length);
            if (result > 0L) {
                session.timeOfLastActivityNs(this.clusterTimeUnit.toNanos(now));
                return ControlledFragmentHandler.Action.CONTINUE;
            }
        }

        return ControlledFragmentHandler.Action.ABORT;
    }

    /**
     * Refreshes the activity time of an OPEN session when this member is leader and the
     * leadership term matches; otherwise does nothing.
     *
     * @param leadershipTermId term the client believes is current.
     * @param clusterSessionId id of the session sending the keep-alive.
     */
    void onSessionKeepAlive(long leadershipTermId, long clusterSessionId) {
        if (Cluster.Role.LEADER != this.role || leadershipTermId != this.leadershipTermId) {
            return;
        }

        final ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null != session && ClusterSession.State.OPEN == session.state()) {
            final long nowNs = this.clusterTimeUnit.toNanos(this.clusterClock.time());
            session.timeOfLastActivityNs(nowNs);
        }
    }

    /**
     * Handles a client's response to an authentication challenge.
     * <p>
     * Only acts while leader. Pending sessions are searched from the most recently added
     * backwards for a CHALLENGED session with the given id; the first match has its activity
     * time and correlation id updated and the response is passed to the authenticator.
     *
     * @param correlationId      correlation id of the response, recorded on the session.
     * @param clusterSessionId   id of the challenged session.
     * @param encodedCredentials credentials supplied in response to the challenge.
     */
    void onChallengeResponse(long correlationId, long clusterSessionId, byte[] encodedCredentials) {
        if (Cluster.Role.LEADER != this.role) {
            return;
        }

        int index = this.pendingSessions.size() - 1;
        while (index >= 0) {
            final ClusterSession candidate = this.pendingSessions.get(index);
            if (candidate.id() == clusterSessionId && ClusterSession.State.CHALLENGED == candidate.state()) {
                final long now = this.clusterClock.time();
                final long nowMs = this.clusterTimeUnit.toMillis(now);
                candidate.lastActivityNs(this.clusterTimeUnit.toNanos(now), correlationId);
                this.authenticator.onChallengeResponse(clusterSessionId, encodedCredentials, nowMs);
                return;
            }
            --index;
        }
    }

    /**
     * Appends a timer event to the log and, on success, queues the append position followed by
     * the correlation id on the uncommitted timers queue.
     *
     * @param correlationId correlation id of the expired timer.
     * @return {@code true} if the append returned a positive position, otherwise {@code false}.
     */
    boolean onTimerEvent(long correlationId) {
        final long position = this.logPublisher.appendTimer(correlationId, this.leadershipTermId, this.clusterClock.time());
        if (position <= 0L) {
            return false;
        }

        this.uncommittedTimers.offerLong(position);
        this.uncommittedTimers.offerLong(correlationId);
        return true;
    }

    /**
     * Handles a canvass of log position from another member.
     * <p>
     * During an election the message is passed to the election. Otherwise, as leader, a new
     * leadership term message is sent to a known follower whose log term is not ahead of the
     * current term, provided a matching term entry exists in the recording log. A follower on
     * an older term is looked up by the term after its own and is sent that entry's base log
     * position; a follower on the current term is sent the current append position.
     *
     * @param logLeadershipTermId leadership term of the follower's log.
     * @param logPosition         position of the follower's log.
     * @param followerMemberId    id of the canvassing member.
     */
    void onCanvassPosition(long logLeadershipTermId, long logPosition, int followerMemberId) {
        if (null != this.election) {
            this.election.onCanvassPosition(logLeadershipTermId, logPosition, followerMemberId);
            return;
        }
        if (Cluster.Role.LEADER != this.role) {
            return;
        }

        final ClusterMember follower = this.clusterMemberByIdMap.get(followerMemberId);
        if (null == follower || logLeadershipTermId > this.leadershipTermId) {
            return;
        }

        final boolean isOnOlderTerm = logLeadershipTermId < this.leadershipTermId;
        final long lookupTermId = isOnOlderTerm ? logLeadershipTermId + 1L : logLeadershipTermId;
        final RecordingLog.Entry termEntry = this.recordingLog.findTermEntry(lookupTermId);
        if (null == termEntry) {
            return;
        }

        final long appendPosition = this.logPublisher.position();
        final long truncatePosition = isOnOlderTerm ? termEntry.termBaseLogPosition : appendPosition;
        this.consensusPublisher.newLeadershipTerm(
            follower.publication(),
            logLeadershipTermId,
            truncatePosition,
            this.leadershipTermId,
            appendPosition,
            termEntry.timestamp,
            this.memberId,
            this.logPublisher.sessionId(),
            false);
    }

    /**
     * Handles a vote request from a candidate.
     * <p>
     * During an election the request is passed to the election. Otherwise, if the candidate term
     * is newer than the current term and no dynamic join is in progress, a warning is reported,
     * an election is entered and the request is then passed to that election.
     *
     * @param logLeadershipTermId leadership term of the candidate's log.
     * @param logPosition         position of the candidate's log.
     * @param candidateTermId     term the candidate is standing for.
     * @param candidateId         member id of the candidate.
     */
    void onRequestVote(long logLeadershipTermId, long logPosition, long candidateTermId, int candidateId) {
        if (null != this.election) {
            this.election.onRequestVote(logLeadershipTermId, logPosition, candidateTermId, candidateId);
            return;
        }

        final boolean isNewerTerm = candidateTermId > this.leadershipTermId;
        if (isNewerTerm && null == this.dynamicJoin) {
            final ClusterException warning = new ClusterException("unexpected vote request", AeronException.Category.WARN);
            this.ctx.countedErrorHandler().onError(warning);
            this.enterElection(this.clusterClock.timeNanos());
            this.election.onRequestVote(logLeadershipTermId, logPosition, candidateTermId, candidateId);
        }
    }

    /**
     * Passes a vote to the election in progress; ignored when there is no election.
     *
     * @param candidateTermId     term the vote applies to.
     * @param logLeadershipTermId leadership term of the voter's log.
     * @param logPosition         position of the voter's log.
     * @param candidateMemberId   member id of the candidate voted on.
     * @param followerMemberId    member id of the voter.
     * @param vote                the vote cast.
     */
    void onVote(long candidateTermId, long logLeadershipTermId, long logPosition, int candidateMemberId, int followerMemberId, boolean vote) {
        if (null == this.election) {
            return;
        }

        this.election.onVote(candidateTermId, logLeadershipTermId, logPosition, candidateMemberId, followerMemberId, vote);
    }

    /**
     * Handles a new leadership term notification.
     * <p>
     * During an election the message is passed to the election. As a follower in the same term
     * hearing from the current leader, the time of last log update is refreshed and the notified
     * commit position is raised to the given log position if that is higher. Otherwise a newer
     * term, when no dynamic join is in progress, is reported as a warning and an election is
     * entered.
     */
    void onNewLeadershipTerm(long logLeadershipTermId, long logTruncatePosition, long leadershipTermId, long logPosition, long timestamp, int leaderId, int logSessionId, boolean isStartup) {
        if (null != this.election) {
            this.election.onNewLeadershipTerm(logLeadershipTermId, logTruncatePosition, leadershipTermId, logPosition, timestamp, leaderId, logSessionId, isStartup);
            return;
        }

        if (Cluster.Role.FOLLOWER == this.role && leadershipTermId == this.leadershipTermId && leaderId == this.leaderMember.id()) {
            this.timeOfLastLogUpdateNs = this.clusterClock.timeNanos();
            if (logPosition > this.notifiedCommitPosition) {
                this.notifiedCommitPosition = logPosition;
            }
            return;
        }

        if (leadershipTermId > this.leadershipTermId && null == this.dynamicJoin) {
            final ClusterException warning = new ClusterException("unexpected new leadership term", AeronException.Category.WARN);
            this.ctx.countedErrorHandler().onError(warning);
            this.enterElection(this.clusterClock.timeNanos());
        }
    }

    /**
     * Handles a follower's report of its appended log position.
     * <p>
     * During an election the message is passed to the election. Otherwise, as leader in the same
     * term, a known follower has its log position and append-position time updated and its
     * catch-up completion is tracked.
     *
     * @param leadershipTermId term the follower is reporting for.
     * @param logPosition      position the follower has appended to.
     * @param followerMemberId id of the reporting follower.
     */
    void onAppendPosition(long leadershipTermId, long logPosition, int followerMemberId) {
        if (null != this.election) {
            this.election.onAppendPosition(leadershipTermId, logPosition, followerMemberId);
            return;
        }
        if (Cluster.Role.LEADER != this.role || leadershipTermId != this.leadershipTermId) {
            return;
        }

        final ClusterMember follower = this.clusterMemberByIdMap.get(followerMemberId);
        if (null != follower) {
            follower.logPosition(logPosition).timeOfLastAppendPositionNs(this.clusterClock.timeNanos());
            this.trackCatchupCompletion(follower, leadershipTermId);
        }
    }

    /**
     * Handles a commit position notification from a leader.
     * <p>
     * During an election the message is passed to the election. As a follower in the same term
     * hearing from the current leader, the time of last log update is refreshed and the notified
     * commit position is set to the given position. Otherwise a newer term, when no dynamic join
     * is in progress, is reported as a warning and an election is entered.
     *
     * @param leadershipTermId term the leader is reporting for.
     * @param logPosition      commit position reported by the leader.
     * @param leaderMemberId   id of the reporting leader.
     */
    void onCommitPosition(long leadershipTermId, long logPosition, int leaderMemberId) {
        if (null != this.election) {
            this.election.onCommitPosition(leadershipTermId, logPosition, leaderMemberId);
            return;
        }

        if (Cluster.Role.FOLLOWER == this.role && leadershipTermId == this.leadershipTermId && leaderMemberId == this.leaderMember.id()) {
            this.timeOfLastLogUpdateNs = this.clusterClock.timeNanos();
            this.notifiedCommitPosition = logPosition;
            return;
        }

        if (leadershipTermId > this.leadershipTermId && null == this.dynamicJoin) {
            final ClusterException warning = new ClusterException("unexpected commit position", AeronException.Category.WARN);
            this.ctx.countedErrorHandler().onError(warning);
            this.enterElection(this.clusterClock.timeNanos());
        }
    }

    /**
     * Starts an archive replay of the log towards a follower that has asked to catch up.
     * <p>
     * Acts only as leader in the same term, for a known follower whose catch-up replay session
     * id is still -1. The replay is started from the given position on a UDP channel addressed
     * to the follower's catch-up endpoint, and the resulting replay session id and archive
     * correlation id are recorded on the follower.
     *
     * @param leadershipTermId term the follower is catching up in.
     * @param logPosition      position to replay from.
     * @param followerMemberId id of the follower.
     */
    void onCatchupPosition(long leadershipTermId, long logPosition, int followerMemberId) {
        if (Cluster.Role.LEADER != this.role || leadershipTermId != this.leadershipTermId) {
            return;
        }

        final ClusterMember follower = this.clusterMemberByIdMap.get(followerMemberId);
        if (null == follower || follower.catchupReplaySessionId() != -1L) {
            return;
        }

        final String replayChannel = new ChannelUriStringBuilder()
            .media("udp")
            .endpoint(follower.catchupEndpoint())
            .sessionId(this.logPublisher.sessionId())
            .linger(0L)
            .eos(Boolean.FALSE)
            .build();

        final long replaySessionId = this.archive.startReplay(this.logRecordingId(), logPosition, Long.MAX_VALUE, replayChannel, this.ctx.logStreamId());
        follower.catchupReplaySessionId(replaySessionId);
        follower.catchupReplayCorrelationId(this.archive.lastCorrelationId());
    }

    /**
     * Removes the catch-up log destination when the stop request is addressed to this member in
     * the current term and such a destination is set.
     *
     * @param leadershipTermId term the request applies to.
     * @param followerMemberId id of the member that should stop catching up.
     */
    void onStopCatchup(long leadershipTermId, int followerMemberId) {
        final String destination = this.catchupLogDestination;
        if (null == destination) {
            return;
        }

        if (followerMemberId == this.memberId && leadershipTermId == this.leadershipTermId) {
            this.logAdapter.asyncRemoveDestination(destination);
            this.catchupLogDestination = null;
        }
    }

    /**
     * Handles a request to add a passive member.
     * <p>
     * Ignored during an election or dynamic join. The leader, if the endpoints do not duplicate
     * an existing passive member, allocates the next member id, registers the new member, adds a
     * consensus publication for it and adds its log endpoint as a log destination. A follower
     * forwards the request to the leader.
     *
     * @param correlationId   correlation id of the request, recorded on the new member.
     * @param memberEndpoints endpoints of the member to add.
     */
    void onAddPassiveMember(long correlationId, String memberEndpoints) {
        if (null != this.election || null != this.dynamicJoin) {
            return;
        }

        if (Cluster.Role.LEADER == this.role) {
            if (!ClusterMember.notDuplicateEndpoint(this.passiveMembers, memberEndpoints)) {
                return;
            }

            final int allocatedId = ++this.highMemberId;
            final ClusterMember newMember = ClusterMember.parseEndpoints(allocatedId, memberEndpoints);
            newMember.correlationId(correlationId);
            this.passiveMembers = ClusterMember.addMember(this.passiveMembers, newMember);
            this.clusterMemberByIdMap.put(newMember.id(), newMember);

            final ChannelUri consensusUri = ChannelUri.parse(this.ctx.consensusChannel());
            ClusterMember.addConsensusPublication(newMember, consensusUri, this.ctx.consensusStreamId(), this.aeron);
            this.logPublisher.addDestination(this.ctx.isLogMdc(), newMember.logEndpoint());
        } else if (Cluster.Role.FOLLOWER == this.role) {
            this.consensusPublisher.addPassiveMember(this.leaderMember.publication(), correlationId, memberEndpoints);
        }
    }

    /**
     * Passes a cluster members change to the dynamic join in progress; ignored otherwise.
     *
     * @param correlationId  correlation id of the change.
     * @param leaderMemberId id of the leader reported with the change.
     * @param activeMembers  encoded active members.
     * @param passiveMembers encoded passive members.
     */
    void onClusterMembersChange(long correlationId, int leaderMemberId, String activeMembers, String passiveMembers) {
        if (null == this.dynamicJoin) {
            return;
        }

        this.dynamicJoin.onClusterMembersChange(correlationId, leaderMemberId, activeMembers, passiveMembers);
    }

    /**
     * Answers a snapshot recording query from a known member with the current recovery plan and
     * the encoded active members. Acts only as leader when no election is in progress.
     *
     * @param correlationId   correlation id of the query.
     * @param requestMemberId id of the member making the query.
     */
    void onSnapshotRecordingQuery(long correlationId, int requestMemberId) {
        if (null != this.election || Cluster.Role.LEADER != this.role) {
            return;
        }

        final ClusterMember requester = this.clusterMemberByIdMap.get(requestMemberId);
        if (null != requester) {
            final String encodedActiveMembers = ClusterMember.encodeAsString(this.activeMembers);
            this.consensusPublisher.snapshotRecording(requester.publication(), correlationId, this.recoveryPlan, encodedActiveMembers);
        }
    }

    /**
     * Passes a snapshot recordings response to the dynamic join in progress; ignored otherwise.
     *
     * @param correlationId correlation id of the response.
     * @param decoder       decoder positioned on the response.
     */
    void onSnapshotRecordings(long correlationId, SnapshotRecordingsDecoder decoder) {
        if (null == this.dynamicJoin) {
            return;
        }

        this.dynamicJoin.onSnapshotRecordings(correlationId, decoder);
    }

    /**
     * Handles a member's request to join the cluster.
     * <p>
     * Acts only as leader when no election is in progress. A known member that has not already
     * requested to join, and whose term is not beyond that of the most recent snapshot in the
     * recovery plan (-1 when there are no snapshots), is marked as having requested to join. If
     * the member has no publication yet, a consensus publication is added for it and its log
     * endpoint is added as a log destination.
     *
     * @param leadershipTermId term reported by the joining member.
     * @param memberId         id of the joining member.
     */
    void onJoinCluster(long leadershipTermId, int memberId) {
        if (null != this.election || Cluster.Role.LEADER != this.role) {
            return;
        }

        final ClusterMember member = this.clusterMemberByIdMap.get(memberId);

        final long snapshotLeadershipTermId;
        if (this.recoveryPlan.snapshots.isEmpty()) {
            snapshotLeadershipTermId = -1L;
        } else {
            snapshotLeadershipTermId = this.recoveryPlan.snapshots.get(0).leadershipTermId;
        }

        if (null == member || member.hasRequestedJoin() || leadershipTermId > snapshotLeadershipTermId) {
            return;
        }

        if (null == member.publication()) {
            final ChannelUri consensusUri = ChannelUri.parse(this.ctx.consensusChannel());
            final int consensusStreamId = this.ctx.consensusStreamId();
            ClusterMember.addConsensusPublication(member, consensusUri, consensusStreamId, this.aeron);
            this.logPublisher.addDestination(this.ctx.isLogMdc(), member.logEndpoint());
        }

        member.hasRequestedJoin(true);
    }

    /**
     * Records the termination position when received as a follower in the current term.
     *
     * @param leadershipTermId term the termination applies to.
     * @param logPosition      log position at which to terminate.
     */
    void onTerminationPosition(long leadershipTermId, long logPosition) {
        final boolean isCurrentTerm = leadershipTermId == this.leadershipTermId;
        if (!isCurrentTerm || Cluster.Role.FOLLOWER != this.role) {
            return;
        }

        this.terminationPosition = logPosition;
    }

    /**
     * Handles a member's acknowledgement of the termination position.
     * <p>
     * Acts only as leader in the same term when the acknowledged position equals the termination
     * position and the member is known. The member is marked as terminated and, once the cluster
     * termination reports that it can terminate, the log position is committed to the recording
     * log and the agent closes and terminates.
     *
     * @param leadershipTermId term the acknowledgement applies to.
     * @param logPosition      position being acknowledged.
     * @param memberId         id of the acknowledging member.
     */
    void onTerminationAck(long leadershipTermId, long logPosition, int memberId) {
        if (Cluster.Role.LEADER != this.role || leadershipTermId != this.leadershipTermId || logPosition != this.terminationPosition) {
            return;
        }

        final ClusterMember member = this.clusterMemberByIdMap.get(memberId);
        if (null == member) {
            return;
        }

        member.hasTerminated(true);

        final long nowNs = this.clusterClock.timeNanos();
        if (this.clusterTermination.canTerminate(this.activeMembers, this.terminationPosition, nowNs)) {
            this.recordingLog.commitLogPosition(leadershipTermId, logPosition);
            this.closeAndTerminate();
        }
    }

    /**
     * Handles a backup query.
     * <p>
     * Ignored during an election or dynamic join. A non-leader forwards the query to the leader.
     * The leader, when ACTIVE or SUSPENDED, creates a backup session with id -1 and connects it;
     * the session is rejected if the client major version is not 0 or the concurrent session
     * limit has been reached, otherwise the authenticator is notified and the session is queued
     * as pending.
     *
     * @param correlationId      correlation id of the query, recorded on the session.
     * @param responseStreamId   stream id for the response.
     * @param version            semantic version reported by the requester.
     * @param responseChannel    channel for the response.
     * @param encodedCredentials credentials passed to the authenticator.
     */
    void onBackupQuery(long correlationId, int responseStreamId, int version, String responseChannel, byte[] encodedCredentials) {
        if (null != this.election || null != this.dynamicJoin) {
            return;
        }

        if (Cluster.Role.LEADER != this.role) {
            this.consensusPublisher.backupQuery(this.leaderMember.publication(), correlationId, responseStreamId, version, responseChannel, encodedCredentials);
            return;
        }

        if (ConsensusModule.State.ACTIVE != this.state && ConsensusModule.State.SUSPENDED != this.state) {
            return;
        }

        final ClusterSession backupSession = new ClusterSession(-1L, responseStreamId, responseChannel);
        final long now = this.clusterClock.time();
        backupSession.lastActivityNs(this.clusterTimeUnit.toNanos(now), correlationId);
        backupSession.markAsBackupSession();
        backupSession.connect(this.aeron);

        if (0 != SemanticVersion.major(version)) {
            final String detail = "invalid client version " + SemanticVersion.toString(version) + ", cluster=" + SemanticVersion.toString(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION);
            backupSession.reject(EventCode.ERROR, detail);
            this.rejectedSessions.add(backupSession);
            return;
        }

        final int sessionCount = this.pendingSessions.size() + this.sessionByIdMap.size();
        if (sessionCount >= this.ctx.maxConcurrentSessions()) {
            backupSession.reject(EventCode.ERROR, "concurrent session limit");
            this.rejectedSessions.add(backupSession);
            return;
        }

        this.authenticator.onConnectRequest(backupSession.id(), encodedCredentials, this.clusterTimeUnit.toMillis(now));
        this.pendingSessions.add(backupSession);
    }

    /**
     * Handles a request to remove a member.
     * <p>
     * Acts only as leader, outside an election, for a known member. A passive member is removed
     * immediately: it is dropped from the passive list, its publication is closed, its log
     * destination is removed and it is deleted from the member map. For an active member a QUIT
     * membership change event is appended to the log; on a positive append position the time of
     * last log update is set back by one heartbeat interval, the removal position is recorded on
     * the member and the pending removal count is incremented.
     *
     * @param memberId  id of the member to remove.
     * @param isPassive whether the member is to be removed as a passive member.
     */
    void onRemoveMember(int memberId, boolean isPassive) {
        if (null != this.election || Cluster.Role.LEADER != this.role) {
            return;
        }

        final ClusterMember member = this.clusterMemberByIdMap.get(memberId);
        if (null == member) {
            return;
        }

        if (isPassive) {
            this.passiveMembers = ClusterMember.removeMember(this.passiveMembers, memberId);
            member.closePublication(this.ctx.countedErrorHandler());
            this.logPublisher.removeDestination(this.ctx.isLogMdc(), member.logEndpoint());
            this.clusterMemberByIdMap.remove(memberId);
            this.clusterMemberByIdMap.compact();
            return;
        }

        final long now = this.clusterClock.time();
        final String remainingMembers = ClusterMember.encodeAsString(ClusterMember.removeMember(this.activeMembers, memberId));
        final long position = this.logPublisher.appendMembershipChangeEvent(
            this.leadershipTermId,
            now,
            this.memberId,
            this.activeMembers.length,
            ChangeType.QUIT,
            memberId,
            remainingMembers);

        if (position > 0L) {
            this.timeOfLastLogUpdateNs = this.clusterTimeUnit.toNanos(now) - this.leaderHeartbeatIntervalNs;
            member.removalPosition(position);
            ++this.pendingMemberRemovals;
        }
    }

    /**
     * Answers a cluster members query via the service proxy, using the extended response (which
     * also carries the current time in nanoseconds and this member's id) when requested, and the
     * encoded-string response otherwise.
     *
     * @param correlationId     correlation id of the query.
     * @param isExtendedRequest whether the extended response form was requested.
     */
    void onClusterMembersQuery(long correlationId, boolean isExtendedRequest) {
        if (!isExtendedRequest) {
            final String encodedActive = ClusterMember.encodeAsString(this.activeMembers);
            final String encodedPassive = ClusterMember.encodeAsString(this.passiveMembers);
            this.serviceProxy.clusterMembersResponse(correlationId, this.leaderMember.id(), encodedActive, encodedPassive);
            return;
        }

        this.serviceProxy.clusterMembersExtendedResponse(
            correlationId,
            this.clusterClock.timeNanos(),
            this.leaderMember.id(),
            this.memberId,
            this.activeMembers,
            this.passiveMembers);
    }

    /**
     * Move the consensus module to a new state. Does nothing when the state is unchanged.
     * <p>
     * {@code stateChange} is invoked with the old and new state before the field is updated; the
     * {@code moduleState} counter is only written if it has not been closed.
     *
     * @param newState state to transition to.
     */
    void state(ConsensusModule.State newState) {
        if (newState == this.state) {
            return;
        }
        this.stateChange(this.state, newState, this.memberId);
        this.state = newState;
        if (this.moduleState.isClosed()) {
            return;
        }
        this.moduleState.set(newState.code());
    }

    /**
     * Callback invoked by {@code state(ConsensusModule.State)} immediately before the state field changes.
     * The body is intentionally empty in this class.
     * NOTE(review): presumably an interception point for logging/instrumentation - confirm.
     *
     * @param oldState state being left.
     * @param newState state being entered.
     * @param memberId id of this member.
     */
    void stateChange(ConsensusModule.State oldState, ConsensusModule.State newState, int memberId) {
    }

    /**
     * Change the role of this member. Does nothing when the role is unchanged.
     * <p>
     * {@code roleChange} is invoked with the old and new role before the field is updated, after which the
     * role counter is set to the code of the new role.
     *
     * @param newRole role to assume.
     */
    void role(Cluster.Role newRole) {
        if (newRole == this.role) {
            return;
        }
        this.roleChange(this.role, newRole, this.memberId);
        this.role = newRole;
        this.clusterRoleCounter.setOrdered(newRole.code());
    }

    /**
     * Callback invoked by {@code role(Cluster.Role)} immediately before the role field changes.
     * The body is intentionally empty in this class.
     * NOTE(review): presumably an interception point for logging/instrumentation - confirm.
     *
     * @param oldRole  role being left.
     * @param newRole  role being assumed.
     * @param memberId id of this member.
     */
    void roleChange(Cluster.Role oldRole, Cluster.Role newRole, int memberId) {
    }

    /**
     * Get the current role of this member.
     *
     * @return the current role.
     */
    Cluster.Role role() {
        return this.role;
    }

    /**
     * Tear down the current log connections and roll local state back to the given log position so that a new
     * leadership term can begin.
     * <p>
     * When no log recording id is known nothing is done and 0 is returned. Otherwise log destinations are
     * removed, the log adapter and publisher are disconnected, recording is stopped and this method idles
     * until the archive reports a stop position for the recording.
     *
     * @param logPosition position after which sessions are cleared and to which the commit position is set.
     * @return the stop position of the log recording, or 0 if there is no log recording.
     */
    long prepareForNewLeadership(long logPosition) {
        long recordingId = this.logRecordingId();
        if (-1L == recordingId) {
            return 0L;
        }

        if (null != this.liveLogDestination) {
            this.logAdapter.asyncRemoveDestination(this.liveLogDestination);
            this.liveLogDestination = null;
        }
        if (null != this.catchupLogDestination) {
            this.logAdapter.asyncRemoveDestination(this.catchupLogDestination);
            this.catchupLogDestination = null;
        }

        this.logAdapter.disconnect(this.ctx.countedErrorHandler());
        this.logPublisher.disconnect(this.ctx.countedErrorHandler());
        this.tryStopLogRecording();

        // Wait for the archive to report where the recording stopped.
        this.idleStrategy.reset();
        long stopPosition = this.archive.getStopPosition(recordingId);
        while (-1L == stopPosition) {
            this.idle();
            stopPosition = this.archive.getStopPosition(recordingId);
        }

        this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.ctx.serviceCount());
        this.clearSessionsAfter(logPosition);
        for (ClusterSession clusterSession : this.sessionByIdMap.values()) {
            clusterSession.disconnect(this.ctx.countedErrorHandler());
        }
        this.commitPosition.setOrdered(logPosition);
        this.restoreUncommittedEntries(logPosition);

        return stopPosition;
    }

    /**
     * Set the counter from which the append position of the log is read.
     *
     * @param appendPositionCounter counter to use as the append position.
     */
    void appendPositionCounter(ReadableCounter appendPositionCounter) {
        this.appendPosition = appendPositionCounter;
    }

    /**
     * Handle a request from a service to close a client session.
     * <p>
     * An unknown session id is ignored. A known session is always marked as closing with
     * {@link CloseReason#SERVICE_ACTION}. Only when this member is leader and the session close is
     * successfully appended to the log is the client notified, the session recorded as closed-but-uncommitted,
     * removed from the session map and closed.
     *
     * @param clusterSessionId id of the session to close.
     */
    void onServiceCloseSession(long clusterSessionId) {
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null != session) {
            session.closing(CloseReason.SERVICE_ACTION);
            if (Cluster.Role.LEADER == this.role && this.logPublisher.appendSessionClose(session, this.leadershipTermId, this.clusterClock.time())) {
                String msg = CloseReason.SERVICE_ACTION.name();
                this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, msg);
                session.closedLogPosition(this.logPublisher.position());
                this.uncommittedClosedSessions.addLast(session);
                this.sessionByIdMap.remove(clusterSessionId);
                // Use the counted error handler, as every other close/disconnect in this class does,
                // so that errors raised while closing the session are counted.
                session.close(this.ctx.countedErrorHandler());
            }
        }
    }

    /**
     * Handle a message sent from a service. The message is enqueued with the next service session id only when
     * it belongs to the current leadership term; otherwise it is dropped.
     *
     * @param leadershipTermId leadership term in which the service sent the message.
     * @param buffer           buffer containing the message; it is cast to {@code MutableDirectBuffer}.
     * @param offset           offset in the buffer at which the message begins.
     * @param length           length of the message in bytes.
     */
    void onServiceMessage(long leadershipTermId, DirectBuffer buffer, int offset, int length) {
        if (this.leadershipTermId != leadershipTermId) {
            return;
        }
        MutableDirectBuffer mutableBuffer = (MutableDirectBuffer)buffer;
        long serviceSessionId = this.nextServiceSessionId++;
        this.enqueueServiceSessionMessage(mutableBuffer, offset, length, serviceSessionId);
    }

    /**
     * Handle a request to schedule a timer.
     * <p>
     * If an expiry has already been counted for this correlation id (see {@code onReplayTimerEvent}) the
     * count is decremented and no timer is scheduled; otherwise the timer is scheduled.
     *
     * @param correlationId correlation id of the timer.
     * @param deadline      deadline at which the timer should expire.
     */
    void onScheduleTimer(long correlationId, long deadline) {
        if (this.expiredTimerCountByCorrelationIdMap.get(correlationId) != 0L) {
            this.expiredTimerCountByCorrelationIdMap.decrementAndGet(correlationId);
            return;
        }
        this.timerService.scheduleTimerForCorrelationId(correlationId, deadline);
    }

    /**
     * Handle a request to cancel a timer by delegating to the timer service.
     *
     * @param correlationId correlation id of the timer to cancel.
     */
    void onCancelTimer(long correlationId) {
        this.timerService.cancelTimerByCorrelationId(correlationId);
    }

    /**
     * Handle an acknowledgement from a service.
     * <p>
     * The ack is captured first. Once {@code ServiceAck.hasReached} reports that the given log position has
     * been reached for the current {@code serviceAckId}, the action taken depends on the module state:
     * <ul>
     * <li>SNAPSHOT: take the snapshot, then either return to ACTIVE or, if a termination position is set,
     * move to TERMINATING.</li>
     * <li>QUITTING: close and terminate.</li>
     * <li>TERMINATING: close and terminate once termination is permitted.</li>
     * </ul>
     *
     * @param logPosition position in the log the service has reached.
     * @param timestamp   timestamp passed on to the snapshot.
     * @param ackId       id of the ack.
     * @param relevantId  id relevant to the ack, passed through to {@code captureServiceAck}.
     * @param serviceId   id of the acknowledging service.
     */
    void onServiceAck(long logPosition, long timestamp, long ackId, long relevantId, int serviceId) {
        this.captureServiceAck(logPosition, ackId, relevantId, serviceId);
        if (ServiceAck.hasReached(logPosition, this.serviceAckId, this.serviceAckQueues)) {
            if (ConsensusModule.State.SNAPSHOT == this.state) {
                ServiceAck[] serviceAcks = this.pollServiceAcks(logPosition, serviceId);
                ++this.serviceAckId;
                this.takeSnapshot(timestamp, logPosition, serviceAcks);
                if (-1L == this.terminationPosition) {
                    // No termination pending: refresh activity time on every session before going ACTIVE.
                    long nowNs = this.clusterClock.timeNanos();
                    for (ClusterSession session : this.sessionByIdMap.values()) {
                        session.timeOfLastActivityNs(nowNs);
                    }
                    if (Cluster.Role.LEADER == this.role) {
                        ClusterControl.ToggleState.reset(this.controlToggle);
                    }
                    this.state(ConsensusModule.State.ACTIVE);
                } else {
                    // Snapshot was taken as part of termination: tell services where to stop.
                    this.serviceProxy.terminationPosition(this.terminationPosition, this.ctx.countedErrorHandler());
                    if (null != this.clusterTermination) {
                        this.clusterTermination.deadlineNs(this.clusterClock.timeNanos() + this.ctx.terminationTimeoutNs());
                    }
                    this.state(ConsensusModule.State.TERMINATING);
                }
            } else if (ConsensusModule.State.QUITTING == this.state) {
                this.closeAndTerminate();
            } else if (ConsensusModule.State.TERMINATING == this.state) {
                boolean canTerminate;
                if (null == this.clusterTermination) {
                    // No local termination tracker: send a termination ack on the leader's publication.
                    this.consensusPublisher.terminationAck(this.leaderMember.publication(), this.leadershipTermId, logPosition, this.memberId);
                    canTerminate = true;
                } else {
                    this.clusterTermination.onServicesTerminated();
                    canTerminate = this.clusterTermination.canTerminate(this.activeMembers, this.terminationPosition, this.clusterClock.timeNanos());
                }
                if (canTerminate) {
                    this.recordingLog.commitLogPosition(this.leadershipTermId, logPosition);
                    this.closeAndTerminate();
                }
            }
        }
    }

    /**
     * Handle a session message read from the log.
     * <p>
     * For a known client session the time of last activity is updated from the message timestamp. For an
     * unknown session id the id is recorded as the logged service session id and pending service messages
     * are swept.
     *
     * @param clusterSessionId id of the session the message belongs to.
     * @param timestamp        timestamp of the message in cluster time units.
     */
    void onReplaySessionMessage(long clusterSessionId, long timestamp) {
        ClusterSession clientSession = this.sessionByIdMap.get(clusterSessionId);
        if (null != clientSession) {
            clientSession.timeOfLastActivityNs(this.clusterTimeUnit.toNanos(timestamp));
            return;
        }
        this.logServiceSessionId = clusterSessionId;
        this.pendingServiceMessages.consume(this.followerServiceSessionMessageSweeper, Integer.MAX_VALUE);
    }

    /**
     * Handle a timer event read from the log. The matching timer is cancelled; if no timer could be cancelled
     * the expired count for the correlation id is incremented so a later schedule request can be offset.
     *
     * @param correlationId correlation id of the expired timer.
     */
    void onReplayTimerEvent(long correlationId) {
        boolean wasCancelled = this.timerService.cancelTimerByCorrelationId(correlationId);
        if (wasCancelled) {
            return;
        }
        this.expiredTimerCountByCorrelationIdMap.getAndIncrement(correlationId);
    }

    /**
     * Handle a session open event read from the log by creating and registering the session, and advance
     * {@code nextSessionId} past the replayed id when necessary.
     *
     * @param logPosition       position in the log at which the session was opened.
     * @param correlationId     correlation id of the open request.
     * @param clusterSessionId  id of the session.
     * @param timestamp         timestamp of the event in cluster time units.
     * @param responseStreamId  stream id for responses to the client.
     * @param responseChannel   channel for responses to the client.
     */
    void onReplaySessionOpen(long logPosition, long correlationId, long clusterSessionId, long timestamp, int responseStreamId, String responseChannel) {
        ClusterSession replayedSession = new ClusterSession(clusterSessionId, responseStreamId, responseChannel);
        replayedSession.open(logPosition);
        long timestampNs = this.clusterTimeUnit.toNanos(timestamp);
        replayedSession.lastActivityNs(timestampNs, correlationId);
        this.sessionByIdMap.put(clusterSessionId, replayedSession);
        if (this.nextSessionId <= clusterSessionId) {
            this.nextSessionId = clusterSessionId + 1L;
        }
    }

    /**
     * Handle a session close event read from the log by removing the session, if present, marking it as
     * closing with the given reason and closing it.
     *
     * @param clusterSessionId id of the session that was closed.
     * @param closeReason      reason the session was closed.
     */
    void onReplaySessionClose(long clusterSessionId, CloseReason closeReason) {
        ClusterSession removedSession = this.sessionByIdMap.remove(clusterSessionId);
        if (null == removedSession) {
            return;
        }
        removedSession.closing(closeReason);
        removedSession.close(this.ctx.countedErrorHandler());
    }

    /**
     * Handle a cluster action read from the log. Actions from a leadership term other than the one being
     * replayed are ignored; SUSPEND, RESUME and SNAPSHOT map to the SUSPENDED, ACTIVE and SNAPSHOT module
     * states respectively. Any other action is ignored.
     *
     * @param leadershipTermId leadership term in which the action was logged.
     * @param action           action to apply.
     */
    void onReplayClusterAction(long leadershipTermId, ClusterAction action) {
        if (this.replayLeadershipTermId != leadershipTermId) {
            return;
        }
        if (ClusterAction.SNAPSHOT == action) {
            this.state(ConsensusModule.State.SNAPSHOT);
        } else if (ClusterAction.RESUME == action) {
            this.state(ConsensusModule.State.ACTIVE);
        } else if (ClusterAction.SUSPEND == action) {
            this.state(ConsensusModule.State.SUSPENDED);
        }
    }

    /**
     * Handle a new leadership term event read from the log.
     * <p>
     * If the time unit of the log differs from the cluster time unit, or the major application version of
     * the log differs from the configured one, a FATAL {@link ClusterException} is reported to the error
     * handler and the module closes and terminates. Otherwise the leadership term id is adopted and, when
     * both an election and an append position counter exist, the election is notified.
     *
     * @param leadershipTermId    id of the new leadership term.
     * @param logPosition         position in the log of the event.
     * @param timestamp           timestamp of the event.
     * @param termBaseLogPosition position in the log at which the term begins.
     * @param timeUnit            time unit used for timestamps in the log.
     * @param appVersion          application version recorded in the log.
     */
    void onReplayNewLeadershipTermEvent(long leadershipTermId, long logPosition, long timestamp, long termBaseLogPosition, TimeUnit timeUnit, int appVersion) {
        if (this.clusterTimeUnit != timeUnit) {
            this.ctx.errorHandler().onError(new ClusterException(
                "incompatible timestamp units: " + this.clusterTimeUnit + " log=" + timeUnit,
                AeronException.Category.FATAL));
            this.closeAndTerminate();
            return;
        }

        if (SemanticVersion.major(this.ctx.appVersion()) != SemanticVersion.major(appVersion)) {
            this.ctx.errorHandler().onError(new ClusterException(
                "incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) + " log=" + SemanticVersion.toString(appVersion),
                AeronException.Category.FATAL));
            this.closeAndTerminate();
            return;
        }

        this.leadershipTermId(leadershipTermId);

        if (null == this.election || null == this.appendPosition) {
            return;
        }
        long recordingId = RecordingPos.getRecordingId(this.aeron.countersReader(), this.appendPosition.counterId());
        this.election.onReplayNewLeadershipTermEvent(recordingId, leadershipTermId, logPosition, timestamp, termBaseLogPosition);
    }

    /**
     * Handle a membership change event read from the log. Events from a leadership term other than the one
     * being replayed are ignored.
     * <p>
     * JOIN of this member replaces the active members and rebuilds the member map and consensus publications;
     * JOIN of another member delegates to {@code clusterMemberJoined}. QUIT of this member moves the module
     * to QUITTING; QUIT of another member delegates to {@code clusterMemberQuit} and, if that member was the
     * leader and no election is running, an election is entered. Any election present afterwards is notified.
     *
     * @param leadershipTermId leadership term in which the change was logged.
     * @param logPosition      position in the log of the change.
     * @param leaderMemberId   id of the leader at the time of the change.
     * @param changeType       type of the membership change.
     * @param memberId         id of the member that joined or quit.
     * @param clusterMembers   encoded cluster members after the change.
     */
    void onReplayMembershipChange(long leadershipTermId, long logPosition, int leaderMemberId, ChangeType changeType, int memberId, String clusterMembers) {
        if (this.replayLeadershipTermId != leadershipTermId) {
            return;
        }

        if (ChangeType.JOIN == changeType) {
            ClusterMember[] parsedMembers = ClusterMember.parse(clusterMembers);
            if (this.memberId != memberId) {
                this.clusterMemberJoined(memberId, parsedMembers);
            } else {
                this.activeMembers = parsedMembers;
                this.clusterMemberByIdMap.clear();
                this.clusterMemberByIdMap.compact();
                ClusterMember.addClusterMemberIds(parsedMembers, this.clusterMemberByIdMap);
                this.thisMember = ClusterMember.findMember(this.activeMembers, memberId);
                this.leaderMember = ClusterMember.findMember(this.activeMembers, leaderMemberId);
                ClusterMember.addConsensusPublications(
                    parsedMembers,
                    this.thisMember,
                    ChannelUri.parse(this.ctx.consensusChannel()),
                    this.ctx.consensusStreamId(),
                    this.aeron);
            }
        } else if (ChangeType.QUIT == changeType) {
            if (this.memberId != memberId) {
                this.clusterMemberQuit(memberId);
                if (leaderMemberId == memberId && null == this.election) {
                    // The leader has quit and no election is in progress, so start one.
                    this.commitPosition.proposeMaxOrdered(logPosition);
                    this.enterElection(this.clusterClock.timeNanos());
                }
            } else {
                this.state(ConsensusModule.State.QUITTING);
            }
        }

        if (null != this.election) {
            this.election.onMembershipChange(this.activeMembers, changeType, memberId, logPosition);
        }
    }

    /**
     * Register a client session loaded from a snapshot and advance {@code nextSessionId} past its id when
     * necessary.
     *
     * @param clusterSessionId   id of the session.
     * @param correlationId      correlation id associated with the session.
     * @param openedPosition     log position at which the session was opened.
     * @param timeOfLastActivity time of last activity recorded for the session.
     * @param closeReason        close reason recorded for the session.
     * @param responseStreamId   stream id for responses to the client.
     * @param responseChannel    channel for responses to the client.
     */
    void onLoadSession(long clusterSessionId, long correlationId, long openedPosition, long timeOfLastActivity, CloseReason closeReason, int responseStreamId, String responseChannel) {
        ClusterSession loadedSession = new ClusterSession(
            clusterSessionId, correlationId, openedPosition, timeOfLastActivity, responseStreamId, responseChannel, closeReason);
        this.sessionByIdMap.put(clusterSessionId, loadedSession);
        if (this.nextSessionId <= clusterSessionId) {
            this.nextSessionId = clusterSessionId + 1L;
        }
    }

    /**
     * Append a pending service message loaded from a snapshot to the pending service messages.
     *
     * @param buffer buffer containing the message.
     * @param offset offset in the buffer at which the message begins.
     * @param length length of the message in bytes.
     */
    void onLoadPendingMessage(DirectBuffer buffer, int offset, int length) {
        this.pendingServiceMessages.append(buffer, offset, length);
    }

    /**
     * Restore consensus module counters loaded from a snapshot and reset the pending service message buffer
     * to the given capacity.
     *
     * @param nextSessionId          next client session id to allocate.
     * @param nextServiceSessionId   next service session id to allocate.
     * @param logServiceSessionId    service session id most recently seen in the log.
     * @param pendingMessageCapacity capacity passed to the pending service message buffer reset.
     */
    void onLoadConsensusModuleState(long nextSessionId, long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity) {
        this.nextSessionId = nextSessionId;
        this.nextServiceSessionId = nextServiceSessionId;
        this.logServiceSessionId = logServiceSessionId;

        this.pendingServiceMessages.reset(pendingMessageCapacity);
    }

    /**
     * Apply cluster membership loaded from a snapshot.
     * <p>
     * Nothing is done while a dynamic join is in progress or when the context says snapshot membership is to
     * be ignored. The member id is adopted only if this member has none (-1), and the member list is adopted
     * only if the active members are still {@code ClusterMember.EMPTY_MEMBERS}.
     *
     * @param memberId     id of this member as recorded in the snapshot.
     * @param highMemberId highest member id recorded in the snapshot.
     * @param members      encoded cluster members.
     */
    void onLoadClusterMembers(int memberId, int highMemberId, String members) {
        if (null != this.dynamicJoin || this.ctx.clusterMembersIgnoreSnapshot()) {
            return;
        }

        if (-1 == this.memberId) {
            this.memberId = memberId;
            this.ctx.clusterMarkFile().memberId(memberId);
        }

        if (ClusterMember.EMPTY_MEMBERS != this.activeMembers) {
            return;
        }

        this.activeMembers = ClusterMember.parse(members);
        this.highMemberId = Math.max(ClusterMember.highMemberId(this.activeMembers), highMemberId);
        this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
        this.thisMember = this.clusterMemberByIdMap.get(memberId);

        ChannelUri consensusChannelUri = ChannelUri.parse(this.ctx.consensusChannel());
        consensusChannelUri.put("endpoint", this.thisMember.consensusEndpoint());
        ClusterMember.addConsensusPublications(
            this.activeMembers, this.thisMember, consensusChannelUri, this.ctx.consensusStreamId(), this.aeron);
    }

    /**
     * Create the exclusive publication used for the log and hand it to the log publisher.
     * <p>
     * The configured log channel is tagged and aliased as "log". For UDP channels a "min" flow control with
     * a timeout (half the leader heartbeat timeout in seconds, at least 2) is added if none is configured,
     * manual control mode is set when the log is MDC, and "ssc" is set to true only for a single member
     * cluster. When the recovery plan has a log, the initial position and MTU are taken from it. For an MDC
     * log every other active member and every passive member is added as a destination.
     *
     * @return the session id of the new log publication.
     */
    int addLogPublication() {
        long logPublicationTag = this.aeron.nextCorrelationId();
        this.logPublicationChannelTag = this.aeron.nextCorrelationId();

        ChannelUri logUri = ChannelUri.parse(this.ctx.logChannel());
        logUri.put("alias", "log");
        logUri.put("tags", this.logPublicationChannelTag + "," + logPublicationTag);

        if (logUri.isUdp()) {
            if (!logUri.containsKey("fc")) {
                long halfHeartbeatTimeoutSeconds = TimeUnit.NANOSECONDS.toSeconds(this.ctx.leaderHeartbeatTimeoutNs() >> 1);
                long timeout = Math.max(halfHeartbeatTimeoutSeconds, 2L);
                logUri.put("fc", "min,t:" + timeout + "s");
            }
            if (this.ctx.isLogMdc()) {
                logUri.put("control-mode", "manual");
            }
            boolean isSingleMember = 1 == this.activeMembers.length;
            logUri.put("ssc", Boolean.toString(isSingleMember));
        }

        if (null != this.recoveryPlan.log) {
            logUri.initialPosition(
                this.recoveryPlan.appendedLogPosition,
                this.recoveryPlan.log.initialTermId,
                this.recoveryPlan.log.termBufferLength);
            logUri.put("mtu", Integer.toString(this.recoveryPlan.log.mtuLength));
        }

        ExclusivePublication logPublication = this.aeron.addExclusivePublication(logUri.toString(), this.ctx.logStreamId());

        if (this.ctx.isLogMdc()) {
            for (ClusterMember activeMember : this.activeMembers) {
                if (activeMember.id() != this.memberId) {
                    logPublication.asyncAddDestination("aeron:udp?endpoint=" + activeMember.logEndpoint());
                }
            }
            for (ClusterMember passiveMember : this.passiveMembers) {
                logPublication.asyncAddDestination("aeron:udp?endpoint=" + passiveMember.logEndpoint());
            }
        }

        this.logPublisher.publication(logPublication);
        return logPublication.sessionId();
    }

    /**
     * Complete the transition to leader: wait for services to join the log, start recording it, create the
     * append position, adopt the leadership term id and prepare sessions for the new term.
     * <p>
     * The channel is built from the log publication channel tag and session id. Services join on the channel
     * itself for IPC, or on its "aeron-spy:" form otherwise; the recording always uses the plain channel.
     *
     * @param leadershipTermId id of the leadership term being led.
     * @param logPosition      position in the log at which services join.
     * @param logSessionId     session id of the log publication.
     * @param isStartup        true if this is happening during startup.
     */
    void becomeLeader(long leadershipTermId, long logPosition, int logSessionId, boolean isStartup) {
        boolean isIpc = this.ctx.logChannel().startsWith("aeron:ipc");

        String channelPrefix;
        if (isIpc) {
            channelPrefix = "aeron:ipc";
        } else {
            channelPrefix = "aeron:udp";
        }
        String channel = channelPrefix + "?tags=" + this.logPublicationChannelTag + "|session-id=" + logSessionId + "|alias=log";

        String serviceLogChannel;
        if (isIpc) {
            serviceLogChannel = channel;
        } else {
            serviceLogChannel = "aeron-spy:" + channel;
        }

        this.awaitServicesReady(
            serviceLogChannel, this.ctx.logStreamId(), logSessionId, leadershipTermId, logPosition, Long.MAX_VALUE, isStartup, Cluster.Role.LEADER);
        this.startLogRecording(channel, this.ctx.logStreamId(), SourceLocation.LOCAL);
        this.createAppendPosition(logSessionId);
        this.leadershipTermId(leadershipTermId);
        this.prepareSessionsForNewTerm(isStartup);
    }

    /**
     * Set the live log destination, which {@code prepareForNewLeadership} later removes from the log adapter.
     *
     * @param liveLogDestination destination to record, or null for none.
     */
    void liveLogDestination(String liveLogDestination) {
        this.liveLogDestination = liveLogDestination;
    }

    /**
     * Get the currently recorded live log destination.
     *
     * @return the live log destination, or null if none is set.
     */
    String liveLogDestination() {
        return this.liveLogDestination;
    }

    /**
     * Set the catchup log destination, which {@code prepareForNewLeadership} later removes from the log
     * adapter.
     *
     * @param catchupLogDestination destination to record, or null for none.
     */
    void catchupLogDestination(String catchupLogDestination) {
        this.catchupLogDestination = catchupLogDestination;
    }

    /**
     * Begin following the log as a follower using the given image: wait for services to join at the image
     * join position, start recording from the remote source, create the append position, attach the image
     * to the log adapter and reset the last append position.
     *
     * @param image           image of the log to follow.
     * @param isLeaderStartup startup flag passed on to the services when they join the log.
     */
    void followLog(Image image, boolean isLeaderStartup) {
        Subscription subscription = image.subscription();
        int logStreamId = subscription.streamId();
        String logChannel = subscription.channel();

        this.awaitServicesReady(
            logChannel,
            logStreamId,
            image.sessionId(),
            this.leadershipTermId,
            image.joinPosition(),
            Long.MAX_VALUE,
            isLeaderStartup,
            Cluster.Role.FOLLOWER);
        this.startLogRecording(logChannel, logStreamId, SourceLocation.REMOTE);
        this.createAppendPosition(image.sessionId());
        this.logAdapter.image(image);
        this.lastAppendPosition = 0L;
        this.appendDynamicJoinTermAndSnapshots();
    }

    /**
     * Ask the services to join the log and block, idling and polling the consensus module adapter, until
     * all services have acknowledged the given log position. The head ack is then removed from each queue
     * and the service ack id advanced.
     *
     * @param logChannel       channel on which services should join the log.
     * @param streamId         stream id of the log.
     * @param logSessionId     session id of the log.
     * @param leadershipTermId leadership term id for the log.
     * @param logPosition      position at which the services join and which they must acknowledge.
     * @param maxLogPosition   maximum log position passed to the services.
     * @param isStartup        true if this is happening during startup.
     * @param role             role of this member passed to the services.
     */
    void awaitServicesReady(String logChannel, int streamId, int logSessionId, long leadershipTermId, long logPosition, long maxLogPosition, boolean isStartup, Cluster.Role role) {
        this.serviceProxy.joinLog(
            leadershipTermId, logPosition, maxLogPosition, this.memberId, logSessionId, streamId, isStartup, role, logChannel);
        this.expectedAckPosition = logPosition;

        while (true) {
            if (ServiceAck.hasReached(logPosition, this.serviceAckId, this.serviceAckQueues)) {
                break;
            }
            this.idle(this.consensusModuleAdapter.poll());
        }

        ServiceAck.removeHead(this.serviceAckQueues);
        ++this.serviceAckId;
    }

    /**
     * Set the leadership term id; the replay leadership term id is set to the same value.
     *
     * @param leadershipTermId new leadership term id.
     */
    void leadershipTermId(long leadershipTermId) {
        this.replayLeadershipTermId = this.leadershipTermId = leadershipTermId;
    }

    /**
     * Create a replay of the log recording named in the recovery plan.
     *
     * @param logPosition    first position argument passed to the {@link LogReplay}.
     * @param appendPosition second position argument passed to the {@link LogReplay}.
     * @return a new {@link LogReplay} for the recovery plan's log.
     */
    LogReplay newLogReplay(long logPosition, long appendPosition) {
        return new LogReplay(
            this.archive,
            this.recoveryPlan.log.recordingId,
            logPosition,
            appendPosition,
            this.recoveryPlan.log.leadershipTermId,
            this.logAdapter,
            this.ctx);
    }

    /**
     * Poll the replayed log and then the consensus module adapter.
     * <p>
     * The log is only polled in the ACTIVE or SUSPENDED states. When fragments are read the commit position
     * is set to the adapter position.
     *
     * @param logAdapter   adapter from which the log is polled.
     * @param stopPosition position up to which the log is polled.
     * @return the number of fragments polled plus the work done by the consensus module adapter.
     * @throws ClusterException if nothing was read, the image is closed and the stop position is not reached.
     */
    int replayLogPoll(LogAdapter logAdapter, long stopPosition) {
        int fragmentCount = 0;
        boolean isPollable = ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state;
        if (isPollable) {
            fragmentCount = logAdapter.poll(stopPosition);
            long position = logAdapter.position();
            if (fragmentCount <= 0) {
                if (logAdapter.isImageClosed() && position < stopPosition) {
                    throw new ClusterException("unexpected image close when replaying log: position=" + position);
                }
            } else {
                this.commitPosition.setOrdered(position);
            }
        }
        return fragmentCount + this.consensusModuleAdapter.poll();
    }

    /**
     * Find the recording id of the log.
     *
     * @return the recording id from the recovery plan's log if present; otherwise the recording id looked up
     * from the append position counter, or -1 if there is no append position counter.
     */
    long logRecordingId() {
        if (null == this.recoveryPlan.log) {
            if (null != this.appendPosition) {
                return RecordingPos.getRecordingId(this.aeron.countersReader(), this.appendPosition.counterId());
            }
            return -1L;
        }
        return this.recoveryPlan.log.recordingId;
    }

    /**
     * Truncate the log recording to the given position: stop all replays of the recording, truncate it in
     * the archive, commit the position for the term in the recording log and disconnect the log adapter.
     *
     * @param leadershipTermId leadership term whose log position is committed.
     * @param logPosition      position to which the log is truncated.
     */
    void truncateLogEntry(long leadershipTermId, long logPosition) {
        long logRecordingId = this.logRecordingId();

        this.archive.stopAllReplays(logRecordingId);
        this.archive.truncateRecording(logRecordingId, logPosition);

        this.recordingLog.commitLogPosition(leadershipTermId, logPosition);
        this.logAdapter.disconnect(this.ctx.countedErrorHandler(), logPosition);
    }

    /**
     * Finish the current election and return to normal operation.
     * <p>
     * As leader, the new leadership term event must first be appended to a connected log publication; if
     * that is not yet possible {@code false} is returned and nothing else is changed so the caller can retry.
     *
     * @return true if the election has been completed and cleared, otherwise false.
     */
    boolean electionComplete() {
        long termBaseLogPosition = this.election.logPosition();
        long now = this.clusterClock.time();
        long nowNs = this.clusterTimeUnit.toNanos(now);
        if (Cluster.Role.LEADER == this.role) {
            if (!this.logPublisher.isConnected() || !this.logPublisher.appendNewLeadershipTermEvent(this.leadershipTermId, now, termBaseLogPosition, this.memberId, this.logPublisher.sessionId(), this.clusterTimeUnit, this.ctx.appVersion())) {
                return false;
            }
            // Back-date the last log update by one heartbeat interval.
            this.timeOfLastLogUpdateNs = nowNs - this.leaderHeartbeatIntervalNs;
            this.timerService.currentTickTime(now);
            ClusterControl.ToggleState.activate(this.controlToggle);
        } else {
            this.timeOfLastLogUpdateNs = nowNs;
            this.timeOfLastAppendPositionNs = nowNs;
        }
        // When the ingress channel has no endpoint, every member subscribes on its own ingress endpoint;
        // when it already names one, only the leader subscribes.
        if (!this.ctx.ingressChannel().contains("endpoint")) {
            ChannelUri ingressUri = ChannelUri.parse(this.ctx.ingressChannel());
            ingressUri.put("endpoint", this.thisMember.ingressEndpoint());
            this.ingressAdapter.connect(this.aeron.addSubscription(ingressUri.toString(), this.ctx.ingressStreamId(), null, this::onUnavailableIngressImage));
        } else if (Cluster.Role.LEADER == this.role) {
            this.ingressAdapter.connect(this.aeron.addSubscription(this.ctx.ingressChannel(), this.ctx.ingressStreamId(), null, this::onUnavailableIngressImage));
        }
        this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.ctx.serviceCount());
        this.notifiedCommitPosition = termBaseLogPosition;
        this.commitPosition.setOrdered(termBaseLogPosition);
        this.pendingServiceMessages.consume(this.followerServiceSessionMessageSweeper, Integer.MAX_VALUE);
        // The leader must be read from the election before the election reference is cleared.
        this.updateMemberDetails(this.election.leader());
        this.election = null;
        return true;
    }

    /**
     * Finish a dynamic join and start an election.
     * <p>
     * If there are no active members yet, the members and leader are taken from the dynamic join and
     * consensus publications are added. If this member has no id yet (-1), the id from the dynamic join is
     * adopted and written to the mark file. The dynamic join is then cleared and a new {@link Election} is
     * created.
     *
     * @return always true.
     */
    boolean dynamicJoinComplete() {
        if (0 == this.activeMembers.length) {
            this.activeMembers = this.dynamicJoin.clusterMembers();
            ClusterMember.addClusterMemberIds(this.activeMembers, this.clusterMemberByIdMap);
            this.leaderMember = this.dynamicJoin.leader();
            ClusterMember.addConsensusPublications(
                this.activeMembers,
                this.thisMember,
                ChannelUri.parse(this.ctx.consensusChannel()),
                this.ctx.consensusStreamId(),
                this.aeron);
        }

        if (-1 == this.memberId) {
            this.memberId = this.dynamicJoin.memberId();
            this.ctx.clusterMarkFile().memberId(this.memberId);
            this.thisMember.id(this.memberId);
        }

        this.dynamicJoin = null;
        this.election = new Election(
            false,
            this.leadershipTermId,
            this.commitPosition.getWeak(),
            this.recoveryPlan.appendedLogPosition,
            this.activeMembers,
            this.clusterMemberByIdMap,
            this.thisMember,
            this.consensusPublisher,
            this.ctx,
            this);

        return true;
    }

    /**
     * Check whether a follower that is catching up has reached the log publisher position and, if so, stop
     * its catchup.
     * <p>
     * The archive replay is asked to stop while a catchup replay correlation id is still set, and the
     * follower is told to stop catchup; each id is cleared only when the corresponding request succeeds.
     *
     * @param follower         follower to check.
     * @param leadershipTermId leadership term id sent with the stop catchup message.
     */
    void trackCatchupCompletion(ClusterMember follower, long leadershipTermId) {
        if (-1L == follower.catchupReplaySessionId() || follower.logPosition() < this.logPublisher.position()) {
            return;
        }

        if (-1L != follower.catchupReplayCorrelationId()) {
            boolean isStopSent = this.archive.archiveProxy().stopReplay(
                follower.catchupReplaySessionId(), this.aeron.nextCorrelationId(), this.archive.controlSessionId());
            if (isStopSent) {
                follower.catchupReplayCorrelationId(-1L);
            }
        }

        if (this.consensusPublisher.stopCatchup(follower.publication(), leadershipTermId, follower.id())) {
            follower.catchupReplaySessionId(-1L);
        }
    }

    /**
     * Record that catchup has been initiated by setting the time of the last append position update.
     *
     * @param nowNs current time in nanoseconds.
     */
    void catchupInitiated(long nowNs) {
        this.timeOfLastAppendPositionNs = nowNs;
    }

    /**
     * Poll the log during catchup and report progress to the leader of the current election.
     * <p>
     * In the ACTIVE or SUSPENDED states the log adapter is polled up to the lesser of the append position
     * and the limit. The adapter position is sent to the leader when it has changed or when a heartbeat
     * interval has elapsed since the last successful send.
     *
     * @param limitPosition upper bound on the position to poll to.
     * @param nowNs         current time in nanoseconds.
     * @return the amount of work done.
     * @throws ClusterException if nothing was polled and the image is closed, or (as a WARN) if no position
     *                          has been sent for longer than the leader heartbeat timeout while ACTIVE.
     */
    int catchupPoll(long limitPosition, long nowNs) {
        int workCount = 0;

        if (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) {
            int fragmentsPolled = this.logAdapter.poll(Math.min(this.appendPosition.get(), limitPosition));
            workCount += fragmentsPolled;
            Image logImage = this.logAdapter.image();
            if (0 == fragmentsPolled && logImage.isClosed()) {
                throw new ClusterException("unexpected image close replaying log at position " + logImage.position());
            }
        }

        long polledPosition = this.logAdapter.position();
        if (polledPosition != this.lastAppendPosition || nowNs > this.timeOfLastAppendPositionNs + this.leaderHeartbeatIntervalNs) {
            this.commitPosition.proposeMaxOrdered(polledPosition);
            ExclusivePublication leaderPublication = this.election.leader().publication();
            boolean isSent = this.consensusPublisher.appendPosition(
                leaderPublication, this.replayLeadershipTermId, polledPosition, this.memberId);
            if (isSent) {
                this.lastAppendPosition = polledPosition;
                this.timeOfLastAppendPositionNs = nowNs;
            }
        }

        workCount += this.consensusModuleAdapter.poll();

        boolean isTimedOut = nowNs > this.timeOfLastAppendPositionNs + this.leaderHeartbeatTimeoutNs;
        if (isTimedOut && ConsensusModule.State.ACTIVE == this.state) {
            throw new ClusterException("no catchup progress", AeronException.Category.WARN);
        }

        return workCount;
    }

    /**
     * Check whether the local log image is within a window of the given position.
     * <p>
     * The window is a quarter of the image term buffer length, capped at 0x2000000 (32 MiB).
     *
     * @param position position to compare the local image position against.
     * @return true if an image exists and its position is at or beyond {@code position} minus the window,
     * otherwise false.
     */
    boolean isCatchupNearLive(long position) {
        Image logImage = this.logAdapter.image();
        if (null == logImage) {
            return false;
        }
        long localPosition = logImage.position();
        long window = Math.min(logImage.termBufferLength() >> 2, 0x2000000);
        return localPosition >= position - window;
    }

    /**
     * Stop the catchup of every active member that has a catchup replay session.
     * <p>
     * The archive replay is stopped when a replay correlation id is still set; a failure to stop is reported
     * to the counted error handler as a WARN and otherwise ignored. Both catchup ids are then cleared.
     */
    void stopAllCatchups() {
        for (ClusterMember activeMember : this.activeMembers) {
            if (-1L != activeMember.catchupReplaySessionId()) {
                if (-1L != activeMember.catchupReplayCorrelationId()) {
                    try {
                        this.archive.stopReplay(activeMember.catchupReplaySessionId());
                    }
                    catch (Exception ex) {
                        this.ctx.countedErrorHandler().onError(
                            new ClusterException("catchup already stopped", ex, AeronException.Category.WARN));
                    }
                }
                activeMember.catchupReplaySessionId(-1L);
                activeMember.catchupReplayCorrelationId(-1L);
            }
        }
    }

    /**
     * Record a snapshot retrieved during a dynamic join. A copy of the leader's snapshot entry is stored with
     * the recording id replaced by the local recording id.
     *
     * @param localRecordingId recording id of the snapshot in the local archive.
     * @param leaderSnapshot   snapshot entry as described by the leader.
     */
    void retrievedSnapshot(long localRecordingId, RecordingLog.Snapshot leaderSnapshot) {
        RecordingLog.Snapshot localSnapshot = new RecordingLog.Snapshot(
            localRecordingId,
            leaderSnapshot.leadershipTermId,
            leaderSnapshot.termBaseLogPosition,
            leaderSnapshot.logPosition,
            leaderSnapshot.timestamp,
            leaderSnapshot.serviceId);
        this.dynamicJoinSnapshots.add(localSnapshot);
    }

    /**
     * Build a recovery plan from the snapshots retrieved during a dynamic join, add a recovery state counter
     * for it and load the first snapshot of the plan if there is one.
     *
     * @return the recovery state counter that was added.
     */
    Counter loadSnapshotsForDynamicJoin() {
        this.recoveryPlan = RecordingLog.createRecoveryPlan(this.dynamicJoinSnapshots);
        Counter counter = this.addRecoveryStateCounter(this.recoveryPlan);
        if (this.recoveryPlan.snapshots.isEmpty()) {
            return counter;
        }
        this.loadSnapshot(this.recoveryPlan.snapshots.get(0), this.archive);
        return counter;
    }

    /**
     * Poll for the services acknowledging that a snapshot has been loaded.
     * <p>
     * Once the expected ack position has been reached the service client ids are captured, the recovery
     * state counter is closed, the module becomes ACTIVE and the last leadership term id of the recovery
     * plan is adopted.
     *
     * @param recoveryStateCounter counter to close once the acks have arrived.
     * @param nowNs                current time in nanoseconds, recorded as the time of last log update.
     * @return true if all acks have arrived, otherwise false.
     */
    boolean pollForSnapshotLoadAck(Counter recoveryStateCounter, long nowNs) {
        this.consensusModuleAdapter.poll();

        if (!ServiceAck.hasReached(this.expectedAckPosition, this.serviceAckId, this.serviceAckQueues)) {
            return false;
        }

        this.captureServiceClientIds();
        ++this.serviceAckId;
        CloseHelper.close(this.ctx.countedErrorHandler(), recoveryStateCounter);
        this.state(ConsensusModule.State.ACTIVE);
        this.timeOfLastLogUpdateNs = nowNs;
        this.leadershipTermId(this.recoveryPlan.lastLeadershipTermId);

        return true;
    }

    /**
     * Start recording the log in the archive. A new recording is started when the recording log has no
     * last term recording id (-1); otherwise that recording is extended.
     *
     * @param channel        channel of the log to record.
     * @param streamId       stream id of the log to record.
     * @param sourceLocation source location of the log stream.
     */
    private void startLogRecording(String channel, int streamId, SourceLocation sourceLocation) {
        long lastTermRecordingId = this.recordingLog.findLastTermRecordingId();
        if (-1L == lastTermRecordingId) {
            this.logSubscriptionId = this.archive.startRecording(channel, streamId, sourceLocation, true);
        } else {
            this.logSubscriptionId = this.archive.extendRecording(lastTermRecordingId, channel, streamId, sourceLocation, true);
        }
    }

    /**
     * Prepare the open client sessions for a new leadership term.
     * <p>
     * At startup every open session is marked as closing with {@link CloseReason#TIMEOUT}. Otherwise every
     * open session is connected first; then, in a second pass, each session still open has its last
     * activity time refreshed and is flagged as having a new leader event pending.
     *
     * @param isStartup true if the new term begins during startup.
     */
    private void prepareSessionsForNewTerm(boolean isStartup) {
        if (isStartup) {
            for (ClusterSession openCandidate : this.sessionByIdMap.values()) {
                if (ClusterSession.State.OPEN == openCandidate.state()) {
                    openCandidate.closing(CloseReason.TIMEOUT);
                }
            }
            return;
        }

        for (ClusterSession openCandidate : this.sessionByIdMap.values()) {
            if (ClusterSession.State.OPEN == openCandidate.state()) {
                openCandidate.connect(this.aeron);
            }
        }

        long nowNs = this.clusterClock.timeNanos();
        for (ClusterSession openCandidate : this.sessionByIdMap.values()) {
            if (ClusterSession.State.OPEN == openCandidate.state()) {
                openCandidate.timeOfLastActivityNs(nowNs);
                openCandidate.hasNewLeaderEventPending(true);
            }
        }
    }

    /**
     * Record the new leader: update the session proxy with the leader id and leadership term id, flag each
     * active member as leader or not, and rebuild the ingress endpoints from the active members.
     *
     * @param newLeader member that is now the leader.
     */
    private void updateMemberDetails(ClusterMember newLeader) {
        this.leaderMember = newLeader;
        this.sessionProxy.leaderMemberId(this.leaderMember.id()).leadershipTermId(this.leadershipTermId);

        for (ClusterMember activeMember : this.activeMembers) {
            boolean isLeader = this.leaderMember.id() == activeMember.id();
            activeMember.isLeader(isLeader);
        }

        this.ingressEndpoints = ClusterMember.ingressEndpoints(this.activeMembers);
    }

    /**
     * Perform the lower-frequency duty cycle work.
     * <p>
     * Invokes the Aeron client, checks the archive for errors (unless a dynamic join is in progress),
     * refreshes the mark file activity timestamp once per update interval and sends pending redirects and
     * rejections. Outside an election, the leader additionally checks the control toggle, processes sessions
     * and passive members and verifies it still has an active quorum, while a follower checks for a reached
     * termination position and for leader heartbeat timeout.
     *
     * @param nowMs current time in milliseconds, used for the mark file activity timestamp.
     * @param nowNs current time in nanoseconds, used for all interval and timeout checks.
     * @return the amount of work done.
     * @throws AgentTerminationException if the Aeron client has been closed unexpectedly.
     */
    private int slowTickWork(long nowMs, long nowNs) {
        int workCount = this.aeronClientInvoker.invoke();
        if (this.aeron.isClosed()) {
            throw new AgentTerminationException("unexpected Aeron close");
        }
        if (null == this.dynamicJoin) {
            this.checkForArchiveError(true);
        }
        if (nowNs >= this.timeOfLastMarkFileUpdateNs + ClusteredServiceContainer.Configuration.MARK_FILE_UPDATE_INTERVAL_NS) {
            this.markFile.updateActivityTimestamp(nowMs);
            // Must be tracked in nanoseconds: it is compared against nowNs above. Storing the
            // millisecond value here made the interval check pass on every slow tick.
            this.timeOfLastMarkFileUpdateNs = nowNs;
        }
        workCount += this.sendRedirects(this.redirectSessions, nowNs);
        workCount += this.sendRejections(this.rejectedSessions, nowNs);
        if (null == this.election) {
            if (Cluster.Role.LEADER == this.role) {
                workCount += this.checkControlToggle(nowNs);
                if (ConsensusModule.State.ACTIVE == this.state) {
                    workCount += this.processPendingSessions(this.pendingSessions, nowMs, nowNs);
                    workCount += this.checkSessions(this.sessionByIdMap, nowNs);
                    workCount += this.processPassiveMembers(this.passiveMembers);
                    if (!ClusterMember.hasActiveQuorum(this.activeMembers, nowNs, this.leaderHeartbeatTimeoutNs)) {
                        this.ctx.countedErrorHandler().onError(new ClusterException("inactive follower quorum", AeronException.Category.WARN));
                        this.enterElection(nowNs);
                        ++workCount;
                    }
                } else if (ConsensusModule.State.TERMINATING == this.state && this.clusterTermination.canTerminate(this.activeMembers, this.terminationPosition, nowNs)) {
                    this.recordingLog.commitLogPosition(this.leadershipTermId, this.terminationPosition);
                    this.closeAndTerminate();
                }
            } else if (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) {
                // Follower: start terminating once the log has reached a set termination position.
                if (-1L != this.terminationPosition && this.logAdapter.position() >= this.terminationPosition) {
                    this.serviceProxy.terminationPosition(this.terminationPosition, this.ctx.countedErrorHandler());
                    this.state(ConsensusModule.State.TERMINATING);
                }
                if (nowNs >= this.timeOfLastLogUpdateNs + this.leaderHeartbeatTimeoutNs) {
                    this.ctx.countedErrorHandler().onError(new ClusterException("leader heartbeat timeout", AeronException.Category.WARN));
                    this.enterElection(nowNs);
                    ++workCount;
                }
            }
        }
        return workCount;
    }

    /**
     * Polls the archive control response stream and reacts to an error addressed to this module's
     * archive control session.
     * <p>
     * An error whose correlation id matches an active member's catch-up replay clears that member's
     * replay ids and is reported as a warning. Any other error is handed to the current election when
     * one exists and {@code isSlowTick} is set; otherwise it is thrown. Does nothing when there is no archive.
     *
     * @param isSlowTick when {@code true} an in-progress election is allowed to handle the error.
     * @throws AgentTerminationException if the control response subscription is not connected.
     * @throws ArchiveException          if an error is received that nothing else handles.
     */
    private void checkForArchiveError(boolean isSlowTick) {
        if (null == this.archive) {
            return;
        }
        ControlResponsePoller responsePoller = this.archive.controlResponsePoller();
        if (!responsePoller.subscription().isConnected()) {
            this.serviceProxy.terminationPosition(-1L, this.ctx.countedErrorHandler());
            throw new AgentTerminationException("local archive not connected");
        }
        // Evaluated left to right with short-circuiting, so the poll always happens first.
        boolean hasErrorResponse = responsePoller.poll() != 0
            && responsePoller.isPollComplete()
            && responsePoller.controlSessionId() == this.archive.controlSessionId()
            && responsePoller.code() == ControlResponseCode.ERROR;
        if (!hasErrorResponse) {
            return;
        }
        for (ClusterMember member : this.activeMembers) {
            if (member.catchupReplayCorrelationId() != -1L && member.catchupReplayCorrelationId() == responsePoller.correlationId()) {
                member.catchupReplaySessionId(-1L);
                member.catchupReplayCorrelationId(-1L);
                this.ctx.countedErrorHandler().onError(new ClusterException("catchup replay failed - " + responsePoller.errorMessage(), AeronException.Category.WARN));
                return;
            }
        }
        ArchiveException archiveError = new ArchiveException(responsePoller.errorMessage(), (int)responsePoller.relevantId(), responsePoller.correlationId());
        if (null == this.election || !isSlowTick) {
            throw archiveError;
        }
        this.election.handleError(this.clusterClock.timeNanos(), archiveError);
    }

    /**
     * Performs one duty cycle of role-specific consensus work.
     * <p>
     * A leader in the ACTIVE state polls timers, appends pending service messages and polls ingress, and
     * in any state updates the leader position. A non-leader in the ACTIVE or SUSPENDED state polls the log
     * up to the lesser of the notified commit position and the append position, and enters an election if
     * nothing was read and the log image is closed. The consensus module adapter is polled last in all
     * cases except that early election exit.
     *
     * @param timestamp time passed to the timer service poll.
     * @param nowNs     current time in nanoseconds.
     * @return the amount of work done; {@code 1} when an election was entered because the log disconnected.
     */
    private int consensusWork(long timestamp, long nowNs) {
        int workCount = 0;
        if (Cluster.Role.LEADER != this.role) {
            if (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state) {
                int fragments = this.logAdapter.poll(Math.min(this.notifiedCommitPosition, this.appendPosition.get()));
                if (0 == fragments && this.logAdapter.isImageClosed()) {
                    this.ctx.countedErrorHandler().onError(new ClusterException("log disconnected from leader: logPosition=" + this.logAdapter.position() + " commitPosition=" + this.commitPosition.getWeak() + " leadershipTermId=" + this.leadershipTermId + " leaderId=" + this.leaderMember.id(), AeronException.Category.WARN));
                    this.enterElection(nowNs);
                    return 1;
                }
                this.commitPosition.proposeMaxOrdered(this.logAdapter.position());
                workCount += this.ingressAdapter.poll();
                workCount += fragments;
            }
            workCount += this.updateFollowerPosition(nowNs);
        } else {
            if (ConsensusModule.State.ACTIVE == this.state) {
                workCount += this.timerService.poll(timestamp);
                workCount += this.pendingServiceMessages.forEach(this.pendingServiceMessageHeadOffset, this.serviceSessionMessageAppender, 20);
                workCount += this.ingressAdapter.poll();
            }
            workCount += this.updateLeaderPosition(nowNs);
        }
        workCount += this.consensusModuleAdapter.poll();
        return workCount;
    }

    /**
     * Reads the cluster control toggle and applies the requested action when the current state allows it.
     * <p>
     * SUSPEND, RESUME, SNAPSHOT and SHUTDOWN only take effect once the matching cluster action has been
     * appended to the log; SHUTDOWN appends a SNAPSHOT action and additionally records a termination
     * position. ABORT records a termination position and moves straight to TERMINATING without a log append.
     *
     * @param nowNs current time in nanoseconds, used to compute the termination deadline.
     * @return {@code 0} when the toggle holds none of the handled values, otherwise {@code 1}
     *         (even if the action could not be applied on this call).
     */
    private int checkControlToggle(long nowNs) {
        switch (ClusterControl.ToggleState.get(this.controlToggle)) {
            case SUSPEND: {
                if (ConsensusModule.State.ACTIVE == this.state && this.appendAction(ClusterAction.SUSPEND)) {
                    this.state(ConsensusModule.State.SUSPENDED);
                }
                break;
            }
            case RESUME: {
                if (ConsensusModule.State.SUSPENDED == this.state && this.appendAction(ClusterAction.RESUME)) {
                    this.state(ConsensusModule.State.ACTIVE);
                    ClusterControl.ToggleState.reset(this.controlToggle);
                }
                break;
            }
            case SNAPSHOT: {
                if (ConsensusModule.State.ACTIVE == this.state && this.appendAction(ClusterAction.SNAPSHOT)) {
                    this.state(ConsensusModule.State.SNAPSHOT);
                }
                break;
            }
            case SHUTDOWN: {
                if (ConsensusModule.State.ACTIVE == this.state && this.appendAction(ClusterAction.SNAPSHOT)) {
                    long shutdownPosition = this.logPublisher.position();
                    this.clusterTermination = new ClusterTermination(nowNs + this.ctx.terminationTimeoutNs());
                    this.clusterTermination.terminationPosition(this.consensusPublisher, this.activeMembers, this.thisMember, this.leadershipTermId, shutdownPosition);
                    this.terminationPosition = shutdownPosition;
                    this.state(ConsensusModule.State.SNAPSHOT);
                }
                break;
            }
            case ABORT: {
                if (ConsensusModule.State.ACTIVE == this.state) {
                    long abortPosition = this.logPublisher.position();
                    this.clusterTermination = new ClusterTermination(nowNs + this.ctx.terminationTimeoutNs());
                    this.clusterTermination.terminationPosition(this.consensusPublisher, this.activeMembers, this.thisMember, this.leadershipTermId, abortPosition);
                    this.terminationPosition = abortPosition;
                    this.serviceProxy.terminationPosition(this.terminationPosition, this.ctx.countedErrorHandler());
                    this.state(ConsensusModule.State.TERMINATING);
                }
                break;
            }
            default: {
                return 0;
            }
        }
        return 1;
    }

    /**
     * Appends a cluster action to the log for the current leadership term, stamped with the cluster clock time.
     *
     * @param action the action to append.
     * @return the result reported by the log publisher for the append.
     */
    private boolean appendAction(ClusterAction action) {
        long timestamp = this.clusterClock.time();
        boolean isAppended = this.logPublisher.appendClusterAction(this.leadershipTermId, timestamp, action);
        return isAppended;
    }

    /**
     * Walks the pending sessions from last to first, driving authentication and resolving each session
     * that has reached a terminal pending outcome.
     * <p>
     * Authenticated backup sessions are sent a backup response and closed; other authenticated sessions
     * are moved to the session map and a session-open is appended. Rejected sessions are moved to the
     * rejected list. Remaining sessions are closed once their last activity is older than the session timeout.
     *
     * @param pendingSessions list to process; entries are removed in place.
     * @param nowMs           current time in milliseconds, passed to the authenticator.
     * @param nowNs           current time in nanoseconds, used for activity and timeout tracking.
     * @return the number of authenticated sessions visited on this pass.
     */
    private int processPendingSessions(ArrayList<ClusterSession> pendingSessions, long nowMs, long nowNs) {
        int workCount = 0;
        int lastIndex = pendingSessions.size() - 1;
        for (int i = lastIndex; i >= 0; --i) {
            ClusterSession session = pendingSessions.get(i);
            if ((session.state() == ClusterSession.State.INIT || session.state() == ClusterSession.State.CONNECTED) && session.isResponsePublicationConnected()) {
                session.state(ClusterSession.State.CONNECTED);
                this.authenticator.onConnectedSession(this.sessionProxy.session(session), nowMs);
            }
            if (session.state() == ClusterSession.State.CHALLENGED && session.isResponsePublicationConnected()) {
                this.authenticator.onChallengedSession(this.sessionProxy.session(session), nowMs);
            }
            // The state is re-read here because the authenticator callbacks above may have changed it.
            if (session.state() == ClusterSession.State.AUTHENTICATED) {
                if (!session.isBackupSession()) {
                    ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                    session.timeOfLastActivityNs(nowNs);
                    this.sessionByIdMap.put(session.id(), session);
                    this.appendSessionOpen(session);
                } else if (session.responsePublication().isConnected()) {
                    RecordingLog.Entry lastEntry = this.recordingLog.findLastTerm();
                    if (this.consensusPublisher.backupResponse(session.responsePublication(), session.correlationId(), this.recoveryPlan.log.recordingId, this.recoveryPlan.log.leadershipTermId, this.recoveryPlan.log.termBaseLogPosition, lastEntry.leadershipTermId, lastEntry.termBaseLogPosition, this.commitPosition.id(), this.leaderMember.id(), this.recoveryPlan, ClusterMember.encodeAsString(this.activeMembers))) {
                        ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                        session.close(this.ctx.countedErrorHandler());
                    }
                }
                ++workCount;
            } else if (session.state() == ClusterSession.State.REJECTED) {
                ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                this.rejectedSessions.add(session);
            } else if (nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs) {
                ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                session.close(this.ctx.countedErrorHandler());
                this.ctx.timedOutClientCounter().incrementOrdered();
            }
        }
        return workCount;
    }

    /**
     * Attempts to deliver each rejected session's event code and detail, then removes and closes the
     * session once the event was sent or the session timeout has elapsed.
     *
     * @param rejectedSessions list to process from last to first; entries are removed in place.
     * @param nowNs            current time in nanoseconds.
     * @return the number of sessions removed and closed.
     */
    private int sendRejections(ArrayList<ClusterSession> rejectedSessions, long nowNs) {
        int closedCount = 0;
        int lastIndex = rejectedSessions.size() - 1;
        for (int i = lastIndex; i >= 0; --i) {
            ClusterSession session = rejectedSessions.get(i);
            String responseDetail = session.responseDetail();
            EventCode rejectionCode = session.eventCode();
            if (this.egressPublisher.sendEvent(session, this.leadershipTermId, this.leaderMember.id(), rejectionCode, responseDetail) || nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs) {
                ArrayListUtil.fastUnorderedRemove(rejectedSessions, i, lastIndex--);
                session.close(this.ctx.countedErrorHandler());
                ++closedCount;
            }
        }
        return closedCount;
    }

    /**
     * Attempts to send a REDIRECT event carrying the ingress endpoints to each session, then removes and
     * closes the session once the event was sent or the session timeout has elapsed.
     *
     * @param redirectSessions list to process from last to first; entries are removed in place.
     * @param nowNs            current time in nanoseconds.
     * @return the number of sessions removed and closed.
     */
    private int sendRedirects(ArrayList<ClusterSession> redirectSessions, long nowNs) {
        int closedCount = 0;
        int lastIndex = redirectSessions.size() - 1;
        for (int i = lastIndex; i >= 0; --i) {
            ClusterSession session = redirectSessions.get(i);
            int currentLeaderId = this.leaderMember.id();
            if (this.egressPublisher.sendEvent(session, this.leadershipTermId, currentLeaderId, EventCode.REDIRECT, this.ingressEndpoints) || nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs) {
                ArrayListUtil.fastUnorderedRemove(redirectSessions, i, lastIndex--);
                session.close(this.ctx.countedErrorHandler());
                ++closedCount;
            }
        }
        return closedCount;
    }

    /**
     * Services passive members: answers outstanding membership queries and promotes a member that has
     * requested to join once its log position equals the log publisher position.
     * <p>
     * A member with a correlation id other than {@code -1L} is sent the current active and passive member
     * lists, and the id is cleared when the send succeeds. Promotion appends a JOIN membership change
     * event; at most one member is promoted per call because the loop stops after a successful append.
     *
     * @param passiveMembers the members to iterate over.
     * @return the number of responses sent plus promotions made.
     */
    private int processPassiveMembers(ClusterMember[] passiveMembers) {
        int workCount = 0;
        for (ClusterMember member : passiveMembers) {
            if (-1L != member.correlationId()) {
                if (this.consensusPublisher.clusterMemberChange(member.publication(), member.correlationId(), this.leaderMember.id(), ClusterMember.encodeAsString(this.activeMembers), ClusterMember.encodeAsString(passiveMembers))) {
                    member.correlationId(-1L);
                    ++workCount;
                }
            } else if (member.hasRequestedJoin() && member.logPosition() == this.logPublisher.position()) {
                ClusterMember[] expandedMembers = ClusterMember.addMember(this.activeMembers, member);
                long timestamp = this.clusterClock.time();
                if (this.logPublisher.appendMembershipChangeEvent(this.leadershipTermId, timestamp, this.leaderMember.id(), expandedMembers.length, ChangeType.JOIN, member.id(), ClusterMember.encodeAsString(expandedMembers)) > 0L) {
                    // Last log update time is set one heartbeat interval before the append timestamp.
                    this.timeOfLastLogUpdateNs = this.clusterTimeUnit.toNanos(timestamp) - this.leaderHeartbeatIntervalNs;
                    this.passiveMembers = ClusterMember.removeMember(this.passiveMembers, member.id());
                    this.activeMembers = expandedMembers;
                    this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
                    member.hasRequestedJoin(false);
                    ++workCount;
                    break;
                }
            }
        }
        return workCount;
    }

    /**
     * Checks every tracked session for timeout and for pending open / new-leader work.
     * <p>
     * For a session whose last activity is older than the session timeout:
     * <ul>
     *   <li>OPEN: it is marked closing with reason TIMEOUT and, if the session close can be appended to the
     *       log, a CLOSED event is sent, the session is queued as an uncommitted close, removed from the
     *       map, closed, and the timed-out client counter is incremented.</li>
     *   <li>CLOSING: the same append/notify/remove/close sequence runs, with the counter incremented only
     *       when the close reason is TIMEOUT.</li>
     *   <li>any other state: the session is removed and closed.</li>
     * </ul>
     * Sessions that have not timed out get a session-open appended when CONNECTED, or a new-leader event
     * sent when one is pending.
     * <p>
     * All session closes report through {@code ctx.countedErrorHandler()} so that failures on every path
     * go through the same counted handler.
     *
     * @param sessionByIdMap the sessions to check; timed-out entries are removed through the iterator.
     * @param nowNs          current time in nanoseconds.
     * @return the number of sessions for which work was attempted.
     */
    private int checkSessions(Long2ObjectHashMap<ClusterSession> sessionByIdMap, long nowNs) {
        int workCount = 0;
        Long2ObjectHashMap.ValueIterator i = sessionByIdMap.values().iterator();
        while (i.hasNext()) {
            ClusterSession session = (ClusterSession)i.next();
            if (nowNs > session.timeOfLastActivityNs() + this.sessionTimeoutNs) {
                String msg;
                if (session.state() == ClusterSession.State.OPEN) {
                    session.closing(CloseReason.TIMEOUT);
                    // If the append fails the session stays in the map and is retried on a later pass.
                    if (this.logPublisher.appendSessionClose(session, this.leadershipTermId, this.clusterClock.time())) {
                        msg = session.closeReason().name();
                        this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, msg);
                        session.closedLogPosition(this.logPublisher.position());
                        this.uncommittedClosedSessions.addLast(session);
                        i.remove();
                        session.close(this.ctx.countedErrorHandler());
                        this.ctx.timedOutClientCounter().incrementOrdered();
                    }
                } else if (session.state() == ClusterSession.State.CLOSING) {
                    if (this.logPublisher.appendSessionClose(session, this.leadershipTermId, this.clusterClock.time())) {
                        msg = session.closeReason().name();
                        this.egressPublisher.sendEvent(session, this.leadershipTermId, this.memberId, EventCode.CLOSED, msg);
                        session.closedLogPosition(this.logPublisher.position());
                        this.uncommittedClosedSessions.addLast(session);
                        i.remove();
                        // Fixed: was ctx.errorHandler(), which skipped the counted handler used everywhere else.
                        session.close(this.ctx.countedErrorHandler());
                        if (session.closeReason() == CloseReason.TIMEOUT) {
                            this.ctx.timedOutClientCounter().incrementOrdered();
                        }
                    }
                } else {
                    i.remove();
                    session.close(this.ctx.countedErrorHandler());
                }
                ++workCount;
                continue;
            }
            if (session.state() == ClusterSession.State.CONNECTED) {
                this.appendSessionOpen(session);
                ++workCount;
                continue;
            }
            if (!session.hasNewLeaderEventPending()) continue;
            this.sendNewLeaderEvent(session);
            ++workCount;
        }
        return workCount;
    }

    /**
     * Queues an acknowledgement received from a service.
     *
     * @param logPosition log position carried by the ack.
     * @param ackId       id of the ack; {@code 0} is treated as the service's initial ack.
     * @param relevantId  id carried by the ack.
     * @param serviceId   index of the service that sent the ack.
     * @throws ClusterException if an initial ack arrives for a service whose client id is already recorded.
     */
    private void captureServiceAck(long logPosition, long ackId, long relevantId, int serviceId) {
        boolean isInitialAck = 0L == ackId;
        if (isInitialAck && -1L != this.serviceClientIds[serviceId]) {
            throw new ClusterException("initial ack already received from service: possible duplicate serviceId=" + serviceId);
        }
        ServiceAck ack = new ServiceAck(ackId, logPosition, relevantId);
        this.serviceAckQueues[serviceId].offerLast(ack);
    }

    /**
     * Takes the head ack from every service queue, requiring each to be for the given log position.
     *
     * @param logPosition the position every polled ack must carry.
     * @param serviceId   service id included in the error message.
     * @return one ack per service, indexed by queue position.
     * @throws ClusterException if any queue is empty or its head ack is for a different log position.
     */
    private ServiceAck[] pollServiceAcks(long logPosition, int serviceId) {
        int serviceCount = this.serviceAckQueues.length;
        ServiceAck[] polledAcks = new ServiceAck[serviceCount];
        for (int queueIndex = 0; queueIndex < serviceCount; ++queueIndex) {
            ServiceAck ack = this.serviceAckQueues[queueIndex].pollFirst();
            boolean isValid = null != ack && ack.logPosition() == logPosition;
            if (!isValid) {
                throw new ClusterException("invalid ack for serviceId=" + serviceId + " logPosition=" + logPosition + " " + ack);
            }
            polledAcks[queueIndex] = ack;
        }
        return polledAcks;
    }

    /**
     * Sends a new-leader event to the session and clears its pending flag when the send succeeds.
     *
     * @param session the session to notify.
     */
    private void sendNewLeaderEvent(ClusterSession session) {
        boolean isSent = this.egressPublisher.newLeader(session, this.leadershipTermId, this.leaderMember.id(), this.ingressEndpoints);
        if (!isSent) {
            return;
        }
        session.hasNewLeaderEventPending(false);
    }

    /**
     * Appends a session-open for the session to the log and, when the returned position is positive,
     * opens the session at that position.
     *
     * @param session the session to open.
     */
    private void appendSessionOpen(ClusterSession session) {
        long timestamp = this.clusterClock.time();
        long openedPosition = this.logPublisher.appendSessionOpen(session, this.leadershipTermId, timestamp);
        if (openedPosition <= 0L) {
            return;
        }
        session.open(openedPosition);
    }

    /**
     * Waits for the recording counter of the given log session and wraps it as the append position.
     *
     * @param logSessionId session id used to locate the recording counter.
     */
    private void createAppendPosition(int logSessionId) {
        CountersReader countersReader = this.aeron.countersReader();
        this.appendPosition = new ReadableCounter(countersReader, this.awaitRecordingCounter(countersReader, logSessionId));
    }

    /**
     * Replays a snapshot recording from the archive and loads it into this agent.
     * <p>
     * After the loader reports completion, the snapshot's application major version and time unit are
     * checked against the configured ones and the pending service messages are reset. Finally the timer
     * tick time, leadership term id, commit position and expected ack position are set from the snapshot.
     *
     * @param snapshot the recording log snapshot entry to load.
     * @param archive  the archive client used to start the replay.
     * @throws ClusterException if the image closes before loading is done, or the version or time unit
     *                          is incompatible.
     */
    private void loadSnapshot(RecordingLog.Snapshot snapshot, AeronArchive archive) {
        String replayChannel = this.ctx.replayChannel();
        int replayStreamId = this.ctx.replayStreamId();
        int replaySessionId = (int)archive.startReplay(snapshot.recordingId, 0L, -1L, replayChannel, replayStreamId);
        String subscriptionChannel = ChannelUri.addSessionId(replayChannel, replaySessionId);
        try (Subscription subscription = this.aeron.addSubscription(subscriptionChannel, replayStreamId)) {
            Image image = this.awaitImage(replaySessionId, subscription);
            ConsensusModuleSnapshotLoader loader = new ConsensusModuleSnapshotLoader(image, this);
            for (;;) {
                int fragments = loader.poll();
                if (0 == fragments) {
                    if (loader.isDone()) {
                        break;
                    }
                    if (image.isClosed()) {
                        throw new ClusterException("snapshot ended unexpectedly");
                    }
                }
                this.idle(fragments);
            }
            int snapshotAppVersion = loader.appVersion();
            if (SemanticVersion.major(this.ctx.appVersion()) != SemanticVersion.major(snapshotAppVersion)) {
                throw new ClusterException("incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) + " snapshot=" + SemanticVersion.toString(snapshotAppVersion));
            }
            TimeUnit snapshotTimeUnit = loader.timeUnit();
            if (snapshotTimeUnit != this.clusterTimeUnit) {
                throw new ClusterException("incompatible time unit: " + this.clusterTimeUnit + " snapshot=" + snapshotTimeUnit);
            }
            this.pendingServiceMessages.forEach(this::serviceSessionMessageReset, Integer.MAX_VALUE);
        }
        this.timerService.currentTickTime(this.clusterClock.time());
        this.leadershipTermId(snapshot.leadershipTermId);
        this.commitPosition.setOrdered(snapshot.logPosition);
        this.expectedAckPosition = snapshot.logPosition;
    }

    /**
     * Idles until the subscription has an image for the given session id.
     *
     * @param sessionId    session id of the image to wait for.
     * @param subscription subscription expected to receive the image.
     * @return the image once available.
     */
    private Image awaitImage(int sessionId, Subscription subscription) {
        this.idleStrategy.reset();
        Image found = subscription.imageBySessionId(sessionId);
        while (null == found) {
            this.idle();
            found = subscription.imageBySessionId(sessionId);
        }
        return found;
    }

    /**
     * Allocates the recovery state counter from a recovery plan.
     * <p>
     * With no snapshots the counter is allocated from the current leadership term id, zero position and
     * timestamp, and an empty recording id array. Otherwise the first snapshot supplies term id, position
     * and timestamp, and each remaining snapshot's recording id is stored at the index of its service id.
     *
     * @param plan the recovery plan to describe.
     * @return the allocated counter.
     */
    private Counter addRecoveryStateCounter(RecordingLog.RecoveryPlan plan) {
        int snapshotsCount = plan.snapshots.size();
        if (snapshotsCount <= 0) {
            return RecoveryState.allocate(this.aeron, this.leadershipTermId, 0L, 0L, this.ctx.clusterId(), new long[0]);
        }
        long[] serviceRecordingIds = new long[snapshotsCount - 1];
        RecordingLog.Snapshot firstSnapshot = plan.snapshots.get(0);
        int index = 1;
        while (index < snapshotsCount) {
            RecordingLog.Snapshot serviceSnapshot = plan.snapshots.get(index);
            serviceRecordingIds[serviceSnapshot.serviceId] = serviceSnapshot.recordingId;
            ++index;
        }
        return RecoveryState.allocate(this.aeron, firstSnapshot.leadershipTermId, firstSnapshot.logPosition, firstSnapshot.timestamp, this.ctx.clusterId(), serviceRecordingIds);
    }

    /**
     * Creates a dynamic join when there are no active members and cluster consensus endpoints are configured.
     *
     * @return a new {@link DynamicJoin}, or {@code null} when the conditions are not met.
     */
    private DynamicJoin requiresDynamicJoin() {
        boolean isRequired = 0 == this.activeMembers.length && null != this.ctx.clusterConsensusEndpoints();
        return isRequired
            ? new DynamicJoin(this.ctx.clusterConsensusEndpoints(), this.archive, this.consensusPublisher, this.ctx, this)
            : null;
    }

    /**
     * Takes the head ack from each service queue and records its relevant id as that service's client id.
     *
     * @throws NullPointerException if any service queue is empty.
     */
    private void captureServiceClientIds() {
        for (int serviceId = 0, count = this.serviceClientIds.length; serviceId < count; ++serviceId) {
            ServiceAck headAck = Objects.requireNonNull(this.serviceAckQueues[serviceId].pollFirst());
            this.serviceClientIds[serviceId] = headAck.relevantId();
        }
    }

    /**
     * Removes every active member that has requested removal at or before the given commit position.
     * <p>
     * Each removed member is dropped from the member map, its publication is closed and its log
     * destination removed. If this node is among them the state moves to QUITTING. The active member
     * array and ranked positions are then rebuilt.
     *
     * @param commitPosition the position up to which removal requests are applied.
     */
    private void handleMemberRemovals(long commitPosition) {
        ClusterMember[] remainingMembers = this.activeMembers;
        for (ClusterMember member : this.activeMembers) {
            if (member.hasRequestedRemove() && member.removalPosition() <= commitPosition) {
                if (member.id() == this.memberId) {
                    this.state(ConsensusModule.State.QUITTING);
                }
                remainingMembers = ClusterMember.removeMember(remainingMembers, member.id());
                this.clusterMemberByIdMap.remove(member.id());
                this.clusterMemberByIdMap.compact();
                member.closePublication(this.ctx.countedErrorHandler());
                this.logPublisher.removeDestination(this.ctx.isLogMdc(), member.logEndpoint());
                --this.pendingMemberRemovals;
            }
        }
        this.activeMembers = remainingMembers;
        this.rankedPositions = new long[ClusterMember.quorumThreshold(remainingMembers.length)];
    }

    /**
     * Recomputes the commit position as leader and publishes it to the other members when it has advanced
     * or the heartbeat interval has elapsed since the last log update.
     * <p>
     * The commit position is the lesser of the quorum position and this node's append position. On
     * publishing, the commit counter and last-update time are set, uncommitted entries up to the position
     * are cleared, and pending member removals are applied.
     *
     * @param nowNs current time in nanoseconds.
     * @return {@code 1} when the commit position was published, otherwise {@code 0}.
     */
    private int updateLeaderPosition(long nowNs) {
        long appendedPosition = this.appendPosition.get();
        this.thisMember.logPosition(appendedPosition).timeOfLastAppendPositionNs(nowNs);
        long quorumCommitPosition = Math.min(ClusterMember.quorumPosition(this.activeMembers, this.rankedPositions), appendedPosition);
        if (quorumCommitPosition <= this.commitPosition.getWeak() && nowNs < this.timeOfLastLogUpdateNs + this.leaderHeartbeatIntervalNs) {
            return 0;
        }
        for (ClusterMember member : this.activeMembers) {
            if (member.id() != this.memberId) {
                this.consensusPublisher.commitPosition(member.publication(), this.leadershipTermId, quorumCommitPosition, this.memberId);
            }
        }
        this.commitPosition.setOrdered(quorumCommitPosition);
        this.timeOfLastLogUpdateNs = nowNs;
        this.clearUncommittedEntriesTo(quorumCommitPosition);
        if (this.pendingMemberRemovals > 0) {
            this.handleMemberRemovals(quorumCommitPosition);
        }
        return 1;
    }

    /**
     * Sends this node's append position to the leader when it has changed since the last send or the
     * heartbeat interval has elapsed, and records the position and time when the send succeeds.
     *
     * @param nowNs current time in nanoseconds.
     * @return {@code 1} when the append position was sent, otherwise {@code 0}.
     */
    private int updateFollowerPosition(long nowNs) {
        ExclusivePublication leaderPublication = this.leaderMember.publication();
        long currentAppendPosition = this.appendPosition.get();
        boolean isUpdateDue = currentAppendPosition != this.lastAppendPosition || nowNs >= this.timeOfLastAppendPositionNs + this.leaderHeartbeatIntervalNs;
        if (!isUpdateDue || !this.consensusPublisher.appendPosition(leaderPublication, this.leadershipTermId, currentAppendPosition, this.memberId)) {
            return 0;
        }
        this.lastAppendPosition = currentAppendPosition;
        this.timeOfLastAppendPositionNs = nowNs;
        return 1;
    }

    /**
     * Closes every tracked session opened after the given log position, and every pending session,
     * sending each a CLOSED event with the detail "election" first.
     *
     * @param logPosition sessions whose opened log position is greater than this are removed.
     */
    private void clearSessionsAfter(long logPosition) {
        for (Long2ObjectHashMap.ValueIterator sessions = this.sessionByIdMap.values().iterator(); sessions.hasNext();) {
            ClusterSession openSession = (ClusterSession)sessions.next();
            if (openSession.openedLogPosition() > logPosition) {
                sessions.remove();
                this.egressPublisher.sendEvent(openSession, this.leadershipTermId, this.memberId, EventCode.CLOSED, "election");
                openSession.close(this.ctx.countedErrorHandler());
            }
        }
        for (ClusterSession pendingSession : this.pendingSessions) {
            this.egressPublisher.sendEvent(pendingSession, this.leadershipTermId, this.memberId, EventCode.CLOSED, "election");
            pendingSession.close(this.ctx.countedErrorHandler());
        }
        this.pendingSessions.clear();
    }

    /**
     * Discards bookkeeping for entries that are now committed: swept service messages, uncommitted
     * timers and uncommitted closed sessions at or below the commit position.
     *
     * @param commitPosition the position up to which entries are considered committed.
     */
    private void clearUncommittedEntriesTo(long commitPosition) {
        if (this.uncommittedServiceMessages > 0) {
            this.pendingServiceMessageHeadOffset -= this.pendingServiceMessages.consume(this.leaderServiceSessionMessageSweeper, Integer.MAX_VALUE);
        }
        // Timer entries are removed two values at a time; the head value is compared as a position.
        // NOTE(review): termination on an empty queue relies on what peekLong() returns then - confirm.
        while (this.uncommittedTimers.peekLong() <= commitPosition) {
            this.uncommittedTimers.pollLong();
            this.uncommittedTimers.pollLong();
        }
        ClusterSession headSession = this.uncommittedClosedSessions.peekFirst();
        while (null != headSession && headSession.closedLogPosition() <= commitPosition) {
            this.uncommittedClosedSessions.pollFirst();
            headSession = this.uncommittedClosedSessions.peekFirst();
        }
    }

    /**
     * Restores state for entries appended beyond the commit position.
     * <p>
     * Timers appended after the commit position are rescheduled at the current tick time, pending service
     * messages are swept and reset, and sessions closed after the commit position are put back into the
     * session map in the CLOSING state.
     *
     * @param commitPosition entries with a position greater than this are restored.
     */
    private void restoreUncommittedEntries(long commitPosition) {
        // Each timer entry is read as two consecutive values: append position, then correlation id.
        for (LongArrayQueue.LongIterator timers = this.uncommittedTimers.iterator(); timers.hasNext();) {
            long timerAppendPosition = timers.nextValue();
            long timerCorrelationId = timers.nextValue();
            if (timerAppendPosition > commitPosition) {
                this.timerService.scheduleTimerForCorrelationId(timerCorrelationId, this.timerService.currentTickTime());
            }
        }
        this.uncommittedTimers.clear();
        this.pendingServiceMessages.consume(this.followerServiceSessionMessageSweeper, Integer.MAX_VALUE);
        this.pendingServiceMessageHeadOffset = 0;
        if (this.uncommittedServiceMessages > 0) {
            this.pendingServiceMessages.consume(this.leaderServiceSessionMessageSweeper, Integer.MAX_VALUE);
            this.pendingServiceMessages.forEach(this::serviceSessionMessageReset, Integer.MAX_VALUE);
            this.uncommittedServiceMessages = 0;
        }
        for (ClusterSession closedSession = this.uncommittedClosedSessions.pollFirst(); null != closedSession; closedSession = this.uncommittedClosedSessions.pollFirst()) {
            if (closedSession.closedLogPosition() > commitPosition) {
                closedSession.closedLogPosition(-1L);
                closedSession.state(ClusterSession.State.CLOSING);
                this.sessionByIdMap.put(closedSession.id(), closedSession);
            }
        }
    }

    /**
     * Starts a new election: deactivates the control toggle, closes the ingress adapter, takes the
     * FOLLOWER role, creates the election and runs its first unit of work.
     * <p>
     * The election is given the append position counter value when that counter exists, otherwise the
     * recovery plan's appended log position.
     *
     * @param nowNs current time in nanoseconds, passed to the election's first work cycle.
     */
    private void enterElection(long nowNs) {
        ClusterControl.ToggleState.deactivate(this.controlToggle);
        CloseHelper.close(this.ctx.countedErrorHandler(), this.ingressAdapter);
        this.role(Cluster.Role.FOLLOWER);
        long committedPosition = this.commitPosition.getWeak();
        long appendedPosition;
        if (null != this.appendPosition) {
            appendedPosition = this.appendPosition.get();
        } else {
            appendedPosition = this.recoveryPlan.appendedLogPosition;
        }
        this.election = new Election(false, this.leadershipTermId, committedPosition, appendedPosition, this.activeMembers, this.clusterMemberByIdMap, this.thisMember, this.consensusPublisher, this.ctx, this);
        this.election.doWork(nowNs);
    }

    /**
     * Performs one idle step while waiting: checks for interruption, invokes the Aeron client, idles
     * with the idle strategy and then checks for archive errors.
     *
     * @throws AgentTerminationException if the Aeron client is closed.
     */
    private void idle() {
        checkInterruptStatus();
        this.aeronClientInvoker.invoke();
        if (!this.aeron.isClosed()) {
            this.idleStrategy.idle();
            this.checkForArchiveError(false);
            return;
        }
        throw new AgentTerminationException();
    }

    /**
     * Performs one idle step informed by the amount of work just done: checks for interruption, invokes
     * the Aeron client, idles with the idle strategy, and checks for archive errors only when no work
     * was done.
     *
     * @param workCount the work done in the preceding cycle.
     * @throws AgentTerminationException if the Aeron client is closed.
     */
    private void idle(int workCount) {
        checkInterruptStatus();
        this.aeronClientInvoker.invoke();
        if (this.aeron.isClosed()) {
            throw new AgentTerminationException();
        }
        this.idleStrategy.idle(workCount);
        if (0 != workCount) {
            return;
        }
        this.checkForArchiveError(false);
    }

    /**
     * Rethrows an {@link InterruptedException} unchecked when the current thread has been interrupted.
     * {@link Thread#interrupted()} clears the interrupt flag as part of the check.
     */
    private static void checkInterruptStatus() {
        if (!Thread.interrupted()) {
            return;
        }
        LangUtil.rethrowUnchecked(new InterruptedException());
    }

    /**
     * Records a snapshot of the consensus module state to the archive and registers it, together with
     * the service snapshots, in the recording log.
     * <p>
     * Any {@link Exception} raised along the way is reported to the counted error handler rather than
     * propagated to the caller.
     *
     * @param timestamp   timestamp stored with each snapshot entry.
     * @param logPosition log position the snapshot corresponds to.
     * @param serviceAcks acks from the services; each ack's relevant id is used as that service's snapshot recording id.
     */
    private void takeSnapshot(long timestamp, long logPosition, ServiceAck[] serviceAcks) {
        try {
            long recordingId;
            // The publication is closed on leaving this block, once the recording has caught up with it.
            try (ExclusivePublication publication = this.aeron.addExclusivePublication(this.ctx.snapshotChannel(), this.ctx.snapshotStreamId());){
                // Record only this publication's session on the snapshot channel.
                String channel = ChannelUri.addSessionId(this.ctx.snapshotChannel(), publication.sessionId());
                this.archive.startRecording(channel, this.ctx.snapshotStreamId(), SourceLocation.LOCAL, true);
                CountersReader counters = this.aeron.countersReader();
                int counterId = this.awaitRecordingCounter(counters, publication.sessionId());
                recordingId = RecordingPos.getRecordingId(counters, counterId);
                this.snapshotState(publication, logPosition, this.replayLeadershipTermId);
                // Wait until the recording counter reaches everything written to the publication.
                this.awaitRecordingComplete(recordingId, publication.position(), counters, counterId);
            }
            long termBaseLogPosition = this.recordingLog.getTermEntry((long)this.replayLeadershipTermId).termBaseLogPosition;
            // Service snapshots are appended from the highest service id down to 0 ...
            for (int serviceId = serviceAcks.length - 1; serviceId >= 0; --serviceId) {
                long snapshotId = serviceAcks[serviceId].relevantId();
                this.recordingLog.appendSnapshot(snapshotId, this.replayLeadershipTermId, termBaseLogPosition, logPosition, timestamp, serviceId);
            }
            // ... followed by the consensus module's own snapshot, registered with service id -1.
            this.recordingLog.appendSnapshot(recordingId, this.replayLeadershipTermId, termBaseLogPosition, logPosition, timestamp, -1);
            this.recordingLog.force(this.ctx.fileSyncLevel());
            this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.ctx.serviceCount());
            this.ctx.snapshotCounter().incrementOrdered();
        }
        catch (Exception ex) {
            // Failure is reported, not rethrown.
            this.ctx.countedErrorHandler().onError(ex);
        }
    }

    /**
     * Idles until the recording counter reaches the given position.
     *
     * @param recordingId id of the recording being awaited.
     * @param position    position the counter must reach.
     * @param counters    reader used to read the counter value.
     * @param counterId   id of the recording position counter.
     * @throws ClusterException if the recording is no longer active before the position is reached.
     */
    private void awaitRecordingComplete(long recordingId, long position, CountersReader counters, int counterId) {
        this.idleStrategy.reset();
        while (counters.getCounterValue(counterId) < position) {
            this.idle();
            if (!RecordingPos.isActive(counters, counterId, recordingId)) {
                throw new ClusterException("recording has stopped unexpectedly: " + recordingId);
            }
        }
    }

    /**
     * Idles until a recording position counter exists for the given session id.
     *
     * @param counters  reader used to search for the counter.
     * @param sessionId session id of the recorded stream.
     * @return the id of the counter once found.
     */
    private int awaitRecordingCounter(CountersReader counters, int sessionId) {
        this.idleStrategy.reset();
        int foundCounterId;
        while (-1 == (foundCounterId = RecordingPos.findCounterIdBySession(counters, sessionId))) {
            this.idle();
        }
        return foundCounterId;
    }

    /**
     * Writes the consensus module state to the snapshot publication between a begin and an end marker.
     * <p>
     * Written in order: module state, cluster members, sessions in the OPEN or CLOSED state, timers and
     * pending service messages.
     *
     * @param publication      publication the snapshot is written to.
     * @param logPosition      log position recorded in the markers.
     * @param leadershipTermId leadership term id recorded in the markers.
     */
    private void snapshotState(ExclusivePublication publication, long logPosition, long leadershipTermId) {
        ConsensusModuleSnapshotTaker taker = new ConsensusModuleSnapshotTaker(publication, this.idleStrategy, this.aeronClientInvoker);
        taker.markBegin(1L, logPosition, leadershipTermId, 0, this.clusterTimeUnit, this.ctx.appVersion());
        taker.snapshotConsensusModuleState(this.nextSessionId, this.nextServiceSessionId, this.logServiceSessionId, this.pendingServiceMessages.size());
        taker.snapshotClusterMembers(this.memberId, this.highMemberId, this.activeMembers);
        for (ClusterSession session : this.sessionByIdMap.values()) {
            if (session.state() == ClusterSession.State.OPEN || session.state() == ClusterSession.State.CLOSED) {
                taker.snapshotSession(session);
            }
        }
        this.timerService.snapshot(taker);
        taker.snapshot(this.pendingServiceMessages);
        taker.markEnd(1L, logPosition, leadershipTermId, 0, this.clusterTimeUnit, this.ctx.appVersion());
    }

    /**
     * Applies a member-joined event: raises the high member id and, when the member is present in the
     * new member list, adds it to the active members and member map and resizes the ranked positions.
     * A consensus publication is added for the member if it does not already have one.
     *
     * @param memberId   id of the member that joined.
     * @param newMembers the member list carried by the event.
     */
    private void clusterMemberJoined(int memberId, ClusterMember[] newMembers) {
        this.highMemberId = Math.max(this.highMemberId, memberId);
        ClusterMember joinedMember = ClusterMember.findMember(newMembers, memberId);
        if (null == joinedMember) {
            return;
        }
        if (null == joinedMember.publication()) {
            ClusterMember.addConsensusPublication(joinedMember, ChannelUri.parse(this.ctx.consensusChannel()), this.ctx.consensusStreamId(), this.aeron);
        }
        this.activeMembers = ClusterMember.addMember(this.activeMembers, joinedMember);
        this.clusterMemberByIdMap.put(memberId, joinedMember);
        this.rankedPositions = new long[ClusterMember.quorumThreshold(this.activeMembers.length)];
    }

    /**
     * Applies a member-quit event: removes the member from the active members and member map and
     * resizes the ranked positions for the remaining member count.
     *
     * @param memberId id of the member that quit.
     */
    private void clusterMemberQuit(int memberId) {
        ClusterMember[] remainingMembers = ClusterMember.removeMember(this.activeMembers, memberId);
        this.activeMembers = remainingMembers;
        this.clusterMemberByIdMap.remove(memberId);
        this.rankedPositions = new long[ClusterMember.quorumThreshold(remainingMembers.length)];
    }

    /**
     * Asks the ingress adapter to free the session buffer for the image's session id.
     *
     * @param image the ingress image that became unavailable.
     */
    private void onUnavailableIngressImage(Image image) {
        int imageSessionId = image.sessionId();
        this.ingressAdapter.freeSessionBuffer(imageSessionId);
    }

    /**
     * Stamps a service message's header in place and appends it to the pending service messages.
     * <p>
     * The cluster session id is written into the header found 24 bytes before {@code offset}, the
     * timestamp field is set to {@code Long.MAX_VALUE}, and the span starting 32 bytes before
     * {@code offset} is appended.
     * NOTE(review): 24 and 32 presumably correspond to session header lengths - confirm.
     *
     * @param buffer           buffer holding the message, with header space before {@code offset}.
     * @param offset           offset of the message body within the buffer.
     * @param length           length of the message body.
     * @param clusterSessionId session id to write into the header.
     * @throws ClusterException if the pending service message buffer rejects the append.
     */
    private void enqueueServiceSessionMessage(MutableDirectBuffer buffer, int offset, int length, long clusterSessionId) {
        int sessionHeaderOffset = offset - 24;
        buffer.putLong(sessionHeaderOffset + SessionMessageHeaderDecoder.clusterSessionIdEncodingOffset(), clusterSessionId, SessionMessageHeaderDecoder.BYTE_ORDER);
        buffer.putLong(sessionHeaderOffset + SessionMessageHeaderDecoder.timestampEncodingOffset(), Long.MAX_VALUE, SessionMessageHeaderDecoder.BYTE_ORDER);
        boolean isAppended = this.pendingServiceMessages.append(buffer, offset - 32, length + 32);
        if (!isAppended) {
            throw new ClusterException("pending service message buffer capacity: " + this.pendingServiceMessages.size());
        }
    }

    /**
     * Attempts to append one pending service message entry to the log.
     * <p>
     * The cluster session id is read from the entry's session header and the payload (the entry minus
     * its leading 32 bytes) is offered to the log publisher with the current cluster time. On success
     * the uncommitted counter is incremented, the last logged service session id and the pending head
     * offset are updated, and the resulting append position is stored in the entry's timestamp field.
     *
     * @param buffer     buffer containing the pending entry.
     * @param offset     index in {@code buffer} at which the entry begins.
     * @param length     length of the entry including its 32 bytes of headers.
     * @param headOffset head offset to remember once this entry has been appended.
     * @return {@code true} if the log publisher returned a positive position, otherwise {@code false}.
     */
    private boolean serviceSessionMessageAppender(MutableDirectBuffer buffer, int offset, int length, int headOffset) {
        // Session header fields begin 8 bytes into the entry.
        final int sessionHeaderStart = offset + 8;
        final int sessionIdIndex = sessionHeaderStart + SessionMessageHeaderDecoder.clusterSessionIdEncodingOffset();
        final int timestampIndex = sessionHeaderStart + SessionMessageHeaderDecoder.timestampEncodingOffset();
        final long sessionId = buffer.getLong(sessionIdIndex, SessionMessageHeaderDecoder.BYTE_ORDER);

        final long position = this.logPublisher.appendMessage(this.leadershipTermId, sessionId, this.clusterClock.time(), buffer, offset + 32, length - 32);
        if (position <= 0L) {
            return false;
        }

        ++this.uncommittedServiceMessages;
        this.logServiceSessionId = sessionId;
        this.pendingServiceMessageHeadOffset = headOffset;
        // The timestamp slot doubles as storage for the log position at which the entry was appended.
        buffer.putLong(timestampIndex, position, SessionMessageHeaderEncoder.BYTE_ORDER);

        return true;
    }

    /**
     * Clears the append position recorded in a pending service message entry.
     * <p>
     * If the entry's timestamp field holds anything other than {@link Long#MAX_VALUE} it is
     * overwritten with {@link Long#MAX_VALUE}.
     *
     * @param buffer     buffer containing the pending entry.
     * @param offset     index in {@code buffer} at which the entry begins.
     * @param length     length of the entry (unused).
     * @param headOffset head offset of the entry (unused).
     * @return {@code true} if the field was reset, {@code false} if it already held {@link Long#MAX_VALUE}.
     */
    private boolean serviceSessionMessageReset(MutableDirectBuffer buffer, int offset, int length, int headOffset) {
        final int timestampIndex = offset + 8 + SessionMessageHeaderDecoder.timestampEncodingOffset();
        final long recordedPosition = buffer.getLong(timestampIndex, SessionMessageHeaderDecoder.BYTE_ORDER);

        // A long cannot exceed Long.MAX_VALUE, so equality is the only "not yet appended" case.
        if (Long.MAX_VALUE == recordedPosition) {
            return false;
        }

        buffer.putLong(timestampIndex, Long.MAX_VALUE, SessionMessageHeaderEncoder.BYTE_ORDER);
        return true;
    }

    /**
     * Decides whether a pending service message entry has been committed.
     * <p>
     * The value stored in the entry's timestamp field is compared with the current commit position;
     * when it is at or below the commit position the uncommitted counter is decremented.
     *
     * @param buffer     buffer containing the pending entry.
     * @param offset     index in {@code buffer} at which the entry begins.
     * @param length     length of the entry (unused).
     * @param headOffset head offset of the entry (unused).
     * @return {@code true} if the stored position is at or below the commit position, otherwise {@code false}.
     */
    private boolean leaderServiceSessionMessageSweeper(MutableDirectBuffer buffer, int offset, int length, int headOffset) {
        final int timestampIndex = offset + 8 + SessionMessageHeaderDecoder.timestampEncodingOffset();
        final long recordedPosition = buffer.getLong(timestampIndex, SessionMessageHeaderDecoder.BYTE_ORDER);

        final boolean committed = recordedPosition <= this.commitPosition.getWeak();
        if (committed) {
            --this.uncommittedServiceMessages;
        }

        return committed;
    }

    /**
     * Decides whether a pending service message entry has already been seen in the log.
     * <p>
     * The cluster session id stored in the entry's session header is compared with the last logged
     * service session id.
     *
     * @param buffer     buffer containing the pending entry.
     * @param offset     index in {@code buffer} at which the entry begins.
     * @param length     length of the entry (unused).
     * @param headOffset head offset of the entry (unused).
     * @return {@code true} if the entry's session id is at or below the last logged service session id.
     */
    private boolean followerServiceSessionMessageSweeper(MutableDirectBuffer buffer, int offset, int length, int headOffset) {
        final int sessionIdIndex = offset + 8 + SessionMessageHeaderDecoder.clusterSessionIdEncodingOffset();
        final long entrySessionId = buffer.getLong(sessionIdIndex, SessionMessageHeaderDecoder.BYTE_ORDER);

        return entrySessionId <= this.logServiceSessionId;
    }

    /**
     * Reacts to a counter becoming unavailable.
     * <p>
     * When {@code registrationId} matches one of the tracked service client ids, and this module is
     * neither terminating nor quitting, a warning is reported to the error handler and the module is
     * closed and terminated.
     *
     * @param counters       reader for the counters (unused).
     * @param registrationId registration id associated with the unavailable counter.
     * @param counterId      id of the unavailable counter (unused).
     */
    private void onUnavailableCounter(CountersReader counters, long registrationId, int counterId) {
        for (final long serviceClientId : this.serviceClientIds) {
            if (registrationId == serviceClientId && ConsensusModule.State.TERMINATING != this.state && ConsensusModule.State.QUITTING != this.state) {
                this.ctx.errorHandler().onError(new ClusterException("Aeron client for service closed unexpectedly", AeronException.Category.WARN));
                // closeAndTerminate() always exits by throwing, which also ends this loop.
                this.closeAndTerminate();
            }
        }
    }

    /**
     * Closes this module and terminates the agent.
     * <p>
     * The state is set to {@code CLOSED}, an attempt is made to stop the log recording, the configured
     * termination hook is run, and finally an {@link AgentTerminationException} is thrown. This method
     * never returns normally.
     *
     * @throws AgentTerminationException always, once the preceding steps have completed.
     */
    private void closeAndTerminate() {
        this.state(ConsensusModule.State.CLOSED);
        // Best effort: tryStopLogRecording reports failures to the error handler rather than throwing.
        this.tryStopLogRecording();
        this.ctx.terminationHook().run();
        throw new AgentTerminationException();
    }

    /**
     * Makes a best-effort attempt to stop the recording of the log subscription.
     * <p>
     * Nothing is done when there is no log subscription id ({@code -1}) or when the archive proxy's
     * publication is not connected. Otherwise the archive is asked to stop the recording; any
     * exception is wrapped as a warning and passed to the counted error handler rather than
     * propagated. The log subscription id is then cleared to {@code -1} whether or not the stop
     * succeeded.
     */
    private void tryStopLogRecording() {
        if (-1L == this.logSubscriptionId || !this.archive.archiveProxy().publication().isConnected()) {
            return;
        }

        try {
            this.archive.tryStopRecording(this.logSubscriptionId);
        }
        catch (Exception stopFailure) {
            this.ctx.countedErrorHandler().onError(new ClusterException(stopFailure, AeronException.Category.WARN));
        }

        this.logSubscriptionId = -1L;
    }

    /**
     * Writes the snapshots collected during a dynamic join into the recording log.
     * <p>
     * Does nothing when no snapshots were collected. Otherwise a term entry is appended using the log
     * recording id together with the leadership term id, term base log position and timestamp of the
     * last snapshot in the list. Every collected snapshot is then appended, iterating from the last
     * element to the first, and the list is cleared.
     */
    private void appendDynamicJoinTermAndSnapshots() {
        if (this.dynamicJoinSnapshots.isEmpty()) {
            return;
        }

        final long termRecordingId = this.logRecordingId();
        final int lastIndex = this.dynamicJoinSnapshots.size() - 1;
        final RecordingLog.Snapshot tail = this.dynamicJoinSnapshots.get(lastIndex);
        this.recordingLog.appendTerm(termRecordingId, tail.leadershipTermId, tail.termBaseLogPosition, tail.timestamp);

        // Snapshots are appended in reverse list order.
        int index = lastIndex;
        while (index >= 0) {
            final RecordingLog.Snapshot entry = this.dynamicJoinSnapshots.get(index);
            this.recordingLog.appendSnapshot(entry.recordingId, entry.leadershipTermId, entry.termBaseLogPosition, entry.logPosition, entry.timestamp, entry.serviceId);
            index--;
        }

        this.dynamicJoinSnapshots.clear();
    }
}

