/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster.service;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.DirectBufferVector;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.client.ClusterEvent;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.ChangeType;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ClusterAction;
import io.aeron.cluster.codecs.MessageHeaderEncoder;
import io.aeron.cluster.codecs.SessionMessageHeaderEncoder;
import io.aeron.cluster.service.ActiveLogEvent;
import io.aeron.cluster.service.BoundedLogAdapter;
import io.aeron.cluster.service.ClientSession;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusterCounters;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.cluster.service.ClusterTerminationException;
import io.aeron.cluster.service.ClusteredService;
import io.aeron.cluster.service.ClusteredServiceAgentHotFields;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.cluster.service.ConsensusModuleProxy;
import io.aeron.cluster.service.ContainerClientSession;
import io.aeron.cluster.service.RecoveryState;
import io.aeron.cluster.service.ServiceAdapter;
import io.aeron.cluster.service.ServiceSnapshotLoader;
import io.aeron.cluster.service.ServiceSnapshotTaker;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.exceptions.AeronEvent;
import io.aeron.exceptions.AeronException;
import io.aeron.exceptions.TimeoutException;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.Header;
import io.aeron.status.ReadableCounter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.MutableDirectBuffer;
import org.agrona.SemanticVersion;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;

final class ClusteredServiceAgent
extends ClusteredServiceAgentHotFields
implements Agent,
Cluster,
IdleStrategy {
    byte p064;
    byte p065;
    byte p066;
    byte p067;
    byte p068;
    byte p069;
    byte p070;
    byte p071;
    byte p072;
    byte p073;
    byte p074;
    byte p075;
    byte p076;
    byte p077;
    byte p078;
    byte p079;
    byte p080;
    byte p081;
    byte p082;
    byte p083;
    byte p084;
    byte p085;
    byte p086;
    byte p087;
    byte p088;
    byte p089;
    byte p090;
    byte p091;
    byte p092;
    byte p093;
    byte p094;
    byte p095;
    byte p096;
    byte p097;
    byte p098;
    byte p099;
    byte p100;
    byte p101;
    byte p102;
    byte p103;
    byte p104;
    byte p105;
    byte p106;
    byte p107;
    byte p108;
    byte p109;
    byte p110;
    byte p111;
    byte p112;
    byte p113;
    byte p114;
    byte p115;
    byte p116;
    byte p117;
    byte p118;
    byte p119;
    byte p120;
    byte p121;
    byte p122;
    byte p123;
    byte p124;
    byte p125;
    byte p126;
    byte p127;
    private static final long ONE_MILLISECOND_NS = TimeUnit.MILLISECONDS.toNanos(1L);
    private static final long MARK_FILE_UPDATE_INTERVAL_MS = TimeUnit.NANOSECONDS.toMillis(ClusteredServiceContainer.Configuration.MARK_FILE_UPDATE_INTERVAL_NS);
    private volatile boolean isAbort;
    private boolean isServiceActive;
    private final int serviceId;
    private int memberId = -1;
    private long closeHandlerRegistrationId;
    private long ackId = 0L;
    private long terminationPosition = -1L;
    private long markFileUpdateDeadlineMs;
    private long lastSlowTickNs;
    private long clusterTime;
    private long logPosition = -1L;
    private final IdleStrategy idleStrategy;
    private final ClusterMarkFile markFile;
    private final ClusteredServiceContainer.Context ctx;
    private final Aeron aeron;
    private final AgentInvoker aeronAgentInvoker;
    private final ClusteredService service;
    private final ConsensusModuleProxy consensusModuleProxy;
    private final ServiceAdapter serviceAdapter;
    private final EpochClock epochClock;
    private final NanoClock nanoClock;
    private final UnsafeBuffer messageBuffer = new UnsafeBuffer(new byte[65504]);
    private final UnsafeBuffer headerBuffer = new UnsafeBuffer((DirectBuffer)this.messageBuffer, 32, 65472);
    private final DirectBufferVector headerVector = new DirectBufferVector((DirectBuffer)this.headerBuffer, 0, 32);
    private final SessionMessageHeaderEncoder sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder();
    private final ArrayList<ContainerClientSession> sessions = new ArrayList();
    private final Long2ObjectHashMap<ContainerClientSession> sessionByIdMap = new Long2ObjectHashMap();
    private final Collection<ClientSession> unmodifiableClientSessions = Collections.unmodifiableCollection(this.sessions);
    private final BoundedLogAdapter logAdapter;
    private final DutyCycleTracker dutyCycleTracker;
    private final String subscriptionAlias;
    private String activeLifecycleCallbackName;
    private ReadableCounter commitPosition;
    private ActiveLogEvent activeLogEvent;
    private Cluster.Role role = Cluster.Role.FOLLOWER;
    private TimeUnit timeUnit = null;

    ClusteredServiceAgent(ClusteredServiceContainer.Context ctx) {
        this.logAdapter = new BoundedLogAdapter(this, ctx.logFragmentLimit());
        this.ctx = ctx;
        this.markFile = ctx.clusterMarkFile();
        this.aeron = ctx.aeron();
        this.aeronAgentInvoker = ctx.aeron().conductorAgentInvoker();
        this.service = ctx.clusteredService();
        this.idleStrategy = ctx.idleStrategy();
        this.serviceId = ctx.serviceId();
        this.epochClock = ctx.epochClock();
        this.nanoClock = ctx.nanoClock();
        this.dutyCycleTracker = ctx.dutyCycleTracker();
        this.subscriptionAlias = "log-sc-" + ctx.serviceId();
        String channel = ctx.controlChannel();
        this.consensusModuleProxy = new ConsensusModuleProxy((Publication)this.aeron.addPublication(channel, ctx.consensusModuleStreamId()));
        this.serviceAdapter = new ServiceAdapter(this.aeron.addSubscription(channel, ctx.serviceStreamId()), this);
        this.sessionMessageHeaderEncoder.wrapAndApplyHeader((MutableDirectBuffer)this.headerBuffer, 0, new MessageHeaderEncoder());
    }

    public void onStart() {
        this.closeHandlerRegistrationId = this.aeron.addCloseHandler(this::abort);
        this.aeron.addUnavailableCounterHandler(this::counterUnavailable);
        CountersReader counters = this.aeron.countersReader();
        this.commitPosition = this.awaitCommitPositionCounter(counters, this.ctx.clusterId());
        this.recoverState(counters);
        this.dutyCycleTracker.update(this.nanoClock.nanoTime());
    }

    public void onClose() {
        this.aeron.removeCloseHandler(this.closeHandlerRegistrationId);
        if (this.isAbort) {
            this.ctx.abortLatch().countDown();
        } else {
            CountedErrorHandler errorHandler = this.ctx.countedErrorHandler();
            if (this.isServiceActive) {
                this.isServiceActive = false;
                try {
                    this.service.onTerminate(this);
                }
                catch (Exception ex) {
                    errorHandler.onError((Throwable)ex);
                }
            }
            if (!this.ctx.ownsAeronClient() && !this.aeron.isClosed()) {
                this.disconnectEgress(errorHandler);
                CloseHelper.close((ErrorHandler)errorHandler, (AutoCloseable)this.logAdapter);
                CloseHelper.close((ErrorHandler)errorHandler, (AutoCloseable)this.serviceAdapter);
                CloseHelper.close((ErrorHandler)errorHandler, (AutoCloseable)this.consensusModuleProxy);
            }
        }
        this.markFile.updateActivityTimestamp(-1L);
        this.ctx.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int doWork() {
        int workCount = 0;
        long nowNs = this.nanoClock.nanoTime();
        this.dutyCycleTracker.measureAndUpdate(nowNs);
        try {
            if (this.checkForClockTick(nowNs)) {
                this.pollServiceAdapter();
                ++workCount;
            }
            if (null != this.logAdapter.image()) {
                int polled = this.logAdapter.poll(this.commitPosition.get());
                workCount += polled;
                if (0 == polled && this.logAdapter.isDone()) {
                    this.closeLog();
                }
            }
            try {
                this.isBackgroundInvocation = true;
            }
            finally {
                this.isBackgroundInvocation = false;
            }
        }
        catch (AgentTerminationException ex) {
            this.runTerminationHook();
            throw ex;
        }
        return workCount += this.service.doBackgroundWork(nowNs);
    }

    public String roleName() {
        return this.ctx.serviceName();
    }

    @Override
    public Cluster.Role role() {
        return this.role;
    }

    @Override
    public int memberId() {
        return this.memberId;
    }

    @Override
    public Aeron aeron() {
        return this.aeron;
    }

    @Override
    public ClusteredServiceContainer.Context context() {
        return this.ctx;
    }

    @Override
    public ClientSession getClientSession(long clusterSessionId) {
        return (ClientSession)this.sessionByIdMap.get(clusterSessionId);
    }

    @Override
    public Collection<ClientSession> clientSessions() {
        return this.unmodifiableClientSessions;
    }

    @Override
    public void forEachClientSession(Consumer<? super ClientSession> action) {
        this.sessions.forEach((Consumer<ContainerClientSession>)action);
    }

    @Override
    public boolean closeClientSession(long clusterSessionId) {
        this.checkForValidInvocation();
        ContainerClientSession clientSession = (ContainerClientSession)this.sessionByIdMap.get(clusterSessionId);
        if (clientSession == null) {
            throw new ClusterException("unknown clusterSessionId: " + clusterSessionId);
        }
        if (clientSession.isClosing()) {
            return true;
        }
        if (this.consensusModuleProxy.closeSession(clusterSessionId)) {
            clientSession.markClosing();
            return true;
        }
        return false;
    }

    @Override
    public TimeUnit timeUnit() {
        return this.timeUnit;
    }

    @Override
    public long time() {
        return this.clusterTime;
    }

    @Override
    public long logPosition() {
        return this.logPosition;
    }

    @Override
    public boolean scheduleTimer(long correlationId, long deadline) {
        this.checkForValidInvocation();
        return this.consensusModuleProxy.scheduleTimer(correlationId, deadline);
    }

    @Override
    public boolean cancelTimer(long correlationId) {
        this.checkForValidInvocation();
        return this.consensusModuleProxy.cancelTimer(correlationId);
    }

    @Override
    public long offer(DirectBuffer buffer, int offset, int length) {
        this.checkForValidInvocation();
        this.sessionMessageHeaderEncoder.clusterSessionId(this.context().serviceId());
        return this.consensusModuleProxy.offer((DirectBuffer)this.headerBuffer, 0, 32, buffer, offset, length);
    }

    @Override
    public long offer(DirectBufferVector[] vectors) {
        this.checkForValidInvocation();
        this.sessionMessageHeaderEncoder.clusterSessionId(this.context().serviceId());
        vectors[0] = this.headerVector;
        return this.consensusModuleProxy.offer(vectors);
    }

    @Override
    public long tryClaim(int length, BufferClaim bufferClaim) {
        this.checkForValidInvocation();
        this.sessionMessageHeaderEncoder.clusterSessionId(this.context().serviceId());
        return this.consensusModuleProxy.tryClaim(length + 32, bufferClaim, (DirectBuffer)this.headerBuffer);
    }

    @Override
    public IdleStrategy idleStrategy() {
        return this;
    }

    public void reset() {
        this.idleStrategy.reset();
    }

    public void idle() {
        this.idleStrategy.idle();
        if (Thread.currentThread().isInterrupted()) {
            throw new AgentTerminationException("interrupted");
        }
        this.checkForClockTick(this.nanoClock.nanoTime());
    }

    public void idle(int workCount) {
        this.idleStrategy.idle(workCount);
        if (workCount <= 0) {
            if (Thread.currentThread().isInterrupted()) {
                throw new AgentTerminationException("interrupted");
            }
            this.checkForClockTick(this.nanoClock.nanoTime());
        }
    }

    void onJoinLog(long logPosition, long maxLogPosition, int memberId, int logSessionId, int logStreamId, boolean isStartup, Cluster.Role role, String logChannel) {
        this.logAdapter.maxLogPosition(logPosition);
        this.activeLogEvent = new ActiveLogEvent(logPosition, maxLogPosition, memberId, logSessionId, logStreamId, isStartup, role, logChannel);
    }

    void onServiceTerminationPosition(long logPosition) {
        this.terminationPosition = logPosition;
    }

    void onSessionMessage(long logPosition, long clusterSessionId, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        ClientSession clientSession = (ClientSession)this.sessionByIdMap.get(clusterSessionId);
        this.service.onSessionMessage(clientSession, timestamp, buffer, offset, length, header);
    }

    void onTimerEvent(long logPosition, long correlationId, long timestamp) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        this.service.onTimerEvent(correlationId, timestamp);
    }

    void onSessionOpen(long leadershipTermId, long logPosition, long clusterSessionId, long timestamp, int responseStreamId, String responseChannel, byte[] encodedPrincipal) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        if (this.sessionByIdMap.containsKey(clusterSessionId)) {
            throw new ClusterException("clashing open clusterSessionId=" + clusterSessionId + " leadershipTermId=" + leadershipTermId + " logPosition=" + logPosition);
        }
        ContainerClientSession session = new ContainerClientSession(clusterSessionId, responseStreamId, responseChannel, encodedPrincipal, this);
        if (Cluster.Role.LEADER == this.role && this.ctx.isRespondingService()) {
            session.connect(this.aeron);
        }
        this.addSession(session);
        this.service.onSessionOpen(session, timestamp);
    }

    void onSessionClose(long leadershipTermId, long logPosition, long clusterSessionId, long timestamp, CloseReason closeReason) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        ContainerClientSession session = (ContainerClientSession)this.sessionByIdMap.remove(clusterSessionId);
        if (null == session) {
            throw new ClusterException("unknown clusterSessionId=" + clusterSessionId + " for close reason=" + (Object)((Object)closeReason) + " leadershipTermId=" + leadershipTermId + " logPosition=" + logPosition);
        }
        int size = this.sessions.size();
        for (int i = 0; i < size; ++i) {
            if (this.sessions.get(i).id() != clusterSessionId) continue;
            this.sessions.remove(i);
            break;
        }
        session.disconnect((ErrorHandler)this.ctx.countedErrorHandler());
        this.service.onSessionClose(session, timestamp, closeReason);
    }

    void onServiceAction(long leadershipTermId, long logPosition, long timestamp, ClusterAction action) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        this.executeAction(action, logPosition, leadershipTermId);
    }

    void onNewLeadershipTermEvent(long leadershipTermId, long logPosition, long timestamp, long termBaseLogPosition, int leaderMemberId, int logSessionId, TimeUnit timeUnit, int appVersion) {
        if (!this.ctx.appVersionValidator().isVersionCompatible(this.ctx.appVersion(), appVersion)) {
            this.ctx.errorHandler().onError((Throwable)((Object)new ClusterException("incompatible version: " + SemanticVersion.toString((int)this.ctx.appVersion()) + " log=" + SemanticVersion.toString((int)appVersion))));
            throw new AgentTerminationException();
        }
        this.sessionMessageHeaderEncoder.leadershipTermId(leadershipTermId);
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        this.timeUnit = timeUnit;
        this.service.onNewLeadershipTermEvent(leadershipTermId, logPosition, timestamp, termBaseLogPosition, leaderMemberId, logSessionId, timeUnit, appVersion);
    }

    void onMembershipChange(long logPosition, long timestamp, ChangeType changeType, int memberId) {
        this.logPosition = logPosition;
        this.clusterTime = timestamp;
        if (memberId == this.memberId && changeType == ChangeType.QUIT) {
            this.terminate(true);
        }
    }

    void addSession(long clusterSessionId, int responseStreamId, String responseChannel, byte[] encodedPrincipal) {
        ContainerClientSession session = new ContainerClientSession(clusterSessionId, responseStreamId, responseChannel, encodedPrincipal, this);
        this.addSession(session);
    }

    private void addSession(ContainerClientSession session) {
        int size;
        long clusterSessionId = session.id();
        this.sessionByIdMap.put(clusterSessionId, (Object)session);
        int addIndex = size = this.sessions.size();
        for (int i = size - 1; i >= 0; --i) {
            if (this.sessions.get(i).id() >= clusterSessionId) continue;
            addIndex = i + 1;
            break;
        }
        if (size == addIndex) {
            this.sessions.add(session);
        } else {
            this.sessions.add(addIndex, session);
        }
    }

    void handleError(Throwable ex) {
        this.ctx.countedErrorHandler().onError(ex);
    }

    long offer(long clusterSessionId, Publication publication, DirectBuffer buffer, int offset, int length) {
        this.checkForValidInvocation();
        if (Cluster.Role.LEADER != this.role) {
            return 1L;
        }
        if (null == publication) {
            return -1L;
        }
        this.sessionMessageHeaderEncoder.clusterSessionId(clusterSessionId).timestamp(this.clusterTime);
        return publication.offer((DirectBuffer)this.headerBuffer, 0, 32, buffer, offset, length, null);
    }

    long offer(long clusterSessionId, Publication publication, DirectBufferVector[] vectors) {
        this.checkForValidInvocation();
        if (Cluster.Role.LEADER != this.role) {
            return 1L;
        }
        if (null == publication) {
            return -1L;
        }
        this.sessionMessageHeaderEncoder.clusterSessionId(clusterSessionId).timestamp(this.clusterTime);
        vectors[0] = this.headerVector;
        return publication.offer(vectors, null);
    }

    long tryClaim(long clusterSessionId, Publication publication, int length, BufferClaim bufferClaim) {
        this.checkForValidInvocation();
        if (Cluster.Role.LEADER != this.role) {
            int maxPayloadLength = this.headerBuffer.capacity() - 32;
            if (length > maxPayloadLength) {
                throw new IllegalArgumentException("claim exceeds maxPayloadLength=" + maxPayloadLength + ", length=" + length);
            }
            bufferClaim.wrap((AtomicBuffer)this.messageBuffer, 0, 64 + length);
            return 1L;
        }
        if (null == publication) {
            return -1L;
        }
        long offset = publication.tryClaim(32 + length, bufferClaim);
        if (offset > 0L) {
            this.sessionMessageHeaderEncoder.clusterSessionId(clusterSessionId).timestamp(this.clusterTime);
            bufferClaim.putBytes((DirectBuffer)this.headerBuffer, 0, 32);
        }
        return offset;
    }

    private void role(Cluster.Role newRole) {
        if (newRole != this.role) {
            this.role = newRole;
            this.activeLifecycleCallbackName = "onRoleChange";
            try {
                this.service.onRoleChange(newRole);
            }
            finally {
                this.activeLifecycleCallbackName = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void recoverState(CountersReader counters) {
        int recoveryCounterId = this.awaitRecoveryCounter(counters);
        this.logPosition = RecoveryState.getLogPosition(counters, recoveryCounterId);
        this.clusterTime = RecoveryState.getTimestamp(counters, recoveryCounterId);
        long leadershipTermId = RecoveryState.getLeadershipTermId(counters, recoveryCounterId);
        this.sessionMessageHeaderEncoder.leadershipTermId(leadershipTermId);
        this.isServiceActive = true;
        this.activeLifecycleCallbackName = "onStart";
        try {
            if (-1L != leadershipTermId) {
                this.loadSnapshot(RecoveryState.getSnapshotRecordingId(counters, recoveryCounterId, this.serviceId));
            } else {
                this.service.onStart(this, null);
            }
        }
        finally {
            this.activeLifecycleCallbackName = null;
        }
        long id = this.ackId++;
        this.idleStrategy.reset();
        while (!this.consensusModuleProxy.ack(this.logPosition, this.clusterTime, id, this.aeron.clientId(), this.serviceId)) {
            this.idle();
        }
    }

    private int awaitRecoveryCounter(CountersReader counters) {
        this.idleStrategy.reset();
        int counterId = RecoveryState.findCounterId(counters, this.ctx.clusterId());
        while (-1 == counterId) {
            this.idle();
            counterId = RecoveryState.findCounterId(counters, this.ctx.clusterId());
        }
        return counterId;
    }

    private void closeLog() {
        this.logPosition = Math.max(this.logAdapter.image().position(), this.logPosition);
        CloseHelper.close((ErrorHandler)this.ctx.countedErrorHandler(), (AutoCloseable)this.logAdapter);
        this.disconnectEgress(this.ctx.countedErrorHandler());
        this.role(Cluster.Role.FOLLOWER);
    }

    private void disconnectEgress(CountedErrorHandler errorHandler) {
        int size = this.sessions.size();
        for (int i = 0; i < size; ++i) {
            this.sessions.get(i).disconnect((ErrorHandler)errorHandler);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void joinActiveLog(ActiveLogEvent activeLog) {
        if (Cluster.Role.LEADER != activeLog.role) {
            this.disconnectEgress(this.ctx.countedErrorHandler());
        }
        String channel = new ChannelUriStringBuilder(activeLog.channel).alias(this.subscriptionAlias).build();
        Subscription logSubscription = this.aeron.addSubscription(channel, activeLog.streamId);
        try {
            Image image = this.awaitImage(activeLog.sessionId, logSubscription);
            if (image.joinPosition() != this.logPosition) {
                throw new ClusterException("Cluster log must be contiguous for joining image: expectedPosition=" + this.logPosition + " joinPosition=" + image.joinPosition());
            }
            if (activeLog.logPosition != this.logPosition) {
                throw new ClusterException("Cluster log must be contiguous for active log event: expectedPosition=" + this.logPosition + " eventPosition=" + activeLog.logPosition);
            }
            this.logAdapter.image(image);
            this.logAdapter.maxLogPosition(activeLog.maxLogPosition);
            logSubscription = null;
            long id = this.ackId++;
            this.idleStrategy.reset();
            while (!this.consensusModuleProxy.ack(activeLog.logPosition, this.clusterTime, id, -1L, this.serviceId)) {
                this.idle();
            }
        }
        finally {
            CloseHelper.quietClose((AutoCloseable)logSubscription);
        }
        this.memberId = activeLog.memberId;
        this.markFile.memberId(this.memberId);
        if (Cluster.Role.LEADER == activeLog.role) {
            int size = this.sessions.size();
            for (int i = 0; i < size; ++i) {
                ContainerClientSession session = this.sessions.get(i);
                if (this.ctx.isRespondingService() && !activeLog.isStartup) {
                    session.connect(this.aeron);
                }
                session.resetClosing();
            }
        }
        this.role(activeLog.role);
    }

    private Image awaitImage(int sessionId, Subscription subscription) {
        Image image;
        this.idleStrategy.reset();
        while ((image = subscription.imageBySessionId(sessionId)) == null) {
            this.idle();
        }
        return image;
    }

    private ReadableCounter awaitCommitPositionCounter(CountersReader counters, int clusterId) {
        this.idleStrategy.reset();
        int counterId = ClusterCounters.find(counters, 203, clusterId);
        while (-1 == counterId) {
            this.idle();
            counterId = ClusterCounters.find(counters, 203, clusterId);
        }
        return new ReadableCounter(counters, counters.getCounterRegistrationId(counterId), counterId);
    }

    private void loadSnapshot(long recordingId) {
        try (AeronArchive archive = AeronArchive.connect((AeronArchive.Context)this.ctx.archiveContext().clone());){
            String channel = this.ctx.replayChannel();
            int streamId = this.ctx.replayStreamId();
            int sessionId = (int)archive.startReplay(recordingId, 0L, -1L, channel, streamId);
            String replaySessionChannel = ChannelUri.addSessionId((String)channel, (int)sessionId);
            try (Subscription subscription = this.aeron.addSubscription(replaySessionChannel, streamId);){
                Image image = this.awaitImage(sessionId, subscription);
                this.loadState(image, archive);
                this.service.onStart(this, image);
            }
        }
    }

    private void loadState(Image image, AeronArchive archive) {
        ServiceSnapshotLoader snapshotLoader = new ServiceSnapshotLoader(image, this);
        while (true) {
            int fragments = snapshotLoader.poll();
            if (snapshotLoader.isDone()) break;
            if (fragments == 0) {
                archive.checkForErrorResponse();
                if (image.isClosed()) {
                    throw new ClusterException("snapshot ended unexpectedly: " + image);
                }
            }
            this.idle(fragments);
        }
        int appVersion = snapshotLoader.appVersion();
        if (!this.ctx.appVersionValidator().isVersionCompatible(this.ctx.appVersion(), appVersion)) {
            throw new ClusterException("incompatible app version: " + SemanticVersion.toString((int)this.ctx.appVersion()) + " snapshot=" + SemanticVersion.toString((int)appVersion));
        }
        this.timeUnit = snapshotLoader.timeUnit();
    }

    /*
     * Exception decompiling
     */
    private long onTakeSnapshot(long logPosition, long leadershipTermId) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 3 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void awaitRecordingComplete(long recordingId, long position, CountersReader counters, int counterId, AeronArchive archive) {
        this.idleStrategy.reset();
        while (counters.getCounterValue(counterId) < position) {
            this.idle();
            archive.checkForErrorResponse();
            if (RecordingPos.isActive((CountersReader)counters, (int)counterId, (long)recordingId)) continue;
            throw new ClusterException("recording stopped unexpectedly: " + recordingId);
        }
    }

    private void snapshotState(ExclusivePublication publication, long logPosition, long leadershipTermId) {
        ServiceSnapshotTaker snapshotTaker = new ServiceSnapshotTaker(publication, this.idleStrategy, this.aeronAgentInvoker);
        snapshotTaker.markBegin(2L, logPosition, leadershipTermId, 0, this.timeUnit, this.ctx.appVersion());
        int size = this.sessions.size();
        for (int i = 0; i < size; ++i) {
            snapshotTaker.snapshotSession(this.sessions.get(i));
        }
        snapshotTaker.markEnd(2L, logPosition, leadershipTermId, 0, this.timeUnit, this.ctx.appVersion());
    }

    private void executeAction(ClusterAction action, long logPosition, long leadershipTermId) {
        if (ClusterAction.SNAPSHOT == action) {
            long recordingId = this.onTakeSnapshot(logPosition, leadershipTermId);
            long id = this.ackId++;
            this.idleStrategy.reset();
            while (!this.consensusModuleProxy.ack(logPosition, this.clusterTime, id, recordingId, this.serviceId)) {
                this.idle();
            }
        }
    }

    private int awaitRecordingCounter(int sessionId, CountersReader counters, AeronArchive archive) {
        this.idleStrategy.reset();
        int counterId = RecordingPos.findCounterIdBySession((CountersReader)counters, (int)sessionId);
        while (-1 == counterId) {
            this.idle();
            archive.checkForErrorResponse();
            counterId = RecordingPos.findCounterIdBySession((CountersReader)counters, (int)sessionId);
        }
        return counterId;
    }

    private boolean checkForClockTick(long nowNs) {
        if (this.isAbort || this.aeron.isClosed()) {
            this.isAbort = true;
            throw new AgentTerminationException("unexpected Aeron close");
        }
        if (nowNs - this.lastSlowTickNs > ONE_MILLISECOND_NS) {
            this.lastSlowTickNs = nowNs;
            long nowMs = this.epochClock.time();
            if (this.commitPosition.isClosed()) {
                this.ctx.errorLog().record((Throwable)new AeronEvent("commit-pos counter unexpectedly closed, terminating", AeronException.Category.WARN));
                throw new ClusterTerminationException(true);
            }
            if (null != this.aeronAgentInvoker) {
                this.aeronAgentInvoker.invoke();
                if (this.isAbort || this.aeron.isClosed()) {
                    this.isAbort = true;
                    throw new AgentTerminationException("unexpected Aeron close");
                }
            }
            if (nowMs >= this.markFileUpdateDeadlineMs) {
                this.markFileUpdateDeadlineMs = nowMs + MARK_FILE_UPDATE_INTERVAL_MS;
                this.markFile.updateActivityTimestamp(nowMs);
            }
            return true;
        }
        return false;
    }

    private void pollServiceAdapter() {
        this.serviceAdapter.poll();
        if (null != this.activeLogEvent && null == this.logAdapter.image()) {
            ActiveLogEvent event = this.activeLogEvent;
            this.activeLogEvent = null;
            this.joinActiveLog(event);
        }
        if (-1L != this.terminationPosition && this.logPosition >= this.terminationPosition) {
            if (this.logPosition > this.terminationPosition) {
                this.ctx.countedErrorHandler().onError((Throwable)((Object)new ClusterEvent("service terminate: logPosition=" + this.logPosition + " > terminationPosition=" + this.terminationPosition)));
            }
            this.terminate(this.logPosition == this.terminationPosition);
        }
    }

    private void terminate(boolean isTerminationExpected) {
        this.isServiceActive = false;
        this.activeLifecycleCallbackName = "onTerminate";
        try {
            this.service.onTerminate(this);
        }
        catch (Exception ex) {
            this.ctx.countedErrorHandler().onError((Throwable)ex);
        }
        finally {
            this.activeLifecycleCallbackName = null;
        }
        try {
            int attempts = 5;
            long id = this.ackId++;
            while (!this.consensusModuleProxy.ack(this.logPosition, this.clusterTime, id, -1L, this.serviceId) && 0 != --attempts) {
                this.idle();
            }
        }
        catch (Exception ex) {
            this.ctx.countedErrorHandler().onError((Throwable)ex);
        }
        this.terminationPosition = -1L;
        throw new ClusterTerminationException(isTerminationExpected);
    }

    private void checkForValidInvocation() {
        if (null != this.activeLifecycleCallbackName) {
            throw new ClusterException("sending messages or scheduling timers is not allowed from " + this.activeLifecycleCallbackName);
        }
        if (this.isBackgroundInvocation) {
            throw new ClusterException("sending messages or scheduling timers is not allowed from ClusteredService.doBackgroundWork");
        }
    }

    private void abort() {
        this.isAbort = true;
        try {
            if (!this.ctx.abortLatch().await(15000L, TimeUnit.MILLISECONDS)) {
                this.ctx.countedErrorHandler().onError((Throwable)new TimeoutException("awaiting abort latch", AeronException.Category.WARN));
            }
        }
        catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
        }
    }

    private void counterUnavailable(CountersReader countersReader, long registrationId, int counterId) {
        ReadableCounter commitPosition = this.commitPosition;
        if (null != commitPosition && commitPosition.counterId() == counterId && commitPosition.registrationId() == registrationId) {
            commitPosition.close();
        }
    }

    private void runTerminationHook() {
        try {
            this.ctx.terminationHook().run();
        }
        catch (Exception ex) {
            this.ctx.countedErrorHandler().onError((Throwable)ex);
        }
    }
}

