/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.FragmentAssembler;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.ControlResponsePoller;
import io.aeron.archive.client.RecordingSignalPoller;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.ClusterBackup;
import io.aeron.cluster.ClusterBackupEventsListener;
import io.aeron.cluster.ClusterMember;
import io.aeron.cluster.ConsensusPublisher;
import io.aeron.cluster.LogSourceValidator;
import io.aeron.cluster.PublicationGroup;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.SnapshotReplication;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.ClusterEvent;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.BackupResponseDecoder;
import io.aeron.cluster.codecs.ChallengeDecoder;
import io.aeron.cluster.codecs.EventCode;
import io.aeron.cluster.codecs.MessageHeaderDecoder;
import io.aeron.cluster.codecs.SessionEventDecoder;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.exceptions.AeronException;
import io.aeron.exceptions.TimeoutException;
import io.aeron.logbuffer.Header;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.agrona.BitUtil;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.status.CountersReader;

public final class ClusterBackupAgent
implements Agent {
    /** Interval, in milliseconds, between activity timestamp updates written to the mark file. */
    public static final long MARK_FILE_UPDATE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1L);
    /** Interval, in milliseconds, between slow-tick housekeeping passes. */
    private static final int SLOW_TICK_INTERVAL_MS = 10;

    // Reusable decoders for messages arriving on the consensus subscription.
    private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    private final BackupResponseDecoder backupResponseDecoder = new BackupResponseDecoder();
    private final ChallengeDecoder challengeDecoder = new ChallengeDecoder();
    private final SessionEventDecoder sessionEventDecoder = new SessionEventDecoder();

    // Collaborators taken from the context in the constructor.
    private final ClusterBackup.Context ctx;
    private final ClusterMarkFile markFile;
    private final AgentInvoker aeronClientInvoker;
    private final EpochClock epochClock;
    private final Aeron aeron;
    private final ConsensusPublisher consensusPublisher = new ConsensusPublisher();

    // Snapshots named by the latest backup response, and those whose replication has completed.
    // Diamond operator instead of the raw ArrayList type so the element type is checked by the compiler.
    private final ArrayList<RecordingLog.Snapshot> snapshotsToRetrieve = new ArrayList<>(4);
    private final ArrayList<RecordingLog.Snapshot> snapshotsRetrieved = new ArrayList<>(4);

    private final Counter stateCounter;
    private final Counter liveLogPositionCounter;
    private final Counter nextQueryDeadlineMsCounter;
    private final ClusterBackupEventsListener eventsListener;

    // Timeouts and intervals, converted from the context's nanosecond values to milliseconds.
    private final long backupResponseTimeoutMs;
    private final long backupQueryIntervalMs;
    private final long backupProgressTimeoutMs;
    private final long coolDownIntervalMs;

    private final long unavailableCounterHandlerRegistrationId;
    private final PublicationGroup<ExclusivePublication> consensusPublicationGroup;
    private final LogSourceValidator logSourceValidator;

    // The agent starts by querying the cluster for backup information.
    private ClusterBackup.State state = ClusterBackup.State.BACKUP_QUERY;
    private RecordingSignalPoller recordingSignalPoller;
    private AeronArchive backupArchive;
    private AeronArchive.AsyncConnect clusterArchiveAsyncConnect;
    private AeronArchive clusterArchive;
    private SnapshotReplication snapshotReplication;
    private final FragmentAssembler fragmentAssembler = new FragmentAssembler(this::onFragment);
    private final Subscription consensusSubscription;
    private ClusterMember[] clusterMembers;
    private ClusterMember logSupplierMember;
    private RecordingLog recordingLog;
    private RecordingLog.Entry leaderLogEntry;
    private RecordingLog.Entry leaderLastTermEntry;
    private Subscription recordingSubscription;
    private String replayChannel;
    private String recordingChannel;

    // Deadlines and timestamps in epoch milliseconds.
    private long slowTickDeadlineMs = 0L;
    private long markFileUpdateDeadlineMs = 0L;
    private long timeOfLastBackupQueryMs = 0L;
    private long timeOfLastProgressMs = 0L;

    // The identifiers below are initialised to -1; the methods of this class compare against -1
    // to detect a value that has not been assigned yet.
    private long coolDownDeadlineMs = -1L;
    private long correlationId = -1L;
    private long clusterLogRecordingId = -1L;
    private long liveLogRecordingSubscriptionId = -1L;
    private long liveLogRecordingId = -1L;
    private long liveLogReplaySessionId = -1L;
    private int leaderCommitPositionCounterId = -1;
    private int liveLogRecordingCounterId = -1;
    private int liveLogRecordingSessionId = -1;

    /**
     * Build the agent from its context: capture collaborators, convert the configured
     * nanosecond timeouts to milliseconds, create the consensus publication group and
     * subscription, and register the unavailable-counter handler.
     *
     * @param ctx configuration and shared resources for this agent.
     */
    ClusterBackupAgent(ClusterBackup.Context ctx) {
        this.ctx = ctx;
        aeron = ctx.aeron();
        epochClock = ctx.epochClock();
        markFile = ctx.clusterMarkFile();
        eventsListener = ctx.eventsListener();

        backupResponseTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.clusterBackupResponseTimeoutNs());
        backupQueryIntervalMs = TimeUnit.NANOSECONDS.toMillis(ctx.clusterBackupIntervalNs());
        backupProgressTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.clusterBackupProgressTimeoutNs());
        coolDownIntervalMs = TimeUnit.NANOSECONDS.toMillis(ctx.clusterBackupCoolDownIntervalNs());

        // One publication candidate per configured consensus endpoint, visited in shuffled order.
        final String[] endpoints = ctx.clusterConsensusEndpoints().split(",");
        consensusPublicationGroup = new PublicationGroup<ExclusivePublication>(
            endpoints, ctx.consensusChannel(), ctx.consensusStreamId(), Aeron::addExclusivePublication);
        consensusPublicationGroup.shuffle();

        // The client conductor is invoked once before the handler and subscription are added.
        aeronClientInvoker = aeron.conductorAgentInvoker();
        aeronClientInvoker.invoke();
        unavailableCounterHandlerRegistrationId = aeron.addUnavailableCounterHandler(this::onUnavailableCounter);
        consensusSubscription = aeron.addSubscription(ctx.consensusChannel(), ctx.consensusStreamId());

        stateCounter = ctx.stateCounter();
        liveLogPositionCounter = ctx.liveLogPositionCounter();
        nextQueryDeadlineMsCounter = ctx.nextQueryDeadlineMsCounter();
        logSourceValidator = new LogSourceValidator(ctx.sourceType());
    }

    /**
     * Open the recording log, connect to the backup archive, create the recording signal
     * poller on the archive's control response subscription, and prime the query deadline
     * counter with a value just before the current time.
     */
    @Override
    public void onStart() {
        recordingLog = new RecordingLog(ctx.clusterDir(), true);
        backupArchive = AeronArchive.connect(ctx.archiveContext().clone());

        final long controlSessionId = backupArchive.controlSessionId();
        final Subscription controlResponses = backupArchive.controlResponsePoller().subscription();
        recordingSignalPoller = new RecordingSignalPoller(controlSessionId, controlResponses);

        final long startMs = epochClock.time();
        nextQueryDeadlineMsCounter.setRelease(startMs - 1L);
        timeOfLastProgressMs = startMs;
    }

    /**
     * Release everything the agent holds. While the Aeron client is still open the handler is
     * deregistered, replay and recording are stopped, and (when the client is not owned by the
     * context) the agent's own subscriptions and publications are closed. The archives and the
     * recording log are closed regardless, then the mark file is signalled and the context closed.
     */
    @Override
    public void onClose() {
        final ErrorHandler errorHandler = ctx.countedErrorHandler();

        if (!aeron.isClosed()) {
            aeron.removeUnavailableCounterHandler(unavailableCounterHandlerRegistrationId);
            CloseHelper.close(errorHandler, snapshotReplication);
            stopReplay();
            stopRecording();

            final boolean clientIsExternal = !ctx.ownsAeronClient();
            if (clientIsExternal) {
                CloseHelper.closeAll(
                    errorHandler, consensusSubscription, consensusPublicationGroup, recordingSubscription);
            }

            state(ClusterBackup.State.CLOSED, epochClock.time());
        }

        CloseHelper.closeAll(errorHandler, backupArchive, clusterArchiveAsyncConnect, clusterArchive, recordingLog);
        markFile.signalTerminated();
        ctx.close();
    }

    /**
     * Run one duty cycle: perform slow-tick housekeeping when due, poll the consensus
     * subscription, then dispatch to the handler for the current state.
     * <p>
     * When progress has stalled, or any exception other than
     * {@link AgentTerminationException} is raised, the events listener (if any) is notified and
     * the agent moves to {@code RESET_BACKUP}; the exception is then rethrown.
     * An {@link AgentTerminationException} is passed to the termination hook instead.
     *
     * @return the amount of work done in this cycle.
     */
    @Override
    public int doWork() {
        long nowMs = this.epochClock.time();
        int workCount = 0;
        try {
            if (nowMs > this.slowTickDeadlineMs) {
                // Use the declared interval rather than a duplicated literal so the two cannot drift apart.
                this.slowTickDeadlineMs = nowMs + SLOW_TICK_INTERVAL_MS;
                workCount += this.slowTick(nowMs);
            }
            // At most 10 fragments are consumed from the consensus subscription per cycle.
            workCount += this.consensusSubscription.poll(this.fragmentAssembler, 10);
            switch (this.state) {
                case BACKUP_QUERY: {
                    workCount += this.backupQuery(nowMs);
                    break;
                }
                case SNAPSHOT_RETRIEVE: {
                    workCount += this.snapshotRetrieve(nowMs);
                    break;
                }
                case LIVE_LOG_RECORD: {
                    workCount += this.liveLogRecord(nowMs);
                    break;
                }
                case LIVE_LOG_REPLAY: {
                    workCount += this.liveLogReplay(nowMs);
                    break;
                }
                case UPDATE_RECORDING_LOG: {
                    workCount += this.updateRecordingLog(nowMs);
                    break;
                }
                case BACKING_UP: {
                    workCount += this.backingUp(nowMs);
                    break;
                }
                case RESET_BACKUP: {
                    workCount += this.resetBackup(nowMs);
                    break;
                }
                case CLOSED: {
                    // Nothing further to do once closed; skip the stall check as well.
                    return workCount;
                }
            }
            if (this.hasProgressStalled(nowMs)) {
                if (null != this.eventsListener) {
                    this.eventsListener.onPossibleFailure(new TimeoutException("progress has stalled", AeronException.Category.WARN));
                }
                this.state(ClusterBackup.State.RESET_BACKUP, nowMs);
            }
        }
        catch (AgentTerminationException ex) {
            this.runTerminationHook(ex);
        }
        catch (Exception ex) {
            if (null != this.eventsListener) {
                this.eventsListener.onPossibleFailure(ex);
            }
            this.state(ClusterBackup.State.RESET_BACKUP, nowMs);
            throw ex;
        }
        return workCount;
    }

    /**
     * Name of the role this agent performs.
     *
     * @return the fixed role name {@code "cluster-backup"}.
     */
    @Override
    public String roleName() {
        return "cluster-backup";
    }

    /**
     * Discard all state gathered from the cluster and release the resources tied to the current
     * backup attempt: snapshot replication, live log recording and replay, the consensus
     * publication group, the cluster archive connection and the recording subscription.
     * <p>
     * Every close is routed through the counted error handler, matching {@code onClose()}, so
     * that a failure closing one resource is reported without preventing the remaining
     * resources from being released.
     */
    private void reset() {
        ErrorHandler errorHandler = this.ctx.countedErrorHandler();
        this.clusterMembers = null;
        this.logSupplierMember = null;
        this.leaderLogEntry = null;
        this.leaderLastTermEntry = null;
        this.liveLogRecordingCounterId = -1;
        this.liveLogRecordingId = -1L;
        this.liveLogRecordingSessionId = -1;
        this.snapshotsToRetrieve.clear();
        this.snapshotsRetrieved.clear();
        this.fragmentAssembler.clear();
        // Previously closed without an error handler, so an exception here skipped all cleanup below.
        CloseHelper.close(errorHandler, this.snapshotReplication);
        this.snapshotReplication = null;
        this.stopRecording();
        this.stopReplay();
        CloseHelper.closeAll(errorHandler, this.consensusPublicationGroup, this.clusterArchive, this.clusterArchiveAsyncConnect, this.recordingSubscription);
        this.clusterArchive = null;
        this.clusterArchiveAsyncConnect = null;
        this.recordingSubscription = null;
    }

    /**
     * Stop the live log replay on the cluster archive, if one is in flight. Failures are
     * reported to the counted error handler; the replay session id is cleared either way.
     */
    private void stopReplay() {
        if (-1L == liveLogReplaySessionId) {
            return;
        }

        if (null != clusterArchive) {
            try {
                clusterArchive.stopReplay(liveLogReplaySessionId);
            }
            catch (Exception ex) {
                ctx.countedErrorHandler().onError(new ClusterEvent("failed to stop log replay: " + ex));
            }
        }

        liveLogReplaySessionId = -1L;
    }

    /**
     * Stop the live log recording on the backup archive, if one was started. Failures are
     * reported to the counted error handler; the recording subscription id is cleared either way.
     */
    private void stopRecording() {
        if (-1L == liveLogRecordingSubscriptionId) {
            return;
        }

        try {
            backupArchive.tryStopRecording(liveLogRecordingSubscriptionId);
        }
        catch (Exception ex) {
            ctx.countedErrorHandler().onError(new ClusterEvent("failed to stop log recording: " + ex));
        }

        liveLogRecordingSubscriptionId = -1L;
    }

    /**
     * Handler for counters becoming unavailable. Only the live log recording counter is of
     * interest: losing it notifies the events listener (if any) and triggers a backup reset.
     *
     * @param counters       reader for the counters (unused).
     * @param registrationId registration id of the counter (unused).
     * @param counterId      id of the counter that became unavailable.
     */
    private void onUnavailableCounter(CountersReader counters, long registrationId, int counterId) {
        if (liveLogRecordingCounterId != counterId) {
            return;
        }

        if (null != eventsListener) {
            eventsListener.onPossibleFailure(new ClusterEvent("log recording counter unexpectedly unavailable"));
        }

        state(ClusterBackup.State.RESET_BACKUP, epochClock.time());
    }

    /**
     * Decode one assembled message from the consensus subscription and dispatch it by
     * template id to the backup response, challenge or session event handler. Messages with
     * any other template id are ignored.
     *
     * @param buffer containing the message.
     * @param offset at which the message header begins.
     * @param length of the message (unused).
     * @param header of the fragment (unused).
     * @throws ClusterException if the message's schema id is not 111.
     */
    private void onFragment(DirectBuffer buffer, int offset, int length, Header header) {
        messageHeaderDecoder.wrap(buffer, offset);

        final int actualSchemaId = messageHeaderDecoder.schemaId();
        if (111 != actualSchemaId) {
            throw new ClusterException("expected schemaId=111, actual=" + actualSchemaId);
        }

        final int templateId = messageHeaderDecoder.templateId();
        if (78 == templateId) {
            // Backup response: the body starts 8 bytes after the start of the header.
            backupResponseDecoder.wrap(
                buffer, offset + 8, messageHeaderDecoder.blockLength(), messageHeaderDecoder.version());

            // Map the decoder's null member id onto -1, which onBackupResponse treats as "no member id".
            final int decodedMemberId = backupResponseDecoder.memberId();
            final int memberId = BackupResponseDecoder.memberIdNullValue() == decodedMemberId ? -1 : decodedMemberId;

            onBackupResponse(
                backupResponseDecoder.correlationId(),
                backupResponseDecoder.logRecordingId(),
                backupResponseDecoder.logLeadershipTermId(),
                backupResponseDecoder.logTermBaseLogPosition(),
                backupResponseDecoder.lastLeadershipTermId(),
                backupResponseDecoder.lastTermBaseLogPosition(),
                backupResponseDecoder.commitPositionCounterId(),
                backupResponseDecoder.leaderMemberId(),
                memberId,
                backupResponseDecoder);
        } else if (7 == templateId) {
            // Challenge: copy the encoded challenge out of the buffer before handing it on.
            challengeDecoder.wrapAndApplyHeader(buffer, offset, messageHeaderDecoder);
            final int challengeLength = challengeDecoder.encodedChallengeLength();
            final byte[] challenge = new byte[challengeLength];
            challengeDecoder.getEncodedChallenge(challenge, 0, challengeLength);
            onChallenge(challengeDecoder.clusterSessionId(), challenge);
        } else if (2 == templateId) {
            // Session event.
            sessionEventDecoder.wrapAndApplyHeader(buffer, offset, messageHeaderDecoder);
            final long eventCorrelationId = sessionEventDecoder.correlationId();
            final int eventLeaderMemberId = sessionEventDecoder.leaderMemberId();
            final EventCode code = sessionEventDecoder.code();
            final String eventDetail = sessionEventDecoder.detail();
            onSessionEvent(eventCorrelationId, eventLeaderMemberId, code, eventDetail);
        }
    }

    /**
     * Handle a backup response from a cluster member.
     * <p>
     * The response is rejected, and the agent moved to {@code RESET_BACKUP}, when it carries no
     * member id, when the log source validator does not accept the responding member, or when
     * it comes from a member other than the current log supplier. Otherwise, if the agent is in
     * {@code BACKUP_QUERY} and the correlation id matches the outstanding query, the response is
     * used to work out which snapshots to retrieve, which log entries to track, and which
     * archive to connect to, and the agent advances to {@code SNAPSHOT_RETRIEVE} or
     * {@code LIVE_LOG_RECORD}.
     *
     * @param correlationId           correlation id echoed from the backup query.
     * @param logRecordingId          recording id of the log on the responding member.
     * @param logLeadershipTermId     leadership term id for the log entry.
     * @param logTermBaseLogPosition  base log position for the log entry's term.
     * @param lastLeadershipTermId    leadership term id of the last term.
     * @param lastTermBaseLogPosition base log position of the last term.
     * @param commitPositionCounterId counter id later used to bound the log replay.
     * @param leaderMemberId          member id of the current leader.
     * @param memberId                member id of the responder, or -1 if the response had none.
     * @param backupResponseDecoder   decoder positioned on the response, used for the snapshots
     *                                group and the cluster members string.
     * @throws ClusterException if {@code memberId} is not among the decoded cluster members.
     */
    private void onBackupResponse(long correlationId, long logRecordingId, long logLeadershipTermId, long logTermBaseLogPosition, long lastLeadershipTermId, long lastTermBaseLogPosition, int commitPositionCounterId, int leaderMemberId, int memberId, BackupResponseDecoder backupResponseDecoder) {
        if (-1 == memberId) {
            this.ctx.errorHandler().onError(new ClusterEvent("onBackupResponse(): memberId is null, retrying for compatible node"));
            this.state(ClusterBackup.State.RESET_BACKUP, this.epochClock.time());
            return;
        }
        if (!this.logSourceValidator.isAcceptable(leaderMemberId, memberId)) {
            // Unacceptable source: drop the current publication and exclude it before resetting.
            this.consensusPublicationGroup.closeAndExcludeCurrent();
            this.state(ClusterBackup.State.RESET_BACKUP, this.epochClock.time());
            return;
        }
        if (null != this.logSupplierMember && this.logSupplierMember.id() != memberId) {
            // Response came from a different member than the one currently supplying the log.
            this.state(ClusterBackup.State.RESET_BACKUP, this.epochClock.time());
            return;
        }
        if (ClusterBackup.State.BACKUP_QUERY == this.state && correlationId == this.correlationId) {
            long nowMs;
            RecordingLog.Entry lastTerm;
            // NOTE(review): the snapshots group is read before clusterMembers() below; presumably the
            // decoder requires this order — confirm before reordering.
            BackupResponseDecoder.SnapshotsDecoder snapshotsDecoder = backupResponseDecoder.snapshots();
            if (snapshotsDecoder.count() > 0) {
                for (BackupResponseDecoder.SnapshotsDecoder snapshot : snapshotsDecoder) {
                    RecordingLog.Entry entry = this.recordingLog.getLatestSnapshot(snapshot.serviceId());
                    // Skip snapshots whose log position matches the latest one already in the local recording log.
                    if (null != entry && snapshot.logPosition() == entry.logPosition) continue;
                    this.snapshotsToRetrieve.add(new RecordingLog.Snapshot(snapshot.recordingId(), snapshot.leadershipTermId(), snapshot.termBaseLogPosition(), snapshot.logPosition(), snapshot.timestamp(), snapshot.serviceId()));
                }
            }
            // Track a new leader log entry when the supplier or its log recording has changed.
            if (null == this.logSupplierMember || memberId != this.logSupplierMember.id() || logRecordingId != this.clusterLogRecordingId) {
                this.clusterLogRecordingId = logRecordingId;
                this.leaderLogEntry = new RecordingLog.Entry(logRecordingId, logLeadershipTermId, logTermBaseLogPosition, -1L, -1L, -1, 0, null, true, -1);
            }
            // Track the last term when the local recording log has none or a different one.
            if (null == (lastTerm = this.recordingLog.findLastTerm()) || lastLeadershipTermId != lastTerm.leadershipTermId) {
                this.leaderLastTermEntry = new RecordingLog.Entry(logRecordingId, lastLeadershipTermId, lastTermBaseLogPosition, -1L, -1L, -1, 0, null, true, -1);
            }
            // The query has been answered, so clear the outstanding correlation id.
            this.timeOfLastBackupQueryMs = 0L;
            this.correlationId = -1L;
            this.leaderCommitPositionCounterId = commitPositionCounterId;
            this.clusterMembers = ClusterMember.parse(backupResponseDecoder.clusterMembers());
            ClusterMember.setIsLeader(this.clusterMembers, leaderMemberId);
            this.logSupplierMember = ClusterMember.findMember(this.clusterMembers, memberId);
            if (null == this.logSupplierMember) {
                throw new ClusterException(memberId + " not found in " + Arrays.toString(this.clusterMembers));
            }
            this.logSupplierMember.leadershipTermId(logLeadershipTermId);
            if (null != this.eventsListener) {
                this.eventsListener.onBackupResponse(this.clusterMembers, this.logSupplierMember, this.snapshotsToRetrieve);
            }
            if (null == this.clusterArchive) {
                // Begin an async connect to the log supplier's archive, replacing any earlier attempt.
                CloseHelper.close(this.clusterArchiveAsyncConnect);
                AeronArchive.Context context = this.ctx.clusterArchiveContext().clone();
                ChannelUri logSupplierArchiveUri = ChannelUri.parse(context.controlRequestChannel());
                logSupplierArchiveUri.put("endpoint", this.logSupplierMember.archiveEndpoint());
                context.controlRequestChannel(logSupplierArchiveUri.toString());
                if (null != this.logSupplierMember.archiveResponseEndpoint()) {
                    // The member advertises a response endpoint: rewrite the response channel to
                    // use control-mode=response with that endpoint as the control address.
                    ChannelUri logSupplierResponseUri = ChannelUri.parse(context.controlResponseChannel());
                    logSupplierResponseUri.put("control-mode", "response");
                    logSupplierResponseUri.remove("endpoint");
                    logSupplierResponseUri.put("control", this.logSupplierMember.archiveResponseEndpoint());
                    context.controlResponseChannel(logSupplierResponseUri.toString());
                }
                this.clusterArchiveAsyncConnect = AeronArchive.asyncConnect(context);
            }
            this.timeOfLastProgressMs = nowMs = this.epochClock.time();
            // With no snapshots to fetch, go straight to recording the live log.
            this.state(this.snapshotsToRetrieve.isEmpty() ? ClusterBackup.State.LIVE_LOG_RECORD : ClusterBackup.State.SNAPSHOT_RETRIEVE, nowMs);
        }
    }

    /**
     * Answer an authentication challenge: obtain the response from the credentials supplier and
     * send it on the current consensus publication under a fresh correlation id.
     *
     * @param clusterSessionId session the challenge applies to.
     * @param encodedChallenge challenge bytes received from the cluster.
     */
    private void onChallenge(long clusterSessionId, byte[] encodedChallenge) {
        final byte[] response = ctx.credentialsSupplier().onChallenge(encodedChallenge);

        correlationId = ctx.aeron().nextCorrelationId();

        final ExclusivePublication publication = consensusPublicationGroup.current();
        consensusPublisher.challengeResponse(publication, correlationId, clusterSessionId, response);
    }

    /**
     * React to a session event that matches the outstanding correlation id. Errors and
     * authentication rejections are raised as exceptions; a redirect excludes the current
     * publication and resets the backup. Other events are ignored.
     *
     * @param correlationId  correlation id carried by the event.
     * @param leaderMemberId leader member id carried by the event (unused).
     * @param eventCode      code of the event.
     * @param detail         detail text of the event.
     * @throws ClusterException on {@code ERROR} or {@code AUTHENTICATION_REJECTED}.
     */
    private void onSessionEvent(long correlationId, int leaderMemberId, EventCode eventCode, String detail) {
        if (correlationId != this.correlationId) {
            return;
        }

        final boolean isFailure = EventCode.ERROR == eventCode || EventCode.AUTHENTICATION_REJECTED == eventCode;
        if (isFailure) {
            throw new ClusterException(eventCode + ": " + detail);
        }

        if (EventCode.REDIRECT == eventCode) {
            consensusPublicationGroup.closeAndExcludeCurrent();
            state(ClusterBackup.State.RESET_BACKUP, epochClock.time());
        }
    }

    /**
     * Periodic housekeeping: invoke the Aeron client conductor, refresh the mark file activity
     * timestamp when due, poll backup archive events, and, when no request is outstanding,
     * check the cluster archive for an error response.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return the amount of work done.
     * @throws AgentTerminationException if the Aeron client has been closed.
     */
    private int slowTick(long nowMs) {
        int workCount = aeronClientInvoker.invoke();

        if (aeron.isClosed()) {
            throw new AgentTerminationException("unexpected Aeron close");
        }

        if (markFileUpdateDeadlineMs <= nowMs) {
            markFileUpdateDeadlineMs = nowMs + MARK_FILE_UPDATE_INTERVAL_MS;
            markFile.updateActivityTimestamp(nowMs);
        }

        workCount += pollBackupArchiveEvents();

        if (-1L == correlationId && null != clusterArchive) {
            final String errorResponse = clusterArchive.pollForErrorResponse();
            if (null != errorResponse) {
                ctx.countedErrorHandler().onError(new ClusterEvent("cluster archive error: " + errorResponse));
                state(ClusterBackup.State.RESET_BACKUP, nowMs);
            }
        }

        return workCount;
    }

    /**
     * Drive the {@code RESET_BACKUP} state. The first call resets the agent and starts a
     * cool-down period; once that period has elapsed the agent returns to {@code BACKUP_QUERY}.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return 1 if a reset or state change happened, otherwise 0.
     */
    private int resetBackup(long nowMs) {
        timeOfLastProgressMs = nowMs;

        final boolean coolDownStarted = -1L != coolDownDeadlineMs;
        if (!coolDownStarted) {
            coolDownDeadlineMs = nowMs + coolDownIntervalMs;
            reset();
            return 1;
        }

        if (nowMs <= coolDownDeadlineMs) {
            // Still cooling down.
            return 0;
        }

        coolDownDeadlineMs = -1L;
        state(ClusterBackup.State.BACKUP_QUERY, nowMs);
        return 1;
    }

    /**
     * Drive the {@code BACKUP_QUERY} state. Moves on to the next consensus publication when
     * there is none or the response timeout has passed; otherwise sends a backup query once the
     * publication is connected and no query is outstanding.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return 1 if the publication was advanced or a query was sent, otherwise 0.
     */
    private int backupQuery(long nowMs) {
        final boolean responseTimedOut = nowMs > timeOfLastBackupQueryMs + backupResponseTimeoutMs;
        if (null == consensusPublicationGroup.current() || responseTimedOut) {
            consensusPublicationGroup.next(aeron);
            correlationId = -1L;
            timeOfLastBackupQueryMs = nowMs;
            return 1;
        }

        if (-1L != correlationId || !consensusPublicationGroup.isConnected()) {
            return 0;
        }

        final long queryCorrelationId = aeron.nextCorrelationId();
        final boolean wasSent = consensusPublisher.backupQuery(
            consensusPublicationGroup.current(),
            queryCorrelationId,
            ctx.consensusStreamId(),
            AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION,
            ctx.consensusChannel(),
            ctx.credentialsSupplier().encodedCredentials());

        if (!wasSent) {
            return 0;
        }

        timeOfLastBackupQueryMs = nowMs;
        correlationId = queryCorrelationId;
        return 1;
    }

    /**
     * Drive the {@code SNAPSHOT_RETRIEVE} state: finish connecting to the cluster archive,
     * start a snapshot replication for the pending snapshots, poll it, and move to
     * {@code LIVE_LOG_RECORD} once replication is complete.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return the amount of work done.
     */
    private int snapshotRetrieve(long nowMs) {
        if (null == clusterArchive) {
            // Still connecting: report a change in the connect step as work done.
            final int stepBefore = clusterArchiveAsyncConnect.step();
            clusterArchive = clusterArchiveAsyncConnect.poll();
            if (null != clusterArchive) {
                return 1;
            }
            return clusterArchiveAsyncConnect.step() - stepBefore;
        }

        int workCount = 0;

        if (null == snapshotReplication) {
            final ChannelUri replicationUri = ChannelUri.parse(ctx.catchupChannel());
            replicationUri.put("endpoint", ctx.catchupEndpoint());

            snapshotReplication = new SnapshotReplication(
                backupArchive,
                clusterArchive.context().controlRequestStreamId(),
                clusterArchive.context().controlRequestChannel(),
                replicationUri.toString(),
                ctx.replicationProgressTimeoutNs(),
                ctx.replicationProgressIntervalNs());

            for (final RecordingLog.Snapshot pending : snapshotsToRetrieve) {
                snapshotReplication.addSnapshot(pending);
            }
            workCount++;
        }

        workCount += snapshotReplication.poll(TimeUnit.MILLISECONDS.toNanos(nowMs));
        workCount += pollBackupArchiveEvents();
        timeOfLastProgressMs = nowMs;

        if (snapshotReplication.isComplete()) {
            snapshotsRetrieved.addAll(snapshotReplication.snapshotsRetrieved());
            snapshotReplication.close();
            snapshotReplication = null;
            state(ClusterBackup.State.LIVE_LOG_RECORD, nowMs);
            workCount++;
        }

        return workCount;
    }

    /**
     * Drive the {@code LIVE_LOG_RECORD} state: build the recording and replay channels from the
     * catchup channel and endpoint, start recording the live log, and move to
     * {@code LIVE_LOG_REPLAY}.
     * <p>
     * When the catchup endpoint ends with {@code ":0"} a subscription is added first and the
     * method waits, across calls, for its resolved endpoint before building the replay channel.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return 1 if progress was made, or 0 while waiting for the endpoint to be resolved.
     */
    private int liveLogRecord(long nowMs) {
        if (-1L == liveLogRecordingSubscriptionId) {
            if (-1 == liveLogRecordingSessionId) {
                liveLogRecordingSessionId = BitUtil.generateRandomisedId();
            }

            final String catchupEndpoint = ctx.catchupEndpoint();
            final boolean isWildcardPort = catchupEndpoint.endsWith(":0");

            if (!isWildcardPort) {
                // Fixed port: the same channel serves for both recording and replay.
                final ChannelUri uri = ChannelUri.parse(ctx.catchupChannel());
                uri.put("endpoint", catchupEndpoint);
                uri.put("session-id", Integer.toString(liveLogRecordingSessionId));
                final String channel = uri.toString();
                replayChannel = channel;
                recordingChannel = channel;
            } else if (null == recordingSubscription) {
                // First pass: the recording channel carries tags and no endpoint, while the
                // subscription is added with the endpoint put back.
                final ChannelUri uri = ChannelUri.parse(ctx.catchupChannel());
                uri.remove("endpoint");
                uri.put("tags", aeron.nextCorrelationId() + "," + aeron.nextCorrelationId());
                uri.put("session-id", Integer.toString(liveLogRecordingSessionId));
                recordingChannel = uri.toString();

                uri.put("endpoint", catchupEndpoint);
                recordingSubscription = aeron.addSubscription(uri.toString(), ctx.logStreamId());
                timeOfLastProgressMs = nowMs;
                return 1;
            } else {
                // Later passes: wait for the resolved endpoint, then substitute it into the replay channel.
                final String resolvedEndpoint = recordingSubscription.resolvedEndpoint();
                if (null == resolvedEndpoint) {
                    return 0;
                }

                final ChannelUri uri = ChannelUri.parse(ctx.catchupChannel());
                uri.put("endpoint", catchupEndpoint);
                uri.replaceEndpointWildcardPort(resolvedEndpoint);
                uri.put("session-id", Integer.toString(liveLogRecordingSessionId));
                replayChannel = uri.toString();
            }

            liveLogRecordingSubscriptionId = startLogRecording();
        }

        timeOfLastProgressMs = nowMs;
        state(ClusterBackup.State.LIVE_LOG_REPLAY, nowMs);
        return 1;
    }

    /**
     * Drive the {@code LIVE_LOG_REPLAY} state. While no live log recording id is known this
     * works through, one step per call: connecting to the cluster archive, requesting a bounded
     * replay of the cluster log, awaiting the replay response, and locating the recording
     * position counter for the replay session. Once the recording id is known the agent moves
     * to {@code UPDATE_RECORDING_LOG}.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return the amount of work done.
     * @throws ClusterException if the archive reports that the replay failed.
     */
    private int liveLogReplay(long nowMs) {
        int workCount = 0;
        if (-1L == this.liveLogRecordingId) {
            if (null == this.clusterArchive) {
                // Still connecting: report a change in the connect step as work done.
                int step = this.clusterArchiveAsyncConnect.step();
                this.clusterArchive = this.clusterArchiveAsyncConnect.poll();
                return null == this.clusterArchive ? this.clusterArchiveAsyncConnect.step() - step : 1;
            }
            if (-1L == this.correlationId) {
                // No request outstanding: ask the cluster archive for a replay bounded by the
                // leader commit position counter, starting after what is already recorded locally.
                RecordingLog.Entry logEntry = this.recordingLog.findLastTerm();
                long startPosition = this.replayStartPosition(logEntry);
                long replayId = this.ctx.aeron().nextCorrelationId();
                if (this.clusterArchive.archiveProxy().boundedReplay(this.clusterLogRecordingId, startPosition, -1L, this.leaderCommitPositionCounterId, this.replayChannel, this.ctx.logStreamId(), replayId, this.clusterArchive.controlSessionId())) {
                    this.replayChannel = null;
                    this.correlationId = replayId;
                    this.timeOfLastProgressMs = nowMs;
                    ++workCount;
                }
            } else if (-1L == this.liveLogReplaySessionId) {
                // Request sent: wait for the matching control response carrying the replay session id.
                ControlResponsePoller poller = this.clusterArchive.controlResponsePoller();
                if (0 != poller.poll() && poller.isPollComplete() && poller.controlSessionId() == this.clusterArchive.controlSessionId() && poller.correlationId() == this.correlationId) {
                    switch (poller.code()) {
                        case OK: {
                            this.liveLogReplaySessionId = poller.relevantId();
                            this.timeOfLastProgressMs = nowMs;
                            ++workCount;
                            break;
                        }
                        case ERROR: {
                            this.throwReplayFailedException(poller);
                            break;
                        }
                        default: {
                            throw new ClusterException("Live log replay failed: " + String.valueOf((Object)poller.code()));
                        }
                    }
                }
            } else if (-1 == this.liveLogRecordingCounterId) {
                // Replay started: look for the recording position counter for the replay session
                // on the backup archive.
                CountersReader countersReader = this.aeron.countersReader();
                this.liveLogRecordingCounterId = RecordingPos.findCounterIdBySession(countersReader, (int)this.liveLogReplaySessionId, this.backupArchive.archiveId());
                if (-1 != this.liveLogRecordingCounterId) {
                    this.liveLogPositionCounter.setRelease(countersReader.getCounterValue(this.liveLogRecordingCounterId));
                    this.liveLogRecordingId = RecordingPos.getRecordingId(countersReader, this.liveLogRecordingCounterId);
                    this.timeOfLastBackupQueryMs = nowMs;
                    this.timeOfLastProgressMs = nowMs;
                    this.state(ClusterBackup.State.UPDATE_RECORDING_LOG, nowMs);
                    ++workCount;
                } else {
                    // Counter not found yet: check whether the archive has reported an error meanwhile.
                    ControlResponsePoller poller = this.clusterArchive.controlResponsePoller();
                    if (0 != poller.poll() && poller.isPollComplete() && poller.controlSessionId() == this.clusterArchive.controlSessionId() && ControlResponseCode.ERROR == poller.code()) {
                        this.throwReplayFailedException(poller);
                    }
                }
            }
        } else {
            // Recording id already known: nothing to set up, go on to update the recording log.
            this.timeOfLastProgressMs = nowMs;
            this.state(ClusterBackup.State.UPDATE_RECORDING_LOG, nowMs);
        }
        return workCount;
    }

    /**
     * Raise a {@link ClusterException} describing a failed live log replay, including the
     * replay session id and the error message and code reported by the poller.
     *
     * @param poller control response poller holding the error response.
     * @throws ClusterException always.
     */
    private void throwReplayFailedException(ControlResponsePoller poller) {
        final StringBuilder message = new StringBuilder("Live log replay failed (replaySessionId=")
            .append(liveLogReplaySessionId)
            .append("): errorMessage=")
            .append(poller.errorMessage())
            .append(", errorCode=")
            .append(ArchiveException.errorCodeAsString((int)poller.relevantId()));

        throw new ClusterException(message.toString());
    }

    /**
     * Drive the {@code UPDATE_RECORDING_LOG} state: append any pending leader log term,
     * retrieved snapshots and leader last term to the recording log, force it to storage if
     * anything was appended, notify the events listener, schedule the next backup query and
     * move to {@code BACKING_UP}.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return always 1.
     * @throws AgentTerminationException if the recording log cannot be updated; the original
     *                                   failure is reported to the counted error handler and
     *                                   attached as the cause.
     */
    private int updateRecordingLog(long nowMs) {
        boolean wasRecordingLogUpdated = false;
        try {
            // -1 when no snapshots were retrieved; with that value the leader log term check below never passes.
            long snapshotLeadershipTermId = this.snapshotsRetrieved.isEmpty() ? -1L : this.snapshotsRetrieved.get(0).leadershipTermId;
            if (null != this.leaderLogEntry && this.recordingLog.isUnknown(this.leaderLogEntry.leadershipTermId) && this.leaderLogEntry.leadershipTermId <= snapshotLeadershipTermId) {
                this.recordingLog.appendTerm(this.liveLogRecordingId, this.leaderLogEntry.leadershipTermId, this.leaderLogEntry.termBaseLogPosition, this.leaderLogEntry.timestamp);
                wasRecordingLogUpdated = true;
                this.leaderLogEntry = null;
            }
            if (!this.snapshotsRetrieved.isEmpty()) {
                // Snapshots are appended in reverse of the order in which they were retrieved.
                for (int i = this.snapshotsRetrieved.size() - 1; i >= 0; --i) {
                    RecordingLog.Snapshot snapshot = this.snapshotsRetrieved.get(i);
                    this.recordingLog.appendSnapshot(snapshot.recordingId, snapshot.leadershipTermId, snapshot.termBaseLogPosition, snapshot.logPosition, snapshot.timestamp, snapshot.serviceId);
                }
                wasRecordingLogUpdated = true;
            }
            if (null != this.leaderLastTermEntry && this.recordingLog.isUnknown(this.leaderLastTermEntry.leadershipTermId)) {
                this.recordingLog.appendTerm(this.liveLogRecordingId, this.leaderLastTermEntry.leadershipTermId, this.leaderLastTermEntry.termBaseLogPosition, this.leaderLastTermEntry.timestamp);
                wasRecordingLogUpdated = true;
                this.leaderLastTermEntry = null;
            }
        }
        catch (Exception ex) {
            this.ctx.countedErrorHandler().onError(ex);
            // Keep the original failure as the cause so it is not lost to whoever handles termination.
            throw new AgentTerminationException("failed to update recording log", ex);
        }
        if (wasRecordingLogUpdated) {
            this.recordingLog.force(2);
            if (!this.snapshotsRetrieved.isEmpty()) {
                this.ctx.snapshotRetrieveCounter().incrementRelease();
            }
            if (null != this.eventsListener) {
                this.eventsListener.onUpdatedRecordingLog(this.recordingLog, this.snapshotsRetrieved);
            }
        }
        this.snapshotsRetrieved.clear();
        this.snapshotsToRetrieve.clear();
        this.timeOfLastProgressMs = nowMs;
        this.nextQueryDeadlineMsCounter.setRelease(nowMs + this.backupQueryIntervalMs);
        this.state(ClusterBackup.State.BACKING_UP, nowMs);
        return 1;
    }

    /**
     * Steady-state work while backing up: start a new backup query once the query deadline has
     * passed, and publish progress of the live log recording.
     *
     * @param nowMs current time in milliseconds.
     * @return number of work items performed (0, 1 or 2).
     */
    private int backingUp(long nowMs) {
        int workDone = 0;

        if (this.nextQueryDeadlineMsCounter.get() < nowMs) {
            this.timeOfLastBackupQueryMs = nowMs;
            this.timeOfLastProgressMs = nowMs;
            this.state(ClusterBackup.State.BACKUP_QUERY, nowMs);
            workDone++;
        }

        // The recording counter is only read when a counter id is set (i.e. it is not -1).
        if (-1 != this.liveLogRecordingCounterId) {
            final long recordedPosition = this.aeron.countersReader().getCounterValue(this.liveLogRecordingCounterId);

            // Only counts as work (and notifies the listener) when the proposed position is accepted.
            if (this.liveLogPositionCounter.proposeMaxRelease(recordedPosition)) {
                if (null != this.eventsListener) {
                    this.eventsListener.onLiveLogProgress(this.liveLogRecordingId, this.liveLogRecordingCounterId, recordedPosition);
                }
                workDone++;
            }
        }

        return workDone;
    }

    /**
     * Transition the agent to a new state.
     * <p>
     * In order: the change is passed to {@code logStateChange}, the events listener (if any) is told
     * when the new state is {@code BACKUP_QUERY}, the state counter is updated unless it is closed,
     * the {@code state} field is assigned and {@code correlationId} is reset to -1.
     *
     * @param newState state to move to.
     * @param nowMs    current time in milliseconds.
     */
    private void state(ClusterBackup.State newState, long nowMs) {
        final ClusterBackup.State previousState = this.state;
        this.logStateChange(previousState, newState, nowMs);

        if (null != this.eventsListener && ClusterBackup.State.BACKUP_QUERY == newState) {
            this.eventsListener.onBackupQuery();
        }

        final boolean counterClosed = this.stateCounter.isClosed();
        if (!counterClosed) {
            this.stateCounter.setRelease(newState.code());
        }

        this.state = newState;
        // Any correlation id recorded in the previous state no longer applies.
        this.correlationId = -1L;
    }

    /**
     * Notification point called by {@code state(...)} before a state transition is applied.
     * <p>
     * The body is empty in this source, so calling it has no effect here.
     * NOTE(review): presumably a hook for externally attached logging/instrumentation - confirm.
     *
     * @param oldState state being left.
     * @param newState state being entered.
     * @param nowMs    current time in milliseconds.
     */
    private void logStateChange(ClusterBackup.State oldState, ClusterBackup.State newState, long nowMs) {
    }

    /**
     * Poll the backup archive's recording signal poller and react to what it delivered.
     * <p>
     * Does nothing when there is no backup archive. An error control response is turned into an
     * {@link ArchiveException}; a recording signal is forwarded to the active snapshot replication,
     * if there is one. When nothing was polled and the poller's subscription is not connected the
     * agent is terminated.
     *
     * @return the count returned by the poller, or 0 when there is no backup archive.
     * @throws ArchiveException          for an error response whose error code is not 11.
     * @throws AgentTerminationException for an error response with error code 11, or when the
     *                                   subscription is disconnected with no work polled.
     */
    private int pollBackupArchiveEvents() {
        if (null == this.backupArchive) {
            return 0;
        }

        final RecordingSignalPoller signalPoller = this.recordingSignalPoller;
        final int polled = signalPoller.poll();

        if (!signalPoller.isPollComplete()) {
            if (0 == polled && !signalPoller.subscription().isConnected()) {
                this.ctx.countedErrorHandler().onError(new ClusterEvent("local archive not connected"));
                throw new AgentTerminationException();
            }
            return polled;
        }

        final int templateId = signalPoller.templateId();

        // NOTE(review): 1 is presumably the control response template id - confirm against the archive codecs.
        if (1 == templateId && ControlResponseCode.ERROR == signalPoller.code()) {
            final ArchiveException archiveError = new ArchiveException(signalPoller.errorMessage(), (int)signalPoller.relevantId(), signalPoller.correlationId());
            // NOTE(review): 11 is presumably ArchiveException.STORAGE_SPACE - confirm. This code is
            // reported and terminates the agent; every other error is thrown to the caller.
            if (11 != archiveError.errorCode()) {
                throw archiveError;
            }
            this.ctx.countedErrorHandler().onError(archiveError);
            throw new AgentTerminationException();
        }

        // NOTE(review): 24 is presumably the recording signal event template id - confirm.
        if (24 == templateId && null != this.snapshotReplication) {
            this.snapshotReplication.onSignal(signalPoller.correlationId(), signalPoller.recordingId(), signalPoller.recordingPosition(), signalPoller.recordingSignal());
        }

        return polled;
    }

    /**
     * Begin recording the log on the backup archive from {@code recordingChannel}.
     * <p>
     * A new recording is started when the recording log has no last term; otherwise the recording
     * of the last term is extended. Afterwards the recording subscription is closed and both it and
     * the recording channel are cleared.
     *
     * @return the subscription id returned by the archive for the started or extended recording.
     */
    private long startLogRecording() {
        final RecordingLog.Entry lastTerm = this.recordingLog.findLastTerm();
        final int logStreamId = this.ctx.logStreamId();

        final long subscriptionId;
        if (null != lastTerm) {
            subscriptionId = this.backupArchive.extendRecording(lastTerm.recordingId, this.recordingChannel, logStreamId, SourceLocation.REMOTE, true);
        } else {
            subscriptionId = this.backupArchive.startRecording(this.recordingChannel, logStreamId, SourceLocation.REMOTE, true);
        }

        // Close errors go to the counted error handler rather than propagating.
        CloseHelper.close(this.ctx.countedErrorHandler(), this.recordingSubscription);
        this.recordingSubscription = null;
        this.recordingChannel = null;

        return subscriptionId;
    }

    /**
     * Check whether the backup has gone too long without progress.
     * <p>
     * Only reports a stall while {@code liveLogRecordingCounterId} is -1.
     *
     * @param nowMs current time in milliseconds.
     * @return true if the counter id is -1 and {@code nowMs} is past the last progress time plus the
     *         progress timeout.
     */
    private boolean hasProgressStalled(long nowMs) {
        if (-1 != this.liveLogRecordingCounterId) {
            return false;
        }

        final long stallDeadlineMs = this.timeOfLastProgressMs + this.backupProgressTimeoutMs;
        return nowMs > stallDeadlineMs;
    }

    /**
     * Work out the replay start position using this agent's retrieved snapshots, the configured
     * initial replay start and the backup archive.
     *
     * @param lastTerm last term entry from the recording log, or null if there is none.
     * @return the position computed by the static {@code replayStartPosition} overload.
     */
    private long replayStartPosition(RecordingLog.Entry lastTerm) {
        final List<RecordingLog.Snapshot> retrieved = this.snapshotsRetrieved;
        final ClusterBackup.Configuration.ReplayStart initialReplayStart = this.ctx.initialReplayStart();
        final AeronArchive archive = this.backupArchive;

        return ClusterBackupAgent.replayStartPosition(lastTerm, retrieved, initialReplayStart, archive);
    }

    /**
     * Determine the position from which the live log replay should start.
     * <ul>
     *   <li>With a last term: the stop position of that term's recording in the backup archive.</li>
     *   <li>Without one and {@code replayStart} is {@code BEGINNING}: -1.</li>
     *   <li>Otherwise: the greatest {@code logPosition} among the snapshots whose
     *       {@code serviceId} is -1, or -1 when there is none.</li>
     * </ul>
     * NOTE(review): serviceId -1 presumably identifies the consensus module snapshot - confirm.
     *
     * @param lastTerm           last term entry from the recording log, or null.
     * @param snapshotsRetrieved snapshots retrieved during the current backup round.
     * @param replayStart        configured replay start mode.
     * @param backupArchive      archive queried for the stop position; only used when
     *                           {@code lastTerm} is not null.
     * @return the replay start position, or -1 as described above.
     */
    static long replayStartPosition(RecordingLog.Entry lastTerm, List<RecordingLog.Snapshot> snapshotsRetrieved, ClusterBackup.Configuration.ReplayStart replayStart, AeronArchive backupArchive) {
        if (null != lastTerm) {
            return backupArchive.getStopPosition(lastTerm.recordingId);
        }

        if (ClusterBackup.Configuration.ReplayStart.BEGINNING == replayStart) {
            return -1L;
        }

        long highestLogPosition = -1L;
        for (final RecordingLog.Snapshot candidate : snapshotsRetrieved) {
            if (-1 == candidate.serviceId) {
                highestLogPosition = Math.max(highestLogPosition, candidate.logPosition);
            }
        }

        return highestLogPosition;
    }

    /**
     * Run the configured termination hook and then rethrow the termination exception.
     * <p>
     * An {@link Exception} thrown by the hook is reported to the counted error handler and does not
     * replace {@code ex}.
     *
     * @param ex termination exception to rethrow once the hook has run.
     * @throws AgentTerminationException always, the supplied {@code ex}.
     */
    private void runTerminationHook(AgentTerminationException ex) {
        try {
            final Runnable hook = this.ctx.terminationHook();
            hook.run();
        }
        catch (Exception hookFailure) {
            // Report the hook failure; the termination exception below must still be thrown.
            this.ctx.countedErrorHandler().onError(hookFailure);
        }

        throw ex;
    }
}

