/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster.service;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.DirectBufferVector;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.ChangeType;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ClusterAction;
import io.aeron.cluster.codecs.MessageHeaderEncoder;
import io.aeron.cluster.codecs.SessionMessageHeaderEncoder;
import io.aeron.cluster.service.ActiveLogEvent;
import io.aeron.cluster.service.BoundedLogAdapter;
import io.aeron.cluster.service.ClientSession;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusterCounters;
import io.aeron.cluster.service.ClusteredService;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.cluster.service.ConsensusModuleProxy;
import io.aeron.cluster.service.RecoveryState;
import io.aeron.cluster.service.ServiceAdapter;
import io.aeron.cluster.service.ServiceSnapshotLoader;
import io.aeron.cluster.service.ServiceSnapshotTaker;
import io.aeron.cluster.service.UnmodifiableClientSessionCollection;
import io.aeron.exceptions.AeronException;
import io.aeron.exceptions.TimeoutException;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.Header;
import io.aeron.status.ReadableCounter;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.SemanticVersion;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;

final class ClusteredServiceAgent
implements Agent,
Cluster,
IdleStrategy {
    static final long MARK_FILE_UPDATE_INTERVAL_MS = TimeUnit.NANOSECONDS.toMillis(ClusteredServiceContainer.Configuration.MARK_FILE_UPDATE_INTERVAL_NS);
    private volatile boolean isAbort;
    private boolean isServiceActive;
    private final int serviceId;
    private int memberId = -1;
    private long ackId = 0L;
    private long terminationPosition = -1L;
    private long timeOfLastMarkFileUpdateMs;
    private long cachedTimeMs;
    private long clusterTime;
    private long logPosition = -1L;
    private long closeHandlerRegistrationId;
    private final IdleStrategy idleStrategy;
    private final ClusteredServiceContainer.Context ctx;
    private final Aeron aeron;
    private final AgentInvoker aeronAgentInvoker;
    private final ClusteredService service;
    private final ConsensusModuleProxy consensusModuleProxy;
    private final ServiceAdapter serviceAdapter;
    private final EpochClock epochClock;
    private final UnsafeBuffer headerBuffer = new UnsafeBuffer(new byte[65472]);
    private final DirectBufferVector headerVector = new DirectBufferVector(this.headerBuffer, 0, 32);
    private final SessionMessageHeaderEncoder sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder();
    private final Long2ObjectHashMap<ClientSession> sessionByIdMap = new Long2ObjectHashMap();
    private final Collection<ClientSession> unmodifiableClientSessions = new UnmodifiableClientSessionCollection(this.sessionByIdMap.values());
    private final BoundedLogAdapter logAdapter;
    private String activeLifecycleCallbackName;
    private ReadableCounter commitPosition;
    private ActiveLogEvent activeLogEvent;
    private Cluster.Role role = Cluster.Role.FOLLOWER;
    private TimeUnit timeUnit = null;

    ClusteredServiceAgent(ClusteredServiceContainer.Context ctx) {
        this.logAdapter = new BoundedLogAdapter(this, ctx.logFragmentLimit());
        this.ctx = ctx;
        this.aeron = ctx.aeron();
        this.aeronAgentInvoker = ctx.aeron().conductorAgentInvoker();
        this.service = ctx.clusteredService();
        this.idleStrategy = ctx.idleStrategy();
        this.serviceId = ctx.serviceId();
        this.epochClock = ctx.epochClock();
        String channel = ctx.controlChannel();
        this.consensusModuleProxy = new ConsensusModuleProxy(this.aeron.addPublication(channel, ctx.consensusModuleStreamId()));
        this.serviceAdapter = new ServiceAdapter(this.aeron.addSubscription(channel, ctx.serviceStreamId()), this);
        this.sessionMessageHeaderEncoder.wrapAndApplyHeader(this.headerBuffer, 0, new MessageHeaderEncoder());
    }

    @Override
    public void onStart() {
        this.closeHandlerRegistrationId = this.aeron.addCloseHandler(this::abort);
        CountersReader counters = this.aeron.countersReader();
        this.commitPosition = this.awaitCommitPositionCounter(counters, this.ctx.clusterId());
        this.recoverState(counters);
    }

    @Override
    public void onClose() {
        this.aeron.removeCloseHandler(this.closeHandlerRegistrationId);
        if (this.isAbort) {
            this.ctx.abortLatch().countDown();
        } else {
            CountedErrorHandler errorHandler = this.ctx.countedErrorHandler();
            if (this.isServiceActive) {
                this.isServiceActive = false;
                try {
                    this.service.onTerminate(this);
                }
                catch (Throwable ex) {
                    errorHandler.onError(ex);
                }
            }
            if (!this.ctx.ownsAeronClient() && !this.aeron.isClosed()) {
                for (ClientSession session : this.sessionByIdMap.values()) {
                    session.disconnect(errorHandler);
                }
                CloseHelper.close(errorHandler, this.logAdapter);
                CloseHelper.close(errorHandler, this.serviceAdapter);
                CloseHelper.close(errorHandler, this.consensusModuleProxy);
            }
        }
        this.ctx.close();
    }

    @Override
    public int doWork() {
        int workCount = 0;
        if (this.checkForClockTick()) {
            this.pollServiceAdapter();
            ++workCount;
        }
        if (null != this.logAdapter.image()) {
            int polled = this.logAdapter.poll(this.commitPosition.get());
            workCount += polled;
            if (0 == polled && this.logAdapter.isDone()) {
                this.closeLog();
            }
        }
        return workCount;
    }

    @Override
    public String roleName() {
        return this.ctx.serviceName();
    }

    @Override
    public Cluster.Role role() {
        return this.role;
    }

    @Override
    public int memberId() {
        return this.memberId;
    }

    @Override
    public Aeron aeron() {
        return this.aeron;
    }

    @Override
    public ClusteredServiceContainer.Context context() {
        return this.ctx;
    }

    @Override
    public ClientSession getClientSession(long clusterSessionId) {
        return this.sessionByIdMap.get(clusterSessionId);
    }

    @Override
    public Collection<ClientSession> clientSessions() {
        return this.unmodifiableClientSessions;
    }

    @Override
    public void forEachClientSession(Consumer<? super ClientSession> action) {
        this.sessionByIdMap.values().forEach(action);
    }

    @Override
    public boolean closeClientSession(long clusterSessionId) {
        this.checkForLifecycleCallback();
        ClientSession clientSession = this.sessionByIdMap.get(clusterSessionId);
        if (clientSession == null) {
            throw new ClusterException("unknown clusterSessionId: " + clusterSessionId);
        }
        if (clientSession.isClosing()) {
            return true;
        }
        if (this.consensusModuleProxy.closeSession(clusterSessionId)) {
            clientSession.markClosing();
            return true;
        }
        return false;
    }

    @Override
    public TimeUnit timeUnit() {
        return this.timeUnit;
    }

    @Override
    public long time() {
        return this.clusterTime;
    }

    @Override
    public long logPosition() {
        return this.logPosition;
    }

    @Override
    public boolean scheduleTimer(long correlationId, long deadline) {
        this.checkForLifecycleCallback();
        return this.consensusModuleProxy.scheduleTimer(correlationId, deadline);
    }

    @Override
    public boolean cancelTimer(long correlationId) {
        this.checkForLifecycleCallback();
        return this.consensusModuleProxy.cancelTimer(correlationId);
    }

    @Override
    public long offer(DirectBuffer buffer, int offset, int length) {
        this.checkForLifecycleCallback();
        this.sessionMessageHeaderEncoder.clusterSessionId(0L);
        return this.consensusModuleProxy.offer(this.headerBuffer, 0, 32, buffer, offset, length);
    }

    @Override
    public long offer(DirectBufferVector[] vectors) {
        this.checkForLifecycleCallback();
        this.sessionMessageHeaderEncoder.clusterSessionId(0L);
        vectors[0] = this.headerVector;
        return this.consensusModuleProxy.offer(vectors);
    }

    @Override
    public long tryClaim(int length, BufferClaim bufferClaim) {
        this.checkForLifecycleCallback();
        this.sessionMessageHeaderEncoder.clusterSessionId(0L);
        return this.consensusModuleProxy.tryClaim(length + 32, bufferClaim, this.headerBuffer);
    }

    @Override
    public IdleStrategy idleStrategy() {
        return this;
    }

    @Override
    public void reset() {
        this.idleStrategy.reset();
    }

    @Override
    public void idle() {
        this.idleStrategy.idle();
        if (Thread.interrupted()) {
            LangUtil.rethrowUnchecked(new InterruptedException());
        }
        this.checkForClockTick();
    }

    @Override
    public void idle(int workCount) {
        this.idleStrategy.idle(workCount);
        if (workCount <= 0) {
            if (Thread.interrupted()) {
                LangUtil.rethrowUnchecked(new InterruptedException());
            }
            this.checkForClockTick();
        }
    }

    void onJoinLog(long leadershipTermId, long logPosition, long maxLogPosition, int memberId, int logSessionId, int logStreamId, boolean isStartup, Cluster.Role role, String logChannel) {
        this.logAdapter.maxLogPosition(logPosition);
        this.activeLogEvent = new ActiveLogEvent(leadershipTermId, logPosition, maxLogPosition, memberId, logSessionId, logStreamId, isStartup, role, logChannel);
    }

    void onServiceTerminationPosition(long logPosition) {
        this.terminationPosition = logPosition;
    }

    void onSessionMessage(long logPosition, long clusterSessionId, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        ClientSession clientSession = this.sessionByIdMap.get(clusterSessionId);
        this.service.onSessionMessage(clientSession, timestamp, buffer, offset, length, header);
    }

    void onTimerEvent(long logPosition, long correlationId, long timestamp) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        this.service.onTimerEvent(correlationId, timestamp);
    }

    void onSessionOpen(long leadershipTermId, long logPosition, long clusterSessionId, long timestamp, int responseStreamId, String responseChannel, byte[] encodedPrincipal) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        if (this.sessionByIdMap.containsKey(clusterSessionId)) {
            throw new ClusterException("clashing open clusterSessionId=" + clusterSessionId + " leadershipTermId=" + leadershipTermId + " logPosition=" + logPosition);
        }
        ClientSession session = new ClientSession(clusterSessionId, responseStreamId, responseChannel, encodedPrincipal, this);
        if (Cluster.Role.LEADER == this.role && this.ctx.isRespondingService()) {
            session.connect(this.aeron);
        }
        this.sessionByIdMap.put(clusterSessionId, session);
        this.service.onSessionOpen(session, timestamp);
    }

    void onSessionClose(long leadershipTermId, long logPosition, long clusterSessionId, long timestamp, CloseReason closeReason) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        ClientSession session = this.sessionByIdMap.remove(clusterSessionId);
        if (null == session) {
            throw new ClusterException("unknown clusterSessionId=" + clusterSessionId + " for close reason=" + (Object)((Object)closeReason) + " leadershipTermId=" + leadershipTermId + " logPosition=" + logPosition);
        }
        session.disconnect(this.ctx.countedErrorHandler());
        this.service.onSessionClose(session, timestamp, closeReason);
    }

    void onServiceAction(long leadershipTermId, long logPosition, long timestamp, ClusterAction action) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        this.executeAction(action, logPosition, leadershipTermId);
    }

    void onNewLeadershipTermEvent(long leadershipTermId, long logPosition, long timestamp, long termBaseLogPosition, int leaderMemberId, int logSessionId, TimeUnit timeUnit, int appVersion) {
        if (SemanticVersion.major(this.ctx.appVersion()) != SemanticVersion.major(appVersion)) {
            this.ctx.errorHandler().onError(new ClusterException("incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) + " log=" + SemanticVersion.toString(appVersion)));
            this.ctx.terminationHook().run();
        } else {
            this.sessionMessageHeaderEncoder.leadershipTermId(leadershipTermId);
            this.logPosition = logPosition;
            this.clusterTime = timestamp;
            this.timeUnit = timeUnit;
            this.service.onNewLeadershipTermEvent(leadershipTermId, logPosition, timestamp, termBaseLogPosition, leaderMemberId, logSessionId, timeUnit, appVersion);
        }
    }

    void onMembershipChange(long logPosition, long timestamp, ChangeType changeType, int memberId) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        if (memberId == this.memberId && changeType == ChangeType.QUIT) {
            this.terminate();
        }
    }

    void addSession(long clusterSessionId, int responseStreamId, String responseChannel, byte[] encodedPrincipal) {
        this.sessionByIdMap.put(clusterSessionId, new ClientSession(clusterSessionId, responseStreamId, responseChannel, encodedPrincipal, this));
    }

    void handleError(Throwable ex) {
        this.ctx.countedErrorHandler().onError(ex);
    }

    long offer(long clusterSessionId, Publication publication, DirectBuffer buffer, int offset, int length) {
        this.checkForLifecycleCallback();
        if (Cluster.Role.LEADER != this.role) {
            return 1L;
        }
        if (null == publication) {
            return -1L;
        }
        this.sessionMessageHeaderEncoder.clusterSessionId(clusterSessionId).timestamp(this.clusterTime);
        return publication.offer(this.headerBuffer, 0, 32, buffer, offset, length, null);
    }

    long offer(long clusterSessionId, Publication publication, DirectBufferVector[] vectors) {
        this.checkForLifecycleCallback();
        if (Cluster.Role.LEADER != this.role) {
            return 1L;
        }
        if (null == publication) {
            return -1L;
        }
        this.sessionMessageHeaderEncoder.clusterSessionId(clusterSessionId).timestamp(this.clusterTime);
        vectors[0] = this.headerVector;
        return publication.offer(vectors, null);
    }

    long tryClaim(long clusterSessionId, Publication publication, int length, BufferClaim bufferClaim) {
        this.checkForLifecycleCallback();
        if (Cluster.Role.LEADER != this.role) {
            int maxPayloadLength = this.headerBuffer.capacity() - 32;
            if (length > maxPayloadLength) {
                throw new IllegalArgumentException("claim exceeds maxPayloadLength=" + maxPayloadLength + ", length=" + length);
            }
            bufferClaim.wrap(this.headerBuffer, 0, length + 32);
            return 1L;
        }
        if (null == publication) {
            return -1L;
        }
        long offset = publication.tryClaim(length + 32, bufferClaim);
        if (offset > 0L) {
            this.sessionMessageHeaderEncoder.clusterSessionId(clusterSessionId).timestamp(this.clusterTime);
            bufferClaim.putBytes(this.headerBuffer, 0, 32);
        }
        return offset;
    }

    private void role(Cluster.Role newRole) {
        if (newRole != this.role) {
            this.role = newRole;
            this.activeLifecycleCallbackName = "onRoleChange";
            try {
                this.service.onRoleChange(newRole);
            }
            finally {
                this.activeLifecycleCallbackName = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void recoverState(CountersReader counters) {
        int recoveryCounterId = this.awaitRecoveryCounter(counters);
        this.logPosition = RecoveryState.getLogPosition(counters, recoveryCounterId);
        this.clusterTime = RecoveryState.getTimestamp(counters, recoveryCounterId);
        long leadershipTermId = RecoveryState.getLeadershipTermId(counters, recoveryCounterId);
        this.sessionMessageHeaderEncoder.leadershipTermId(leadershipTermId);
        this.isServiceActive = true;
        this.activeLifecycleCallbackName = "onStart";
        try {
            if (-1L != leadershipTermId) {
                this.loadSnapshot(RecoveryState.getSnapshotRecordingId(counters, recoveryCounterId, this.serviceId));
            } else {
                this.service.onStart(this, null);
            }
        }
        finally {
            this.activeLifecycleCallbackName = null;
        }
        long id = this.ackId++;
        this.idleStrategy.reset();
        while (!this.consensusModuleProxy.ack(this.logPosition, this.clusterTime, id, this.aeron.clientId(), this.serviceId)) {
            this.idle();
        }
    }

    private int awaitRecoveryCounter(CountersReader counters) {
        this.idleStrategy.reset();
        int counterId = RecoveryState.findCounterId(counters, this.ctx.clusterId());
        while (-1 == counterId) {
            this.idle();
            counterId = RecoveryState.findCounterId(counters, this.ctx.clusterId());
        }
        return counterId;
    }

    private void closeLog() {
        this.logPosition = Math.max(this.logAdapter.image().position(), this.logPosition);
        CloseHelper.close(this.ctx.countedErrorHandler(), this.logAdapter);
        this.role(Cluster.Role.FOLLOWER);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void joinActiveLog(ActiveLogEvent activeLog) {
        Subscription logSubscription = this.aeron.addSubscription(activeLog.channel, activeLog.streamId);
        try {
            this.logAdapter.image(this.awaitImage(activeLog.sessionId, logSubscription));
            this.logAdapter.maxLogPosition(activeLog.maxLogPosition);
            logSubscription = null;
            long id = this.ackId++;
            this.idleStrategy.reset();
            while (!this.consensusModuleProxy.ack(activeLog.logPosition, this.clusterTime, id, -1L, this.serviceId)) {
                this.idle();
            }
        }
        finally {
            CloseHelper.quietClose(logSubscription);
        }
        for (ClientSession session : this.sessionByIdMap.values()) {
            if (Cluster.Role.LEADER == activeLog.role) {
                if (this.ctx.isRespondingService() && !activeLog.isStartup) {
                    session.connect(this.aeron);
                }
                session.resetClosing();
                continue;
            }
            session.disconnect(this.ctx.countedErrorHandler());
        }
        this.memberId = activeLog.memberId;
        this.ctx.clusterMarkFile().memberId(this.memberId);
        this.role(activeLog.role);
    }

    private Image awaitImage(int sessionId, Subscription subscription) {
        Image image;
        this.idleStrategy.reset();
        while ((image = subscription.imageBySessionId(sessionId)) == null) {
            this.idle();
        }
        return image;
    }

    private ReadableCounter awaitCommitPositionCounter(CountersReader counters, int clusterId) {
        this.idleStrategy.reset();
        int counterId = ClusterCounters.find(counters, 203, clusterId);
        while (-1 == counterId) {
            this.idle();
            counterId = ClusterCounters.find(counters, 203, clusterId);
        }
        return new ReadableCounter(counters, counterId);
    }

    private void loadSnapshot(long recordingId) {
        try (AeronArchive archive = AeronArchive.connect(this.ctx.archiveContext().clone());){
            String channel = this.ctx.replayChannel();
            int streamId = this.ctx.replayStreamId();
            int sessionId = (int)archive.startReplay(recordingId, 0L, -1L, channel, streamId);
            String replaySessionChannel = ChannelUri.addSessionId(channel, sessionId);
            try (Subscription subscription = this.aeron.addSubscription(replaySessionChannel, streamId);){
                Image image = this.awaitImage(sessionId, subscription);
                this.loadState(image);
                this.service.onStart(this, image);
            }
        }
    }

    private void loadState(Image image) {
        ServiceSnapshotLoader snapshotLoader = new ServiceSnapshotLoader(image, this);
        while (true) {
            int fragments = snapshotLoader.poll();
            if (snapshotLoader.isDone()) break;
            if (fragments == 0 && image.isClosed()) {
                throw new ClusterException("snapshot ended unexpectedly");
            }
            this.idle(fragments);
        }
        int appVersion = snapshotLoader.appVersion();
        if (SemanticVersion.major(this.ctx.appVersion()) != SemanticVersion.major(appVersion)) {
            throw new ClusterException("incompatible version: " + SemanticVersion.toString(this.ctx.appVersion()) + " snapshot=" + SemanticVersion.toString(appVersion));
        }
        this.timeUnit = snapshotLoader.timeUnit();
    }

    /*
     * Exception decompiling
     */
    private long onTakeSnapshot(long logPosition, long leadershipTermId) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void awaitRecordingComplete(long recordingId, long position, CountersReader counters, int counterId, AeronArchive archive) {
        this.idleStrategy.reset();
        while (counters.getCounterValue(counterId) < position) {
            this.idle();
            archive.checkForErrorResponse();
            if (RecordingPos.isActive(counters, counterId, recordingId)) continue;
            throw new ClusterException("recording stopped unexpectedly: " + recordingId);
        }
    }

    private void snapshotState(ExclusivePublication publication, long logPosition, long leadershipTermId) {
        ServiceSnapshotTaker snapshotTaker = new ServiceSnapshotTaker(publication, this.idleStrategy, this.aeronAgentInvoker);
        snapshotTaker.markBegin(2L, logPosition, leadershipTermId, 0, this.timeUnit, this.ctx.appVersion());
        for (ClientSession clientSession : this.sessionByIdMap.values()) {
            snapshotTaker.snapshotSession(clientSession);
        }
        snapshotTaker.markEnd(2L, logPosition, leadershipTermId, 0, this.timeUnit, this.ctx.appVersion());
    }

    private void executeAction(ClusterAction action, long logPosition, long leadershipTermId) {
        if (ClusterAction.SNAPSHOT == action) {
            long recordingId = this.onTakeSnapshot(logPosition, leadershipTermId);
            long id = this.ackId++;
            this.idleStrategy.reset();
            while (!this.consensusModuleProxy.ack(logPosition, this.clusterTime, id, recordingId, this.serviceId)) {
                this.idle();
            }
        }
    }

    private int awaitRecordingCounter(int sessionId, CountersReader counters, AeronArchive archive) {
        this.idleStrategy.reset();
        int counterId = RecordingPos.findCounterIdBySession(counters, sessionId);
        while (-1 == counterId) {
            this.idle();
            archive.checkForErrorResponse();
            counterId = RecordingPos.findCounterIdBySession(counters, sessionId);
        }
        return counterId;
    }

    private boolean checkForClockTick() {
        if (this.isAbort || this.aeron.isClosed()) {
            this.isAbort = true;
            throw new AgentTerminationException("unexpected Aeron close");
        }
        long nowMs = this.epochClock.time();
        if (this.cachedTimeMs != nowMs) {
            this.cachedTimeMs = nowMs;
            if (null != this.aeronAgentInvoker) {
                this.aeronAgentInvoker.invoke();
                if (this.isAbort || this.aeron.isClosed()) {
                    this.isAbort = true;
                    throw new AgentTerminationException("unexpected Aeron close");
                }
            }
            if (nowMs >= this.timeOfLastMarkFileUpdateMs + MARK_FILE_UPDATE_INTERVAL_MS) {
                this.ctx.clusterMarkFile().updateActivityTimestamp(nowMs);
                this.timeOfLastMarkFileUpdateMs = nowMs;
            }
            return true;
        }
        return false;
    }

    private void pollServiceAdapter() {
        this.serviceAdapter.poll();
        if (null != this.activeLogEvent && null == this.logAdapter.image()) {
            ActiveLogEvent event = this.activeLogEvent;
            this.activeLogEvent = null;
            this.joinActiveLog(event);
        }
        if (-1L != this.terminationPosition && this.logPosition >= this.terminationPosition) {
            this.terminate();
        }
    }

    private void terminate() {
        this.isServiceActive = false;
        this.activeLifecycleCallbackName = "onTerminate";
        try {
            this.service.onTerminate(this);
        }
        catch (Exception ex) {
            this.ctx.countedErrorHandler().onError(ex);
        }
        finally {
            this.activeLifecycleCallbackName = null;
        }
        long id = this.ackId++;
        while (!this.consensusModuleProxy.ack(this.logPosition, this.clusterTime, id, -1L, this.serviceId)) {
            this.idle();
        }
        this.terminationPosition = -1L;
        this.ctx.terminationHook().run();
    }

    private void checkForLifecycleCallback() {
        if (null != this.activeLifecycleCallbackName) {
            throw new ClusterException("sending messages or scheduling timers is not allowed from " + this.activeLifecycleCallbackName);
        }
    }

    private void abort() {
        this.isAbort = true;
        try {
            if (!this.ctx.abortLatch().await(15000L, TimeUnit.MILLISECONDS)) {
                this.ctx.countedErrorHandler().onError(new TimeoutException("awaiting abort latch", AeronException.Category.WARN));
            }
        }
        catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
        }
    }
}

