/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.FragmentAssembler;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ControlResponsePoller;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.ClusterBackup;
import io.aeron.cluster.ClusterBackupEventsListener;
import io.aeron.cluster.ClusterMember;
import io.aeron.cluster.ConsensusPublisher;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.SnapshotRetrieveMonitor;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.BackupResponseDecoder;
import io.aeron.cluster.codecs.MessageHeaderDecoder;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.exceptions.AeronException;
import io.aeron.exceptions.TimeoutException;
import io.aeron.logbuffer.Header;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.LongArrayList;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.status.CountersReader;

public class ClusterBackupAgent
implements Agent {
    // Slow-tick interval in milliseconds.
    private static final int SLOW_TICK_INTERVAL_MS = 10;

    // Reusable decoders for messages received on the consensus subscription (see onFragment).
    private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    private final BackupResponseDecoder backupResponseDecoder = new BackupResponseDecoder();

    // Collaborators taken from the context in the constructor.
    private final ClusterBackup.Context ctx;
    private final ClusterMarkFile markFile;
    private final AgentInvoker aeronClientInvoker;
    private final EpochClock epochClock;
    private final Aeron aeron;

    // Consensus endpoints to query, split from the comma separated context setting.
    private final String[] clusterConsensusEndpoints;
    private final ConsensusPublisher consensusPublisher = new ConsensusPublisher();

    // Snapshots still to be fetched for the current backup response, and those already fetched.
    private final ArrayList<RecordingLog.Snapshot> snapshotsToRetrieve = new ArrayList<>(4);
    private final ArrayList<RecordingLog.Snapshot> snapshotsRetrieved = new ArrayList<>(4);
    // Stop positions of the snapshots to retrieve, stored at the index of the snapshot cursor.
    private final LongArrayList snapshotLengths = new LongArrayList();

    private final Counter stateCounter;
    private final Counter liveLogPositionCounter;
    private final Counter nextQueryDeadlineMsCounter;
    // May be null; every use in this class is null-checked.
    private final ClusterBackupEventsListener eventsListener;

    // Timeouts and intervals, converted from the context's nanosecond settings to milliseconds.
    private final long backupResponseTimeoutMs;
    private final long backupQueryIntervalMs;
    private final long backupProgressTimeoutMs;
    private final long coolDownIntervalMs;
    private final long unavailableCounterHandlerRegistrationId;

    private ClusterBackup.State state = ClusterBackup.State.INIT;
    // Archive client connected in onStart(); recordings are started on it.
    private AeronArchive backupArchive;
    // Pending and established connections to the leader's archive.
    private AeronArchive.AsyncConnect clusterArchiveAsyncConnect;
    private AeronArchive clusterArchive;
    private SnapshotRetrieveMonitor snapshotRetrieveMonitor;
    private final FragmentAssembler consensusFragmentAssembler = new FragmentAssembler(this::onFragment);
    private final Subscription consensusSubscription;
    private ExclusivePublication consensusPublication;
    private ClusterMember[] clusterMembers;
    private ClusterMember leaderMember;
    private RecordingLog recordingLog;
    private RecordingLog.Entry leaderLogEntry;
    private RecordingLog.Entry leaderLastTermEntry;

    // Timestamps and deadlines in epoch milliseconds.
    private long slowTickDeadlineMs = 0L;
    private long timeOfLastBackupQueryMs = 0L;
    private long timeOfLastProgressMs = 0L;

    // For the identifiers and deadlines below, -1 is the value this class compares against for "not set".
    private long coolDownDeadlineMs = -1L;
    private long correlationId = -1L;
    private long leaderLogRecordingId = -1L;
    private long liveLogRecordingSubscriptionId = -1L;
    private long liveLogRecordingId = -1L;
    private long liveLogReplayId = -1L;
    private int leaderCommitPositionCounterId = -1;
    private int clusterConsensusEndpointsCursor = -1;
    private int snapshotCursor = 0;
    private int liveLogReplaySessionId = -1;
    private int liveLogRecCounterId = -1;
    ClusterBackupAgent(ClusterBackup.Context ctx) {
        this.ctx = ctx;
        this.aeron = ctx.aeron();
        this.epochClock = ctx.epochClock();
        this.backupResponseTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.clusterBackupResponseTimeoutNs());
        this.backupQueryIntervalMs = TimeUnit.NANOSECONDS.toMillis(ctx.clusterBackupIntervalNs());
        this.backupProgressTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.clusterBackupProgressTimeoutNs());
        this.coolDownIntervalMs = TimeUnit.NANOSECONDS.toMillis(ctx.clusterBackupCoolDownIntervalNs());
        this.markFile = ctx.clusterMarkFile();
        this.eventsListener = ctx.eventsListener();
        this.clusterConsensusEndpoints = ctx.clusterConsensusEndpoints().split(",");
        this.aeronClientInvoker = this.aeron.conductorAgentInvoker();
        this.aeronClientInvoker.invoke();
        this.unavailableCounterHandlerRegistrationId = this.aeron.addUnavailableCounterHandler(this::onUnavailableCounter);
        this.consensusSubscription = this.aeron.addSubscription(ctx.consensusChannel(), ctx.consensusStreamId());
        this.stateCounter = ctx.stateCounter();
        this.liveLogPositionCounter = ctx.liveLogPositionCounter();
        this.nextQueryDeadlineMsCounter = ctx.nextQueryDeadlineMsCounter();
    }

    /**
     * Open the recording log in the cluster directory, connect the backup archive client
     * using a clone of the configured archive context, and publish the initial counter values.
     */
    @Override
    public void onStart() {
        recordingLog = new RecordingLog(ctx.clusterDir());
        backupArchive = AeronArchive.connect(ctx.archiveContext().clone());

        final long initialStateCode = ClusterBackup.State.INIT.code();
        stateCounter.setOrdered(initialStateCode);

        // One millisecond in the past, so the deadline counter starts out already expired.
        nextQueryDeadlineMsCounter.setOrdered(epochClock.time() - 1L);
    }

    /**
     * Release everything the agent holds: the unavailable-counter handler, the consensus
     * subscription and publication (only when the context does not own the Aeron client),
     * any active live log recording subscription, both archive clients, the recording log
     * and finally the context.
     * <p>
     * Closing the archives, the recording log and the context happens in {@code finally}
     * blocks so that a failure in an earlier step (for example stopping the recording)
     * cannot leak them. The first exception raised in the guarded section still propagates
     * unless a later close step throws as well.
     */
    @Override
    public void onClose() {
        try {
            this.aeron.removeUnavailableCounterHandler(this.unavailableCounterHandlerRegistrationId);
            if (!this.ctx.ownsAeronClient()) {
                CloseHelper.closeAll(this.consensusSubscription, this.consensusPublication);
            }
            if (-1L != this.liveLogRecordingSubscriptionId) {
                this.backupArchive.tryStopRecording(this.liveLogRecordingSubscriptionId);
            }
        }
        finally {
            try {
                CloseHelper.closeAll(this.backupArchive, this.clusterArchiveAsyncConnect, this.clusterArchive, this.recordingLog);
            }
            finally {
                this.ctx.close();
            }
        }
    }

    /**
     * Perform one duty cycle: run initialisation when in INIT, run the slow tick when its
     * deadline has passed, poll the consensus subscription, execute the handler for the
     * current state and check for stalled progress.
     * <p>
     * Any exception raised while polling or handling the state is reported to the events
     * listener (when present), moves the agent to RESET_BACKUP and is then rethrown.
     *
     * @return the amount of work done in this cycle.
     */
    @Override
    public int doWork() {
        final long nowMs = epochClock.time();
        int workCount = 0;

        if (ClusterBackup.State.INIT == state) {
            workCount += init(nowMs);
        }
        if (nowMs > slowTickDeadlineMs) {
            workCount += slowTick(nowMs);
        }

        try {
            workCount += consensusSubscription.poll(consensusFragmentAssembler, 10);
            workCount += dispatchOnState(nowMs);

            if (hasProgressStalled(nowMs)) {
                if (null != eventsListener) {
                    eventsListener.onPossibleFailure(
                        new TimeoutException("progress has stalled", AeronException.Category.WARN));
                }
                state(ClusterBackup.State.RESET_BACKUP, nowMs);
            }
        }
        catch (final Exception ex) {
            if (null != eventsListener) {
                eventsListener.onPossibleFailure(ex);
            }
            state(ClusterBackup.State.RESET_BACKUP, nowMs);
            throw ex;
        }

        return workCount;
    }

    /**
     * Run the handler belonging to the current state; states without a handler do no work.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return the work count reported by the handler, or 0 when the state has none.
     */
    private int dispatchOnState(long nowMs) {
        switch (state) {
            case BACKUP_QUERY:
                return backupQuery(nowMs);
            case SNAPSHOT_LENGTH_RETRIEVE:
                return snapshotLengthRetrieve(nowMs);
            case SNAPSHOT_RETRIEVE:
                return snapshotRetrieve(nowMs);
            case LIVE_LOG_REPLAY:
                return liveLogReplay(nowMs);
            case UPDATE_RECORDING_LOG:
                return updateRecordingLog(nowMs);
            case RESET_BACKUP:
                return resetBackup(nowMs);
            case BACKING_UP:
                return backingUp(nowMs);
            default:
                return 0;
        }
    }

    /**
     * Name reported for this agent.
     *
     * @return the constant string {@code "cluster-backup"}.
     */
    @Override
    public String roleName() {
        return "cluster-backup";
    }

    /**
     * Return the agent to a clean slate: forget cluster membership and leader entries, empty
     * the snapshot bookkeeping, close the consensus publication and cluster archive
     * connections, stop any live log recording and clear all tracked identifiers.
     */
    private void reset() {
        clusterMembers = null;
        leaderMember = null;
        leaderLogEntry = null;
        leaderLastTermEntry = null;
        clusterConsensusEndpointsCursor = -1;

        snapshotsToRetrieve.clear();
        snapshotsRetrieved.clear();
        snapshotLengths.clear();
        consensusFragmentAssembler.clear();

        // The fields are cleared before the close call, so they are already null if closing throws.
        final ExclusivePublication publicationToClose = consensusPublication;
        final AeronArchive archiveToClose = clusterArchive;
        final AeronArchive.AsyncConnect asyncConnectToClose = clusterArchiveAsyncConnect;
        consensusPublication = null;
        clusterArchive = null;
        clusterArchiveAsyncConnect = null;
        CloseHelper.closeAll(publicationToClose, archiveToClose, asyncConnectToClose);

        if (-1L != liveLogRecordingSubscriptionId) {
            backupArchive.tryStopRecording(liveLogRecordingSubscriptionId);
        }

        correlationId = -1L;
        liveLogRecCounterId = -1;
        liveLogRecordingId = -1L;
        liveLogReplayId = -1L;
        liveLogRecordingSubscriptionId = -1L;
    }

    /**
     * Fragment handler for the consensus subscription. Decodes the message header, rejects
     * any schema id other than 111 and forwards messages with template id 78 to
     * {@link #onBackupResponse}; every other template is ignored.
     *
     * @param buffer containing the message.
     * @param offset at which the message header begins.
     * @param length of the message (not used here).
     * @param header of the fragment (not used here).
     * @throws ClusterException if the schema id is not 111.
     */
    private void onFragment(DirectBuffer buffer, int offset, int length, Header header) {
        messageHeaderDecoder.wrap(buffer, offset);

        final int actualSchemaId = messageHeaderDecoder.schemaId();
        if (111 != actualSchemaId) {
            throw new ClusterException("expected schemaId=111, actual=" + actualSchemaId);
        }

        if (78 != messageHeaderDecoder.templateId()) {
            return;
        }

        // The body is wrapped 8 bytes past the header start; presumably the encoded header length - TODO confirm.
        backupResponseDecoder.wrap(
            buffer,
            offset + 8,
            messageHeaderDecoder.blockLength(),
            messageHeaderDecoder.version());

        onBackupResponse(
            backupResponseDecoder.correlationId(),
            backupResponseDecoder.logRecordingId(),
            backupResponseDecoder.logLeadershipTermId(),
            backupResponseDecoder.logTermBaseLogPosition(),
            backupResponseDecoder.lastLeadershipTermId(),
            backupResponseDecoder.lastTermBaseLogPosition(),
            backupResponseDecoder.commitPositionCounterId(),
            backupResponseDecoder.leaderMemberId(),
            backupResponseDecoder);
    }

    /**
     * Unavailable-counter callback. When the counter that went away is the one tracking the
     * live log recording, the listener (if any) is warned and the agent moves to RESET_BACKUP.
     *
     * @param counters       reader for the counters (not used here).
     * @param registrationId of the counter (not used here).
     * @param counterId      of the counter that became unavailable.
     */
    private void onUnavailableCounter(CountersReader counters, long registrationId, int counterId) {
        if (counterId != liveLogRecCounterId) {
            return;
        }

        if (null != eventsListener) {
            final ClusterException warning = new ClusterException(
                "log recording counter became unavailable", AeronException.Category.WARN);
            eventsListener.onPossibleFailure(warning);
        }

        state(ClusterBackup.State.RESET_BACKUP, epochClock.time());
    }

    /**
     * Handle a backup response. The response is ignored unless the agent is in BACKUP_QUERY
     * and the correlation id matches the outstanding query.
     * <p>
     * Otherwise it: queues every advertised snapshot whose log position differs from the
     * latest local snapshot for the same service, records new leader log / last term entries
     * when they differ from what is known, parses the cluster members, notifies the listener,
     * starts an async connect to the leader's archive when no cluster archive is connected,
     * and moves to LIVE_LOG_REPLAY (no snapshots queued) or SNAPSHOT_LENGTH_RETRIEVE.
     *
     * @param correlationId           of the response.
     * @param logRecordingId          recording id of the leader's log.
     * @param logLeadershipTermId     leadership term id of the leader's log.
     * @param logTermBaseLogPosition  base log position of that term.
     * @param lastLeadershipTermId    leadership term id of the leader's last term.
     * @param lastTermBaseLogPosition base log position of the last term.
     * @param commitPositionCounterId counter id later used to bound the live log replay.
     * @param leaderMemberId          id of the leader within the cluster members.
     * @param backupResponseDecoder   decoder positioned on the response, used for the
     *                                snapshots group and the cluster members string.
     */
    private void onBackupResponse(long correlationId, long logRecordingId, long logLeadershipTermId, long logTermBaseLogPosition, long lastLeadershipTermId, long lastTermBaseLogPosition, int commitPositionCounterId, int leaderMemberId, BackupResponseDecoder backupResponseDecoder) {
        if (ClusterBackup.State.BACKUP_QUERY != state || correlationId != this.correlationId) {
            return;
        }

        final BackupResponseDecoder.SnapshotsDecoder snapshotsDecoder = backupResponseDecoder.snapshots();
        if (snapshotsDecoder.count() > 0) {
            for (final BackupResponseDecoder.SnapshotsDecoder snapshot : snapshotsDecoder) {
                final RecordingLog.Entry latest = recordingLog.getLatestSnapshot(snapshot.serviceId());
                final boolean alreadyHeld = null != latest && snapshot.logPosition() == latest.logPosition;
                if (!alreadyHeld) {
                    snapshotsToRetrieve.add(new RecordingLog.Snapshot(
                        snapshot.recordingId(),
                        snapshot.leadershipTermId(),
                        snapshot.termBaseLogPosition(),
                        snapshot.logPosition(),
                        snapshot.timestamp(),
                        snapshot.serviceId()));
                }
            }
        }

        final boolean leaderLogChanged =
            null == leaderMember || leaderMember.id() != leaderMemberId || logRecordingId != leaderLogRecordingId;
        if (leaderLogChanged) {
            leaderLogRecordingId = logRecordingId;
            leaderLogEntry = new RecordingLog.Entry(
                logRecordingId, logLeadershipTermId, logTermBaseLogPosition, -1L, -1L, -1, 0, true, -1);
        }

        final RecordingLog.Entry lastTerm = recordingLog.findLastTerm();
        final boolean lastTermDiffers =
            null == lastTerm ||
            lastLeadershipTermId != lastTerm.leadershipTermId ||
            lastTermBaseLogPosition != lastTerm.termBaseLogPosition;
        if (lastTermDiffers) {
            leaderLastTermEntry = new RecordingLog.Entry(
                logRecordingId, lastLeadershipTermId, lastTermBaseLogPosition, -1L, -1L, -1, 0, true, -1);
        }

        timeOfLastBackupQueryMs = 0L;
        snapshotCursor = 0;
        this.correlationId = -1L;
        leaderCommitPositionCounterId = commitPositionCounterId;

        clusterMembers = ClusterMember.parse(backupResponseDecoder.clusterMembers());
        leaderMember = ClusterMember.findMember(clusterMembers, leaderMemberId);

        if (null != eventsListener) {
            eventsListener.onBackupResponse(clusterMembers, leaderMember, snapshotsToRetrieve);
        }

        if (null == clusterArchive) {
            // Same control channels as the backup archive context, but the request endpoint is the leader's archive.
            final AeronArchive.Context archiveCtx = ctx.archiveContext();
            final ChannelUri leaderArchiveUri = ChannelUri.parse(archiveCtx.controlRequestChannel());
            leaderArchiveUri.put("endpoint", leaderMember.archiveEndpoint());

            final AeronArchive.Context leaderArchiveCtx = new AeronArchive.Context()
                .aeron(ctx.aeron())
                .controlRequestChannel(leaderArchiveUri.toString())
                .controlRequestStreamId(archiveCtx.controlRequestStreamId())
                .controlResponseChannel(archiveCtx.controlResponseChannel())
                .controlResponseStreamId(archiveCtx.controlResponseStreamId());

            CloseHelper.close(clusterArchiveAsyncConnect);
            clusterArchiveAsyncConnect = AeronArchive.asyncConnect(leaderArchiveCtx);
        }

        final long nowMs = epochClock.time();
        timeOfLastProgressMs = nowMs;

        final ClusterBackup.State nextState = snapshotsToRetrieve.isEmpty()
            ? ClusterBackup.State.LIVE_LOG_REPLAY
            : ClusterBackup.State.SNAPSHOT_LENGTH_RETRIEVE;
        state(nextState, nowMs);
    }

    /**
     * Low-frequency housekeeping: invoke the Aeron client conductor, schedule the next slow
     * tick, refresh the mark file activity timestamp and, while no request is outstanding and
     * no snapshot retrieval is being monitored, check both archive clients for error responses.
     * <p>
     * An error response polled from the cluster archive is reported to the events listener
     * (when one is configured) as a WARN {@link ClusterException} rather than being discarded.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return the work count reported by the Aeron client invoker.
     */
    private int slowTick(long nowMs) {
        int workCount = this.aeronClientInvoker.invoke();
        this.slowTickDeadlineMs = nowMs + SLOW_TICK_INTERVAL_MS;
        this.markFile.updateActivityTimestamp(nowMs);
        // Only poll for unsolicited errors when no correlated response is expected on the control streams.
        if (-1L == this.correlationId && null == this.snapshotRetrieveMonitor) {
            this.backupArchive.checkForErrorResponse();
            if (null != this.clusterArchive) {
                final String errorResponse = this.clusterArchive.pollForErrorResponse();
                if (null != errorResponse && null != this.eventsListener) {
                    this.eventsListener.onPossibleFailure(new ClusterException("cluster archive - " + errorResponse, AeronException.Category.WARN));
                }
            }
        }
        return workCount;
    }

    /**
     * Leave the INIT state: restart the progress timer and begin a backup query.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return always 1.
     */
    private int init(long nowMs) {
        final ClusterBackup.State firstState = ClusterBackup.State.BACKUP_QUERY;

        timeOfLastProgressMs = nowMs;
        state(firstState, nowMs);

        return 1;
    }

    /**
     * Handler for RESET_BACKUP. The first call starts a cool-down period and resets the
     * agent; subsequent calls wait until the cool-down deadline has passed and then return
     * the agent to INIT. The progress timer is refreshed on every call.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return 1 when the reset was performed or the cool-down completed, otherwise 0.
     */
    private int resetBackup(long nowMs) {
        timeOfLastProgressMs = nowMs;

        final boolean coolDownNotStarted = -1L == coolDownDeadlineMs;
        if (coolDownNotStarted) {
            coolDownDeadlineMs = nowMs + coolDownIntervalMs;
            reset();
            return 1;
        }

        if (nowMs <= coolDownDeadlineMs) {
            return 0;
        }

        coolDownDeadlineMs = -1L;
        state(ClusterBackup.State.INIT, nowMs);
        return 1;
    }

    /**
     * Handler for BACKUP_QUERY.
     * <p>
     * When there is no consensus publication, or the response timeout has elapsed since the
     * last query, the next consensus endpoint is selected (wrapping round to the first), the
     * cluster archive connections and the old publication are closed and a new exclusive
     * publication to that endpoint is added. Otherwise, when no query is outstanding and the
     * publication is connected, a backup query is offered.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return 1 when a publication was (re)created or a query was sent, otherwise 0.
     */
    private int backupQuery(long nowMs) {
        final boolean needsNewPublication =
            null == consensusPublication || nowMs > timeOfLastBackupQueryMs + backupResponseTimeoutMs;

        if (needsNewPublication) {
            int endpointIndex = clusterConsensusEndpointsCursor + 1;
            if (endpointIndex >= clusterConsensusEndpoints.length) {
                endpointIndex = 0;
            }
            clusterConsensusEndpointsCursor = endpointIndex;

            CloseHelper.close(clusterArchiveAsyncConnect);
            clusterArchiveAsyncConnect = null;
            CloseHelper.close(clusterArchive);
            clusterArchive = null;
            CloseHelper.close(consensusPublication);

            final ChannelUri consensusUri = ChannelUri.parse(ctx.consensusChannel());
            consensusUri.put("endpoint", clusterConsensusEndpoints[endpointIndex]);
            consensusPublication = aeron.addExclusivePublication(consensusUri.toString(), ctx.consensusStreamId());

            correlationId = -1L;
            timeOfLastBackupQueryMs = nowMs;
            return 1;
        }

        if (-1L == correlationId && consensusPublication.isConnected()) {
            final long queryCorrelationId = aeron.nextCorrelationId();
            final boolean wasSent = consensusPublisher.backupQuery(
                consensusPublication,
                queryCorrelationId,
                ctx.consensusStreamId(),
                AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION,
                ctx.consensusChannel(),
                ArrayUtil.EMPTY_BYTE_ARRAY);

            if (wasSent) {
                timeOfLastBackupQueryMs = nowMs;
                correlationId = queryCorrelationId;
                return 1;
            }
        }

        return 0;
    }

    /**
     * Handler for SNAPSHOT_LENGTH_RETRIEVE. Once the cluster archive is connected, asks it
     * for the stop position of each snapshot to retrieve, one at a time, storing each answer
     * in {@code snapshotLengths} at the snapshot cursor. When every snapshot has a length the
     * cursor is rewound and the agent moves to SNAPSHOT_RETRIEVE.
     * <p>
     * A stop position of -1 moves the agent to RESET_BACKUP and returns immediately, so the
     * invalid length is never stored and the reset cannot be overwritten by the transition
     * to SNAPSHOT_RETRIEVE.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return 1 when the archive connected, a request was sent or a response was handled,
     *         otherwise 0.
     */
    private int snapshotLengthRetrieve(long nowMs) {
        int workCount = 0;
        if (null == this.clusterArchive) {
            this.clusterArchive = this.clusterArchiveAsyncConnect.poll();
            return null == this.clusterArchive ? 0 : 1;
        }
        if (-1L == this.correlationId) {
            long stopPositionCorrelationId = this.ctx.aeron().nextCorrelationId();
            RecordingLog.Snapshot snapshot = this.snapshotsToRetrieve.get(this.snapshotCursor);
            if (this.clusterArchive.archiveProxy().getStopPosition(snapshot.recordingId, stopPositionCorrelationId, this.clusterArchive.controlSessionId())) {
                this.correlationId = stopPositionCorrelationId;
                this.timeOfLastProgressMs = nowMs;
                ++workCount;
            }
        } else if (ClusterBackupAgent.pollForResponse(this.clusterArchive, this.correlationId)) {
            long snapshotStopPosition = this.clusterArchive.controlResponsePoller().relevantId();
            this.correlationId = -1L;
            if (-1L == snapshotStopPosition) {
                // Stop here: continuing would record the -1 length and could replace RESET_BACKUP with SNAPSHOT_RETRIEVE.
                this.timeOfLastProgressMs = nowMs;
                this.state(ClusterBackup.State.RESET_BACKUP, nowMs);
                return workCount + 1;
            }
            this.snapshotLengths.addLong(this.snapshotCursor, snapshotStopPosition);
            if (++this.snapshotCursor >= this.snapshotsToRetrieve.size()) {
                this.snapshotCursor = 0;
                this.state(ClusterBackup.State.SNAPSHOT_RETRIEVE, nowMs);
            }
            this.timeOfLastProgressMs = nowMs;
            ++workCount;
        }
        return workCount;
    }

    /**
     * Handler for SNAPSHOT_RETRIEVE. For the snapshot at the cursor it proceeds in three steps
     * across successive calls: request a replay from the cluster archive onto the catch-up
     * endpoint, start a recording of that replay session on the backup archive once the
     * replay response arrives, then poll the retrieve monitor until it is done. A finished
     * snapshot is added to {@code snapshotsRetrieved}; after the last one the agent moves to
     * LIVE_LOG_REPLAY.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return the amount of work done.
     */
    private int snapshotRetrieve(long nowMs) {
        if (null == clusterArchive) {
            clusterArchive = clusterArchiveAsyncConnect.poll();
            return null == clusterArchive ? 0 : 1;
        }

        int workCount = 0;

        if (null != snapshotRetrieveMonitor) {
            workCount += snapshotRetrieveMonitor.poll();
            timeOfLastProgressMs = nowMs;

            if (snapshotRetrieveMonitor.isDone()) {
                final RecordingLog.Snapshot source = snapshotsToRetrieve.get(snapshotCursor);
                // Same metadata as the source snapshot, but under the recording id reported by the monitor.
                snapshotsRetrieved.add(new RecordingLog.Snapshot(
                    snapshotRetrieveMonitor.recordingId(),
                    source.leadershipTermId,
                    source.termBaseLogPosition,
                    source.logPosition,
                    source.timestamp,
                    source.serviceId));
                snapshotRetrieveMonitor = null;
                correlationId = -1L;

                snapshotCursor++;
                if (snapshotCursor >= snapshotsToRetrieve.size()) {
                    state(ClusterBackup.State.LIVE_LOG_REPLAY, nowMs);
                    workCount++;
                }
            }
        } else if (-1L == correlationId) {
            final long replayCorrelationId = ctx.aeron().nextCorrelationId();
            final String replayChannel = "aeron:udp?endpoint=" + ctx.catchupEndpoint();
            final RecordingLog.Snapshot snapshot = snapshotsToRetrieve.get(snapshotCursor);
            final int replayStreamId = snapshotStreamId(snapshot);

            final boolean wasSent = clusterArchive.archiveProxy().replay(
                snapshot.recordingId,
                0L,
                -1L,
                replayChannel,
                replayStreamId,
                replayCorrelationId,
                clusterArchive.controlSessionId());
            if (wasSent) {
                correlationId = replayCorrelationId;
                timeOfLastProgressMs = nowMs;
                workCount++;
            }
        } else if (ClusterBackupAgent.pollForResponse(clusterArchive, correlationId)) {
            snapshotRetrieveMonitor = new SnapshotRetrieveMonitor(backupArchive, snapshotLengths.get(snapshotCursor));

            final int replaySessionId = (int)clusterArchive.controlResponsePoller().relevantId();
            final String recordingChannel =
                "aeron:udp?endpoint=" + ctx.catchupEndpoint() + "|session-id=" + replaySessionId;
            final RecordingLog.Snapshot snapshot = snapshotsToRetrieve.get(snapshotCursor);
            final int recordingStreamId = snapshotStreamId(snapshot);

            backupArchive.archiveProxy().startRecording(
                recordingChannel,
                recordingStreamId,
                SourceLocation.REMOTE,
                true,
                backupArchive.context().aeron().nextCorrelationId(),
                backupArchive.controlSessionId());
            timeOfLastProgressMs = nowMs;
            workCount++;
        }

        return workCount;
    }

    /**
     * Pick the stream id for a snapshot: the consensus module snapshot stream when the
     * service id is -1, otherwise the service snapshot stream.
     *
     * @param snapshot whose service id selects the stream.
     * @return the stream id configured in the context.
     */
    private int snapshotStreamId(RecordingLog.Snapshot snapshot) {
        if (-1 == snapshot.serviceId) {
            return ctx.consensusModuleSnapshotStreamId();
        }
        return ctx.serviceSnapshotStreamId();
    }

    /**
     * Handler for LIVE_LOG_REPLAY. If a live log recording already exists the agent moves
     * straight to UPDATE_RECORDING_LOG. Otherwise, across successive calls, it requests a
     * bounded replay of the leader's log from the cluster archive, starts (or extends) a local
     * recording of that replay session once the response arrives, and finally waits for the
     * recording position counter of the session to appear before moving on.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return 1 when the archive connected or a replay request was sent, otherwise 0.
     */
    private int liveLogReplay(long nowMs) {
        if (-1L != liveLogRecordingId) {
            timeOfLastProgressMs = nowMs;
            state(ClusterBackup.State.UPDATE_RECORDING_LOG, nowMs);
            return 0;
        }

        if (null == clusterArchive) {
            clusterArchive = clusterArchiveAsyncConnect.poll();
            return null == clusterArchive ? 0 : 1;
        }

        int workCount = 0;

        if (-1L == correlationId) {
            final RecordingLog.Entry lastTerm = recordingLog.findLastTerm();
            final String replayChannel = "aeron:udp?endpoint=" + ctx.catchupEndpoint();
            final long replayCorrelationId = ctx.aeron().nextCorrelationId();

            // Continue from where the local recording of the last known term stopped, or pass -1 when there is none.
            final long startPosition;
            if (null == lastTerm) {
                startPosition = -1L;
            } else {
                startPosition = backupArchive.getStopPosition(lastTerm.recordingId);
            }

            final boolean wasSent = clusterArchive.archiveProxy().boundedReplay(
                leaderLogRecordingId,
                startPosition,
                -1L,
                leaderCommitPositionCounterId,
                replayChannel,
                ctx.logStreamId(),
                replayCorrelationId,
                clusterArchive.controlSessionId());
            if (wasSent) {
                correlationId = replayCorrelationId;
                timeOfLastProgressMs = nowMs;
                workCount++;
            }
        } else if (-1L != liveLogRecordingSubscriptionId && -1 == liveLogRecCounterId) {
            final CountersReader counters = aeron.countersReader();
            liveLogRecCounterId = RecordingPos.findCounterIdBySession(counters, liveLogReplaySessionId);

            if (-1 != liveLogRecCounterId) {
                liveLogPositionCounter.setOrdered(counters.getCounterValue(liveLogRecCounterId));
                liveLogRecordingId = RecordingPos.getRecordingId(counters, liveLogRecCounterId);
                timeOfLastBackupQueryMs = nowMs;
                timeOfLastProgressMs = nowMs;
                state(ClusterBackup.State.UPDATE_RECORDING_LOG, nowMs);
            }
        } else if (ClusterBackupAgent.pollForResponse(clusterArchive, correlationId)) {
            liveLogReplayId = clusterArchive.controlResponsePoller().relevantId();
            liveLogReplaySessionId = (int)liveLogReplayId;

            final RecordingLog.Entry lastTerm = recordingLog.findLastTerm();
            final String recordingChannel =
                "aeron:udp?endpoint=" + ctx.catchupEndpoint() + "|session-id=" + liveLogReplaySessionId;
            timeOfLastProgressMs = nowMs;

            if (null == lastTerm) {
                liveLogRecordingSubscriptionId = backupArchive.startRecording(
                    recordingChannel, ctx.logStreamId(), SourceLocation.REMOTE, true);
            } else {
                liveLogRecordingSubscriptionId = backupArchive.extendRecording(
                    lastTerm.recordingId, recordingChannel, ctx.logStreamId(), SourceLocation.REMOTE, true);
            }
        }

        return workCount;
    }

    /**
     * Handler for UPDATE_RECORDING_LOG. Appends to the recording log, in this order: the
     * leader's log term (when unknown locally and not newer than the first retrieved
     * snapshot's term), every retrieved snapshot from last to first, and the leader's last
     * term (when unknown locally). The listener is told when anything was appended. The
     * snapshot bookkeeping is then cleared, the next query deadline is published and the
     * agent moves to BACKING_UP.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return always 1.
     */
    private int updateRecordingLog(long nowMs) {
        boolean wasRecordingLogUpdated = false;

        final long snapshotLeadershipTermId;
        if (snapshotsRetrieved.isEmpty()) {
            snapshotLeadershipTermId = -1L;
        } else {
            snapshotLeadershipTermId = snapshotsRetrieved.get(0).leadershipTermId;
        }

        final RecordingLog.Entry logEntry = leaderLogEntry;
        if (null != logEntry &&
            recordingLog.isUnknown(logEntry.leadershipTermId) &&
            logEntry.leadershipTermId <= snapshotLeadershipTermId) {
            recordingLog.appendTerm(
                liveLogRecordingId, logEntry.leadershipTermId, logEntry.termBaseLogPosition, logEntry.timestamp);
            wasRecordingLogUpdated = true;
            leaderLogEntry = null;
        }

        final int retrievedCount = snapshotsRetrieved.size();
        if (retrievedCount > 0) {
            int index = retrievedCount;
            while (--index >= 0) {
                final RecordingLog.Snapshot retrieved = snapshotsRetrieved.get(index);
                recordingLog.appendSnapshot(
                    retrieved.recordingId,
                    retrieved.leadershipTermId,
                    retrieved.termBaseLogPosition,
                    retrieved.logPosition,
                    retrieved.timestamp,
                    retrieved.serviceId);
            }
            wasRecordingLogUpdated = true;
        }

        final RecordingLog.Entry lastTermEntry = leaderLastTermEntry;
        if (null != lastTermEntry && recordingLog.isUnknown(lastTermEntry.leadershipTermId)) {
            recordingLog.appendTerm(
                liveLogRecordingId,
                lastTermEntry.leadershipTermId,
                lastTermEntry.termBaseLogPosition,
                lastTermEntry.timestamp);
            wasRecordingLogUpdated = true;
            leaderLastTermEntry = null;
        }

        if (wasRecordingLogUpdated && null != eventsListener) {
            eventsListener.onUpdatedRecordingLog(recordingLog, snapshotsRetrieved);
        }

        snapshotsRetrieved.clear();
        snapshotsToRetrieve.clear();
        snapshotLengths.clear();

        timeOfLastProgressMs = nowMs;
        nextQueryDeadlineMsCounter.setOrdered(nowMs + backupQueryIntervalMs);
        state(ClusterBackup.State.BACKING_UP, nowMs);

        return 1;
    }

    /**
     * Handler for BACKING_UP. Starts a new backup query once the next-query deadline counter
     * has been passed, and publishes the live log recording position whenever it advances,
     * notifying the listener of the progress.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return the amount of work done (0 to 2).
     */
    private int backingUp(long nowMs) {
        int workCount = 0;

        if (nowMs > nextQueryDeadlineMsCounter.get()) {
            timeOfLastBackupQueryMs = nowMs;
            timeOfLastProgressMs = nowMs;
            state(ClusterBackup.State.BACKUP_QUERY, nowMs);
            workCount++;
        }

        if (-1 == liveLogRecCounterId) {
            return workCount;
        }

        final long liveLogPosition = aeron.countersReader().getCounterValue(liveLogRecCounterId);
        if (liveLogPositionCounter.proposeMaxOrdered(liveLogPosition)) {
            if (null != eventsListener) {
                eventsListener.onLiveLogProgress(liveLogRecordingId, liveLogRecCounterId, liveLogPosition);
            }
            workCount++;
        }

        return workCount;
    }

    /**
     * Transition to a new state: call the state-change hook, tell the listener when a backup
     * query is beginning, publish the new state's code to the state counter and finally
     * record the state.
     *
     * @param newState state to move to.
     * @param nowMs    current epoch time in milliseconds.
     */
    private void state(ClusterBackup.State newState, long nowMs) {
        final ClusterBackup.State previousState = state;
        stateChange(previousState, newState, nowMs);

        if (null != eventsListener) {
            if (ClusterBackup.State.BACKUP_QUERY == newState) {
                eventsListener.onBackupQuery();
            }
        }

        stateCounter.setOrdered(newState.code());
        state = newState;
    }

    /**
     * Hook called by {@code state(...)} before each state transition; the body is empty.
     * NOTE(review): presumably kept as an interception point for external tooling - confirm
     * before removing.
     *
     * @param oldState state being left.
     * @param newState state being entered.
     * @param nowMs    current epoch time in milliseconds.
     */
    private void stateChange(ClusterBackup.State oldState, ClusterBackup.State newState, long nowMs) {
    }

    /**
     * Poll an archive's control response stream once for the response to a given request.
     *
     * @param archive       client whose control response poller is polled.
     * @param correlationId of the request being awaited.
     * @return true if a complete response for this control session and correlation id was
     *         polled, otherwise false.
     * @throws ClusterException if the matching response carries the ERROR code.
     */
    private static boolean pollForResponse(AeronArchive archive, long correlationId) {
        final ControlResponsePoller poller = archive.controlResponsePoller();

        if (poller.poll() <= 0 || !poller.isPollComplete()) {
            return false;
        }

        if (poller.controlSessionId() != archive.controlSessionId() || poller.correlationId() != correlationId) {
            return false;
        }

        if (ControlResponseCode.ERROR == poller.code()) {
            throw new ClusterException("archive response for correlationId=" + correlationId + ", error: " + poller.errorMessage());
        }

        return true;
    }

    /**
     * Decide whether the backup has stalled. Progress is only considered stalled while no
     * live log recording counter is being tracked and the progress timeout has elapsed since
     * the last recorded progress.
     *
     * @param nowMs current epoch time in milliseconds.
     * @return true if the progress timeout has been exceeded.
     */
    private boolean hasProgressStalled(long nowMs) {
        if (-1 != liveLogRecCounterId) {
            return false;
        }

        final long progressDeadlineMs = timeOfLastProgressMs + backupProgressTimeoutMs;
        return nowMs > progressDeadlineMs;
    }
}

