/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.AvailableImageHandler;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.UnavailableCounterHandler;
import io.aeron.UnavailableImageHandler;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveMarkFile;
import io.aeron.archive.Catalog;
import io.aeron.archive.ControlRequestDecoders;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSession;
import io.aeron.archive.ControlSessionAdapter;
import io.aeron.archive.ControlSessionCounter;
import io.aeron.archive.ControlSessionProxy;
import io.aeron.archive.CreateReplayPublicationSession;
import io.aeron.archive.DeleteSegmentsSession;
import io.aeron.archive.ListRecordingByIdSession;
import io.aeron.archive.ListRecordingSubscriptionsSession;
import io.aeron.archive.ListRecordingsForUriSession;
import io.aeron.archive.ListRecordingsSession;
import io.aeron.archive.RecordingEventsProxy;
import io.aeron.archive.RecordingSession;
import io.aeron.archive.RecordingSummary;
import io.aeron.archive.ReplaySession;
import io.aeron.archive.ReplicationCredentialsSupplier;
import io.aeron.archive.ReplicationSession;
import io.aeron.archive.Session;
import io.aeron.archive.SessionWorker;
import io.aeron.archive.UpdateChannelSession;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveEvent;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.codecs.RecordingDescriptorDecoder;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.archive.codecs.RecordingState;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.exceptions.AeronException;
import io.aeron.exceptions.TimeoutException;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.security.Authenticator;
import io.aeron.security.AuthorisationService;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayDeque;
import java.util.EnumSet;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.agrona.AsciiEncoding;
import org.agrona.CloseHelper;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.LangUtil;
import org.agrona.SemanticVersion;
import org.agrona.Strings;
import org.agrona.SystemUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongCounterMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.MutableLong;
import org.agrona.collections.Object2ObjectHashMap;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;

abstract class ArchiveConductor
extends SessionWorker<Session>
implements UnavailableImageHandler,
UnavailableCounterHandler {
    // Open options (read + write) declared for file channel use elsewhere in this class.
    private static final EnumSet<StandardOpenOption> FILE_OPTIONS = EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE);
    // NOTE(review): presumably the suffix given to segment files queued for deletion - confirm against usages.
    static final String DELETE_SUFFIX = ".del";

    // Registration ids returned by the Aeron client in the constructor; used to deregister in postSessionsClose().
    private final long closeHandlerRegistrationId;
    private final long unavailableCounterHandlerRegistrationId;

    // Timeouts converted from the context's nanosecond values to milliseconds in the constructor.
    private final long connectTimeoutMs;
    private final long sessionLivenessCheckIntervalMs;

    // Control session ids start from a random non-negative int and are post-incremented per new session.
    private long nextSessionId = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
    // Epoch-ms deadline after which doWork() refreshes the mark file activity timestamp.
    private long markFileUpdateDeadlineMs = 0L;
    // Post-incremented counter forming the upper 32 bits of each replay session id.
    private int replayId = 1;
    // Set by abort() and invokeAeronInvoker(); checked in doWork() and postSessionsClose().
    private volatile boolean isAbort;

    // Reusable scratch objects and lookup tables.
    private final RecordingSummary recordingSummary = new RecordingSummary();
    private final ControlRequestDecoders decoders = new ControlRequestDecoders();
    // Deferred work drained by doWork() via runTasks().
    private final ArrayDeque<Runnable> taskQueue = new ArrayDeque();
    private final Long2ObjectHashMap<ReplaySession> replaySessionByIdMap = new Long2ObjectHashMap();
    private final Long2ObjectHashMap<RecordingSession> recordingSessionByIdMap = new Long2ObjectHashMap();
    private final Long2ObjectHashMap<ReplicationSession> replicationSessionByIdMap = new Long2ObjectHashMap();
    private final Long2ObjectHashMap<DeleteSegmentsSession> deleteSegmentsSessionByIdMap = new Long2ObjectHashMap();
    // Replay limit counters keyed by counter id; entries are removed and closed in onUnavailableCounter().
    private final Int2ObjectHashMap<Counter> counterByIdMap = new Int2ObjectHashMap();
    // Recording subscriptions keyed by the value of makeKey(streamId, channelUri).
    private final Object2ObjectHashMap<String, Subscription> recordingSubscriptionByKeyMap = new Object2ObjectHashMap();
    private final Long2ObjectHashMap<SessionForReplay> controlSessionByReplayToken = new Long2ObjectHashMap();
    private final UnsafeBuffer descriptorBuffer = new UnsafeBuffer();
    private final RecordingDescriptorDecoder recordingDescriptorDecoder = new RecordingDescriptorDecoder();
    private final UnsafeBuffer counterMetadataBuffer = new UnsafeBuffer(new byte[512]);
    // Reference counts keyed by subscription registration id; a missing key reads as 0.
    private final Long2LongCounterMap subscriptionRefCountMap = new Long2LongCounterMap(0L);

    // Collaborators taken from the Archive.Context in the constructor.
    private final Aeron aeron;
    // Either invoker may be null; invokeAeronInvoker() and invokeDriverConductor() null-check before use.
    private final AgentInvoker aeronAgentInvoker;
    private final AgentInvoker driverAgentInvoker;
    private final EpochClock epochClock;
    private final NanoClock nanoClock;
    // Updated from epochClock in doWork() whenever the millisecond value changes.
    private final CachedEpochClock cachedEpochClock = new CachedEpochClock();
    private final File archiveDir;
    // Null when the control channel is disabled in the context.
    private final Subscription controlSubscription;
    private final Subscription localControlSubscription;
    private final Catalog catalog;
    private final ArchiveMarkFile markFile;
    // Null when recording events are disabled in the context.
    private final RecordingEventsProxy recordingEventsProxy;
    private final Authenticator authenticator;
    private final AuthorisationService authorisationService;
    private final ControlSessionAdapter controlSessionAdapter;
    private final ControlResponseProxy controlResponseProxy = new ControlResponseProxy();
    private final ControlSessionProxy controlSessionProxy = new ControlSessionProxy(this.controlResponseProxy);
    private final DutyCycleTracker dutyCycleTracker;
    private final Random random;
    private final ExpandableArrayBuffer tempBuffer = new ExpandableArrayBuffer(300);
    final Archive.Context ctx;
    // Created in onStart() via the abstract factory methods; may still be null if abort() runs earlier.
    Recorder recorder;
    Replayer replayer;

    /**
     * Build the conductor from the archive context: capture collaborators, resolve the authenticator and
     * authorisation service, register client handlers, and create the control subscriptions and adapter.
     *
     * @param ctx archive context supplying the Aeron client, clocks, catalog, mark file and configuration.
     * @throws ArchiveException if the configured supplier returns a null authenticator or authorisation service.
     */
    ArchiveConductor(Archive.Context ctx) {
        super("archive-conductor", ctx.countedErrorHandler());

        final Aeron client = ctx.aeron();
        this.ctx = ctx;
        this.aeron = client;
        this.aeronAgentInvoker = client.conductorAgentInvoker();
        this.driverAgentInvoker = ctx.mediaDriverAgentInvoker();
        this.epochClock = ctx.epochClock();
        this.nanoClock = ctx.nanoClock();
        this.archiveDir = ctx.archiveDir();
        this.connectTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.connectTimeoutNs());
        this.sessionLivenessCheckIntervalMs = TimeUnit.NANOSECONDS.toMillis(ctx.sessionLivenessCheckIntervalNs());
        this.catalog = ctx.catalog();
        this.markFile = ctx.archiveMarkFile();
        this.dutyCycleTracker = ctx.conductorDutyCycleTracker();
        // Seed the cached clock so sessions created before the first duty cycle see a real time.
        this.cachedEpochClock.update(this.epochClock.time());
        this.random = ctx.secureRandom();

        final Authenticator suppliedAuthenticator = (Authenticator)ctx.authenticatorSupplier().get();
        if (null == suppliedAuthenticator) {
            throw new ArchiveException("authenticator cannot be null");
        }
        this.authenticator = suppliedAuthenticator;

        final AuthorisationService suppliedAuthorisationService = (AuthorisationService)ctx.authorisationServiceSupplier().get();
        if (null == suppliedAuthorisationService) {
            throw new ArchiveException("authorisation service cannot be null");
        }
        this.authorisationService = suppliedAuthorisationService;

        this.unavailableCounterHandlerRegistrationId = client.addUnavailableCounterHandler(this);
        this.closeHandlerRegistrationId = client.addCloseHandler(this::abort);

        // The recording events publication is only created when the feature is enabled.
        if (ctx.recordingEventsEnabled()) {
            final ExclusivePublication eventsPublication = client.addExclusivePublication(ctx.recordingEventsChannel(), ctx.recordingEventsStreamId());
            this.recordingEventsProxy = new RecordingEventsProxy(eventsPublication);
        } else {
            this.recordingEventsProxy = null;
        }

        if (!ctx.controlChannelEnabled()) {
            this.controlSubscription = null;
        } else {
            final ChannelUri controlChannelUri = ChannelUri.parse(ctx.controlChannel());
            controlChannelUri.put("sparse", Boolean.toString(ctx.controlTermBufferSparse()));
            this.controlSubscription = client.addSubscription(controlChannelUri.toString(), ctx.controlStreamId(), null, this);
        }

        // Both control subscriptions use this conductor as their unavailable-image handler.
        this.localControlSubscription = client.addSubscription(ctx.localControlChannel(), ctx.localControlStreamId(), null, this);
        this.controlSessionAdapter = new ControlSessionAdapter(this.decoders, this.controlSubscription, this.localControlSubscription, this, this.authorisationService);
    }

    /**
     * Create the recorder and replayer through the subclass factories, then prime the duty cycle tracker
     * with the current nano time.
     */
    @Override
    public void onStart() {
        recorder = newRecorder();
        replayer = newReplayer();

        final long nowNs = nanoClock.nanoTime();
        dutyCycleTracker.update(nowNs);
    }

    /**
     * Delegate to the control session adapter so the control session associated with the image is aborted.
     *
     * @param image the image that has become unavailable.
     */
    @Override
    public void onUnavailableImage(Image image) {
        controlSessionAdapter.abortControlSessionByImage(image);
    }

    /**
     * Drop and close the tracked counter, if any, for a counter id that has become unavailable.
     *
     * @param countersReader reader for the counters (unused).
     * @param registrationId registration id of the counter (unused).
     * @param counterId      id of the counter which is no longer available.
     */
    @Override
    public void onUnavailableCounter(CountersReader countersReader, long registrationId, int counterId) {
        final Counter trackedCounter = counterByIdMap.remove(counterId);
        if (null == trackedCounter) {
            return;
        }
        trackedCounter.close();
    }

    /**
     * Factory for the {@link Recorder}; invoked from {@link #onStart()} and the result stored in {@code recorder}.
     *
     * @return the recorder to use for recording sessions.
     */
    abstract Recorder newRecorder();

    /**
     * Factory for the {@link Replayer}; invoked from {@link #onStart()} and the result stored in {@code replayer}.
     *
     * @return the replayer to use for replay sessions.
     */
    abstract Replayer newReplayer();

    /**
     * Hook run before sessions are closed; simply delegates to {@link #closeSessionWorkers()}.
     */
    @Override
    protected final void preSessionsClose() {
        closeSessionWorkers();
    }

    /**
     * Close the session workers owned by the concrete conductor; called from {@link #preSessionsClose()}.
     */
    protected abstract void closeSessionWorkers();

    /**
     * Hook run after sessions are closed. On a normal close the client handlers are deregistered and, when
     * the archive does not own the Aeron client, the subscriptions and events proxy are closed explicitly.
     * On an abort only the abort latch is counted down. In both cases the mark file is stamped with -1,
     * forced, and the context is closed.
     */
    @Override
    protected void postSessionsClose() {
        if (!isAbort) {
            aeron.removeCloseHandler(closeHandlerRegistrationId);
            aeron.removeUnavailableCounterHandler(unavailableCounterHandlerRegistrationId);

            final boolean clientOwnedByArchive = ctx.ownsAeronClient();
            if (!clientOwnedByArchive) {
                for (final Subscription recordingSubscription : recordingSubscriptionByKeyMap.values()) {
                    recordingSubscription.close();
                }
                CloseHelper.close(localControlSubscription);
                CloseHelper.close(controlSubscription);
                CloseHelper.close(recordingEventsProxy);
            }
        } else {
            // Release the thread blocked in abort() waiting for this conductor to finish.
            ctx.abortLatch().countDown();
        }

        markFile.updateActivityTimestamp(-1L);
        markFile.force();
        ctx.close();
    }

    /**
     * Flag the conductor as aborted, abort the recorder and replayer when they exist, close the error
     * counter, then wait up to 15 seconds on the abort latch. A timeout is reported to the error handler
     * as a warning; an interrupt restores the thread's interrupt flag.
     */
    @Override
    protected void abort() {
        final long abortLatchTimeoutMs = 15000L;
        try {
            isAbort = true;

            // Either worker can still be null if the abort happens before onStart().
            if (null != recorder) {
                recorder.abort();
            }
            if (null != replayer) {
                replayer.abort();
            }

            ctx.errorCounter().close();

            final boolean latchReleased = ctx.abortLatch().await(abortLatchTimeoutMs, TimeUnit.MILLISECONDS);
            if (!latchReleased) {
                errorHandler.onError(new TimeoutException("awaiting abort latch", AeronException.Category.WARN));
            }
        }
        catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * One duty cycle of the conductor: track the cycle time, refresh the cached epoch clock (and, on a
     * millisecond change, invoke the Aeron client and update the mark file when due), poll control
     * requests, check replay tokens, invoke the driver conductor, run queued tasks and the session work.
     *
     * @return the amount of work done in this cycle.
     * @throws AgentTerminationException if the conductor has been aborted.
     */
    @Override
    public int doWork() {
        final long nowNs = nanoClock.nanoTime();
        if (isAbort) {
            throw new AgentTerminationException("unexpected Aeron close");
        }
        dutyCycleTracker.measureAndUpdate(nowNs);

        int workCount = 0;
        final long nowMs = epochClock.time();
        final boolean millisecondChanged = cachedEpochClock.time() != nowMs;
        if (millisecondChanged) {
            cachedEpochClock.update(nowMs);
            workCount += invokeAeronInvoker();

            if (markFileUpdateDeadlineMs <= nowMs) {
                markFileUpdateDeadlineMs = nowMs + Archive.Configuration.MARK_FILE_UPDATE_INTERVAL_MS;
                markFile.updateActivityTimestamp(nowMs);
            }
        }

        workCount += controlSessionAdapter.poll();
        workCount += checkReplayTokens(nowNs);
        workCount += invokeDriverConductor();
        workCount += runTasks(taskQueue);

        return workCount + super.doWork();
    }

    /**
     * The archive context this conductor was constructed with.
     *
     * @return the archive context.
     */
    Archive.Context context() {
        return ctx;
    }

    /**
     * The proxy instance this conductor hands to the control sessions it creates.
     *
     * @return the control response proxy.
     */
    ControlResponseProxy controlResponseProxy() {
        return controlResponseProxy;
    }

    /**
     * Invoke the Aeron client conductor when an invoker is configured.
     *
     * @return the work count reported by the invoker, or 0 when there is no invoker.
     * @throws AgentTerminationException if the conductor is aborted or the invoker is closed after invocation.
     */
    final int invokeAeronInvoker() {
        if (null == aeronAgentInvoker) {
            return 0;
        }

        final int workCount = aeronAgentInvoker.invoke();
        if (isAbort || aeronAgentInvoker.isClosed()) {
            isAbort = true;
            throw new AgentTerminationException("unexpected Aeron close");
        }

        return workCount;
    }

    /**
     * Invoke the media driver conductor when an invoker is configured.
     *
     * @return the work count reported by the invoker, or 0 when there is no invoker.
     * @throws AgentTerminationException if the driver invoker is closed after invocation.
     */
    final int invokeDriverConductor() {
        if (null == driverAgentInvoker) {
            return 0;
        }

        final int workCount = driverAgentInvoker.invoke();
        if (driverAgentInvoker.isClosed()) {
            throw new AgentTerminationException("unexpected driver close");
        }

        return workCount;
    }

    /**
     * Report a message to the error handler wrapped in an {@link ArchiveEvent}.
     *
     * @param message text of the event.
     */
    void logWarning(String message) {
        final ArchiveEvent event = new ArchiveEvent(message);
        errorHandler.onError(event);
    }

    /**
     * Create and register a control session for a connect request.
     * <p>
     * The response channel is derived from the requested channel, taking mtu, term-length and sparse from
     * the URI when present and from the context otherwise. The response publication and session counter
     * are allocated, the authenticator is told of the connect request, and the session is added to this worker.
     *
     * @param image                 image the connect request arrived on.
     * @param correlationId         correlation id of the connect request.
     * @param streamId              stream id for the response publication.
     * @param version               semantic version of the client protocol.
     * @param channel               response channel requested by the client.
     * @param encodedCredentials    credentials passed to the authenticator.
     * @param clientInfo            optional client description used in the session counter label.
     * @param controlSessionAdapter adapter handed to the new session.
     * @return the newly created control session.
     */
    ControlSession newControlSession(Image image, long correlationId, int streamId, int version, String channel, byte[] encodedCredentials, String clientInfo, ControlSessionAdapter controlSessionAdapter) {
        final ChannelUri channelUri = ChannelUri.parse(channel);

        final String mtuStr = channelUri.get("mtu");
        final int mtuLength;
        if (null == mtuStr) {
            mtuLength = ctx.controlMtuLength();
        } else {
            mtuLength = (int)SystemUtil.parseSize("mtu", mtuStr);
        }

        final String termLengthStr = channelUri.get("term-length");
        final int termLength;
        if (null == termLengthStr) {
            termLength = ctx.controlTermBufferLength();
        } else {
            termLength = (int)SystemUtil.parseSize("term-length", termLengthStr);
        }

        final String isSparseStr = channelUri.get("sparse");
        final boolean isSparse;
        if (null == isSparseStr) {
            isSparse = ctx.controlTermBufferSparse();
        } else {
            isSparse = Boolean.parseBoolean(isSparseStr);
        }

        final boolean usingResponseChannel = "response".equals(channelUri.get("control-mode"));
        final ChannelUriStringBuilder urlBuilder = ArchiveConductor.strippedChannelBuilder(channelUri)
            .termLength(termLength)
            .sparse(isSparse)
            .mtu(mtuLength);
        if (usingResponseChannel) {
            urlBuilder.responseCorrelationId(image.correlationId());
        }
        final String responseChannel = urlBuilder.build();

        // A mismatched major version does not reject the request here; the message is handed to the session.
        final String invalidVersionMessage = SemanticVersion.major(version) != 1 ?
            "invalid client version " + SemanticVersion.toString(version) + ", archive is " + SemanticVersion.toString(AeronArchive.Configuration.PROTOCOL_SEMANTIC_VERSION) :
            null;

        final long controlSessionId = nextSessionId++;
        final long controlPublicationRegistrationId = aeron.asyncAddExclusivePublication(responseChannel, streamId);

        final String imageInfo = "sourceIdentity=" + image.sourceIdentity() + " sessionId=" + image.sessionId();
        final String counterLabel = Strings.isEmpty(clientInfo) ? imageInfo : clientInfo + " " + imageInfo;
        final long sessionCounterRegistrationId = ControlSessionCounter.allocate(aeron, tempBuffer, ctx.archiveId(), controlSessionId, counterLabel);

        final ControlSession controlSession = new ControlSession(controlSessionId, correlationId, connectTimeoutMs, sessionLivenessCheckIntervalMs, controlPublicationRegistrationId, sessionCounterRegistrationId, responseChannel, streamId, invalidVersionMessage, controlSessionAdapter, aeron, this, cachedEpochClock, controlResponseProxy, authenticator, controlSessionProxy);

        authenticator.onConnectRequest(controlSession.sessionId(), encodedCredentials, cachedEpochClock.time());
        addSession(controlSession);
        ctx.controlSessionsCounter().incrementRelease();

        return controlSession;
    }

    /**
     * Respond to the client with the id of this archive taken from the context.
     *
     * @param correlationId  correlation id of the request.
     * @param controlSession session to respond on.
     */
    void archiveId(long correlationId, ControlSession controlSession) {
        final long id = ctx.archiveId();
        controlSession.sendOkResponse(correlationId, id);
    }

    /**
     * Start recording a channel and stream by adding a subscription whose available-image handler queues
     * the creation of a recording session for each image.
     * <p>
     * An error response is sent when the concurrent recording limit is reached, storage space is low,
     * a subscription already exists for the same key, or the channel cannot be processed.
     *
     * @param correlationId   correlation id of the request.
     * @param streamId        stream id to record.
     * @param sourceLocation  LOCAL sources on a UDP channel are recorded via a spy subscription.
     * @param autoStop        passed through to the recording session start.
     * @param originalChannel channel as requested by the client.
     * @param controlSession  session to respond on.
     */
    void startRecording(long correlationId, int streamId, SourceLocation sourceLocation, boolean autoStop, String originalChannel, ControlSession controlSession) {
        if (recordingSessionByIdMap.size() >= ctx.maxConcurrentRecordings()) {
            controlSession.sendErrorResponse(correlationId, 8L, "max concurrent recordings reached " + ctx.maxConcurrentRecordings());
            return;
        }

        // A non-null result means storage is low; the callee receives the session to respond on.
        if (null != isLowStorageSpace(correlationId, controlSession)) {
            return;
        }

        try {
            final ChannelUri channelUri = ChannelUri.parse(originalChannel);
            final String key = ArchiveConductor.makeKey(streamId, channelUri);

            if (null != recordingSubscriptionByKeyMap.get(key)) {
                controlSession.sendErrorResponse(correlationId, 3L, "recording exists for streamId=" + streamId + " channel=" + originalChannel);
                return;
            }

            final String strippedChannel = ArchiveConductor.strippedChannelBuilder(channelUri).build();
            final String channel;
            if (sourceLocation == SourceLocation.LOCAL && channelUri.isUdp()) {
                channel = "aeron-spy:" + strippedChannel;
            } else {
                channel = strippedChannel;
            }

            // Image callbacks only enqueue work; the recording session is started from the task queue in doWork().
            final AvailableImageHandler handler = image -> {
                final Runnable startTask = () -> startRecordingSession(controlSession, correlationId, strippedChannel, originalChannel, image, autoStop);
                taskQueue.addLast(startTask);
            };

            final Subscription subscription = aeron.addSubscription(channel, streamId, handler, null);
            recordingSubscriptionByKeyMap.put(key, subscription);
            subscriptionRefCountMap.incrementAndGet(subscription.registrationId());
            controlSession.sendOkResponse(correlationId, subscription.registrationId());
        }
        catch (Exception ex) {
            errorHandler.onError(ex);
            controlSession.sendErrorResponse(correlationId, ex.getMessage());
        }
    }

    /**
     * Stop the recording subscription registered for a channel and stream.
     *
     * @param correlationId  correlation id of the request.
     * @param streamId       stream id of the recording subscription.
     * @param channel        channel of the recording subscription.
     * @param controlSession session to respond on; receives an error when no subscription matches
     *                       or the channel cannot be processed.
     */
    void stopRecording(long correlationId, int streamId, String channel, ControlSession controlSession) {
        try {
            final ChannelUri channelUri = ChannelUri.parse(channel);
            final Subscription subscription = recordingSubscriptionByKeyMap.remove(ArchiveConductor.makeKey(streamId, channelUri));

            if (null == subscription) {
                controlSession.sendErrorResponse(correlationId, 4L, "no recording found for streamId=" + streamId + " channel=" + channel);
                return;
            }

            abortRecordingSessionAndCloseSubscription(subscription);
            controlSession.sendOkResponse(correlationId);
        }
        catch (Exception ex) {
            errorHandler.onError(ex);
            controlSession.sendErrorResponse(correlationId, ex.getMessage());
        }
    }

    /**
     * Stop a recording subscription by id and report the outcome to the client.
     *
     * @param correlationId  correlation id of the request.
     * @param subscriptionId registration id of the recording subscription.
     * @param controlSession session to respond on.
     */
    void stopRecordingSubscription(long correlationId, long subscriptionId, ControlSession controlSession) {
        final boolean stopped = stopRecordingSubscription(subscriptionId);
        if (!stopped) {
            controlSession.sendErrorResponse(correlationId, 4L, "no recording subscription found for subscriptionId=" + subscriptionId);
            return;
        }
        controlSession.sendOkResponse(correlationId);
    }

    /**
     * Remove the recording subscription with the given id and, if found, abort its recording sessions
     * and close it.
     *
     * @param subscriptionId registration id of the recording subscription.
     * @return true if a subscription was found and stopped, otherwise false.
     */
    boolean stopRecordingSubscription(long subscriptionId) {
        final Subscription removed = removeRecordingSubscription(subscriptionId);
        if (null == removed) {
            return false;
        }
        abortRecordingSessionAndCloseSubscription(removed);
        return true;
    }

    /**
     * Begin listing recording descriptors from the catalog, unless the control session already has
     * an active listing, in which case an error response is sent.
     *
     * @param correlationId  correlation id of the request.
     * @param fromId         recording id to start listing from.
     * @param count          maximum number of descriptors to list.
     * @param controlSession session the listing belongs to.
     */
    void newListRecordingsSession(long correlationId, long fromId, int count, ControlSession controlSession) {
        if (controlSession.hasActiveListing()) {
            controlSession.sendErrorResponse(correlationId, 1L, "active listing already in progress");
            return;
        }

        final ListRecordingsSession listingSession = new ListRecordingsSession(correlationId, fromId, count, catalog, controlSession, descriptorBuffer);
        addSession(listingSession);
        controlSession.activeListing(listingSession);
    }

    /**
     * Begin listing recording descriptors filtered by stream id and channel fragment, unless the control
     * session already has an active listing, in which case an error response is sent.
     *
     * @param correlationId   correlation id of the request.
     * @param fromRecordingId recording id to start listing from.
     * @param count           maximum number of descriptors to list.
     * @param streamId        stream id to match.
     * @param channelFragment channel fragment to match.
     * @param controlSession  session the listing belongs to.
     */
    void newListRecordingsForUriSession(long correlationId, long fromRecordingId, int count, int streamId, byte[] channelFragment, ControlSession controlSession) {
        if (controlSession.hasActiveListing()) {
            controlSession.sendErrorResponse(correlationId, 1L, "active listing already in progress");
            return;
        }

        final ListRecordingsForUriSession listingSession = new ListRecordingsForUriSession(correlationId, fromRecordingId, count, channelFragment, streamId, catalog, controlSession, descriptorBuffer, recordingDescriptorDecoder);
        addSession(listingSession);
        controlSession.activeListing(listingSession);
    }

    /**
     * List the descriptor for a single recording. Sends an error when a listing is already active on the
     * control session and a recording-unknown response when the catalog has no such recording.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    id of the recording to list.
     * @param controlSession session the listing belongs to.
     */
    void listRecording(long correlationId, long recordingId, ControlSession controlSession) {
        if (controlSession.hasActiveListing()) {
            controlSession.sendErrorResponse(correlationId, 1L, "active listing already in progress");
            return;
        }

        if (!catalog.hasRecording(recordingId)) {
            controlSession.sendRecordingUnknown(correlationId, recordingId);
            return;
        }

        final ListRecordingByIdSession listingSession = new ListRecordingByIdSession(correlationId, recordingId, catalog, controlSession, descriptorBuffer);
        addSession(listingSession);
        controlSession.activeListing(listingSession);
    }

    /**
     * Start a session which updates the channel of an existing recording. The session is registered as the
     * control session's active listing, so the request is refused while another listing is active.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    id of the recording to update.
     * @param channel        new original channel; the stripped form is derived from it.
     * @param controlSession session the update belongs to.
     */
    void updateChannel(long correlationId, long recordingId, String channel, ControlSession controlSession) {
        if (controlSession.hasActiveListing()) {
            controlSession.sendErrorResponse(correlationId, 1L, "active listing already in progress");
            return;
        }

        if (!catalog.hasRecording(recordingId)) {
            controlSession.sendRecordingUnknown(correlationId, recordingId);
            return;
        }

        final String strippedChannel = ArchiveConductor.strippedChannelBuilder(ChannelUri.parse(channel)).build();
        final UpdateChannelSession updateSession = new UpdateChannelSession(correlationId, recordingId, channel, strippedChannel, catalog, controlSession, descriptorBuffer);
        addSession(updateSession);
        controlSession.activeListing(updateSession);
    }

    /**
     * Look up the last recording in the catalog matching the given criteria and respond with its id.
     *
     * @param correlationId   correlation id of the request.
     * @param minRecordingId  lowest recording id to consider; a negative value yields an error response.
     * @param sessionId       session id to match.
     * @param streamId        stream id to match.
     * @param channelFragment channel fragment to match.
     * @param controlSession  session to respond on.
     */
    void findLastMatchingRecording(long correlationId, long minRecordingId, int sessionId, int streamId, byte[] channelFragment, ControlSession controlSession) {
        if (minRecordingId < 0L) {
            controlSession.sendErrorResponse(correlationId, 5L, "minRecordingId=" + minRecordingId + " < 0");
            return;
        }

        final long lastMatchingId = catalog.findLast(minRecordingId, sessionId, streamId, channelFragment);
        controlSession.sendOkResponse(correlationId, lastMatchingId);
    }

    /**
     * Validate a replay request and, if valid, add a session which creates the replay publication.
     * <p>
     * Exactly one response path is taken for a request: each validation failure sends an error response
     * and returns without creating a session.
     *
     * @param correlationId        correlation id of the request.
     * @param recordingId          id of the recording to replay.
     * @param position             position to replay from, or -1 to start at the recording's start position.
     * @param length               number of bytes to replay; -1 replays the maximum available length and -2
     *                             replays up to the current stop/limit position (presumably
     *                             REPLAY_ALL_AND_FOLLOW / REPLAY_ALL_AND_STOP - confirm against AeronArchive).
     * @param fileIoMaxLength      maximum file IO length; when positive it must not be less than the recording MTU.
     * @param replayStreamId       stream id for the replay publication.
     * @param replayChannel        channel for the replay publication.
     * @param limitPositionCounter counter bounding the replay, or null to use the position counter of an
     *                             active recording session when there is one.
     * @param controlSession       session to respond on.
     */
    void startReplay(long correlationId, long recordingId, long position, long length, int fileIoMaxLength, int replayStreamId, String replayChannel, Counter limitPositionCounter, ControlSession controlSession) {
        if (replaySessionByIdMap.size() >= ctx.maxConcurrentReplays()) {
            String msg = "max concurrent replays reached " + ctx.maxConcurrentReplays();
            controlSession.sendErrorResponse(correlationId, 7L, msg);
            return;
        }
        if (!catalog.hasRecording(recordingId)) {
            String msg = "unknown recording id: " + recordingId;
            controlSession.sendErrorResponse(correlationId, 5L, msg);
            return;
        }

        // Without an explicit limit counter, a live recording bounds the replay by its recording position.
        Counter replayLimitPositionCounter = limitPositionCounter;
        if (null == replayLimitPositionCounter) {
            RecordingSession recordingSession = recordingSessionByIdMap.get(recordingId);
            if (null != recordingSession) {
                replayLimitPositionCounter = recordingSession.recordingPosition();
            }
        }
        long limitPosition = -1L;
        if (null != replayLimitPositionCounter) {
            limitPosition = replayLimitPositionCounter.get();
        }

        catalog.recordingSummary(recordingId, recordingSummary);
        long startPosition = recordingSummary.startPosition;
        long replayPosition = startPosition;
        if (-1L != position) {
            // The validation helper is given the control session and correlation id to report a failure.
            if (isInvalidReplayPosition(correlationId, controlSession, recordingId, position, recordingSummary)) {
                return;
            }
            replayPosition = position;
        }

        if (fileIoMaxLength > 0 && fileIoMaxLength < recordingSummary.mtuLength) {
            String msg = "fileIoMaxLength=" + fileIoMaxLength + " < mtuLength=" + recordingSummary.mtuLength;
            controlSession.sendErrorResponse(correlationId, msg);
            return;
        }
        if (ctx.replayBuffer().capacity() < recordingSummary.mtuLength) {
            int replayBufferCapacity = ctx.replayBuffer().capacity();
            String msg = "replayBufferCapacity=" + replayBufferCapacity + " < mtuLength=" + recordingSummary.mtuLength;
            controlSession.sendErrorResponse(correlationId, msg);
            return;
        }

        long stopPosition;
        long maxLength;
        if (-1L != limitPosition) {
            if (replayPosition > limitPosition) {
                String msg = "requested replay start position=" + replayPosition + " must be less than the limit position=" + limitPosition + " for recording " + recordingId;
                controlSession.sendErrorResponse(correlationId, msg);
                return;
            }
            stopPosition = limitPosition;
            maxLength = Long.MAX_VALUE - replayPosition;
        } else {
            stopPosition = recordingSummary.stopPosition;
            maxLength = stopPosition - replayPosition;
        }

        long replayLength;
        if (-1L == length) {
            replayLength = maxLength;
        } else if (-2L == length) {
            replayLength = stopPosition - replayPosition;
            if (0L == replayLength) {
                String msg = "When replaying and stopping the replay length must be non-zero, recordingId=" + recordingId;
                controlSession.sendErrorResponse(correlationId, 15L, msg);
                // Stop here: falling through would create a replay session after the error was already sent.
                return;
            }
        } else {
            replayLength = Math.min(length, maxLength);
        }
        if (replayLength < 0L) {
            String msg = "replay length must be positive: replayLength=" + replayLength + ", length=" + length + ", stopPosition=" + stopPosition + ", replayPosition=" + replayPosition + " for recording " + recordingId;
            controlSession.sendErrorResponse(correlationId, msg);
            return;
        }

        try {
            ChannelUri channelUri = ChannelUri.parse(replayChannel);
            ChannelUriStringBuilder channelBuilder = ArchiveConductor.strippedChannelBuilder(channelUri).initialPosition(replayPosition, recordingSummary.initialTermId, recordingSummary.termBufferLength).eos(channelUri).sparse(channelUri).mtu(recordingSummary.mtuLength);
            // An explicit linger on the replay channel overrides the configured replay linger timeout.
            String lingerValue = channelUri.get("linger");
            channelBuilder.linger(null != lingerValue ? Long.parseLong(lingerValue) : ctx.replayLingerTimeoutNs());
            addSession(new CreateReplayPublicationSession(correlationId, recordingId, replayPosition, replayLength, startPosition, stopPosition, recordingSummary.segmentFileLength, recordingSummary.termBufferLength, recordingSummary.streamId, aeron.asyncAddExclusivePublication(channelBuilder.build(), replayStreamId), fileIoMaxLength, replayLimitPositionCounter, aeron, controlSession, this));
        }
        catch (Exception ex) {
            // The client is told why the channel failed, then the exception propagates to the caller.
            String msg = "failed to process replayChannel - " + ex.getMessage();
            controlSession.sendErrorResponse(correlationId, msg);
            throw ex;
        }
    }

    /**
     * Create a replay session for an established replay publication, register it by id, hand it to the
     * replayer and increment the replay session counter.
     * <p>
     * The replay session id combines the next replay id (upper 32 bits) with the publication's session id
     * (lower 32 bits). When {@code fileIoMaxLength} is positive and smaller than the context replay buffer,
     * a view of that length over the buffer is used; otherwise the context buffer itself is used.
     */
    void newReplaySession(long correlationId, long recordingId, long replayPosition, long replayLength, long startPosition, long stopPosition, int segmentFileLength, int termBufferLength, int streamId, int fileIoMaxLength, ControlSession controlSession, Counter replayLimitPosition, ExclusivePublication replayPublication) {
        final long upperBits = (long)replayId++ << 32;
        final long lowerBits = (long)replayPublication.sessionId() & 0xFFFFFFFFL;
        final long replaySessionId = upperBits | lowerBits;

        final UnsafeBuffer replayBuffer;
        if (0 < fileIoMaxLength && fileIoMaxLength < ctx.replayBuffer().capacity()) {
            replayBuffer = new UnsafeBuffer(ctx.replayBuffer(), 0, fileIoMaxLength);
        } else {
            replayBuffer = ctx.replayBuffer();
        }

        final ReplaySession replaySession = new ReplaySession(correlationId, recordingId, replayPosition, replayLength, startPosition, stopPosition, segmentFileLength, termBufferLength, streamId, replaySessionId, connectTimeoutMs, controlSession, replayBuffer, archiveDir, cachedEpochClock, nanoClock, replayPublication, aeron.countersReader(), replayLimitPosition, ctx.replayChecksum(), replayer);

        replaySessionByIdMap.put(replaySessionId, replaySession);
        replayer.addSession(replaySession);
        ctx.replaySessionCounter().incrementRelease();
    }

    /**
     * Start a replay bounded by the counter with the given id. The counter wrapper is reused from the
     * cache when present; otherwise one is created over the client's counters reader and cached. If the
     * wrapper cannot be created an error response is sent and no replay is started.
     *
     * @param correlationId   correlation id of the request.
     * @param recordingId     id of the recording to replay.
     * @param position        position to replay from.
     * @param length          length to replay.
     * @param limitCounterId  id of the counter limiting the replay.
     * @param fileIoMaxLength maximum file IO length.
     * @param replayStreamId  stream id for the replay publication.
     * @param replayChannel   channel for the replay publication.
     * @param controlSession  session to respond on.
     */
    void startBoundedReplay(long correlationId, long recordingId, long position, long length, int limitCounterId, int fileIoMaxLength, int replayStreamId, String replayChannel, ControlSession controlSession) {
        Counter limitCounter = counterByIdMap.get(limitCounterId);

        if (null == limitCounter) {
            try {
                limitCounter = new Counter(aeron.countersReader(), -1L, limitCounterId);
            }
            catch (Exception ex) {
                final String reason = "unable to create replay limit counter id= " + limitCounterId + " because of: " + ex.getMessage();
                controlSession.sendErrorResponse(correlationId, 0L, reason);
                return;
            }
            counterByIdMap.put(limitCounterId, limitCounter);
        }

        startReplay(correlationId, recordingId, position, length, fileIoMaxLength, replayStreamId, replayChannel, limitCounter, controlSession);
    }

    /**
     * Abort the replay session with the given id if it exists. An OK response is sent whether or not
     * a matching session was found.
     *
     * @param correlationId   correlation id of the request.
     * @param replaySessionId id of the replay session to stop.
     * @param controlSession  session to respond on.
     */
    void stopReplay(long correlationId, long replaySessionId, ControlSession controlSession) {
        final ReplaySession target = replaySessionByIdMap.get(replaySessionId);
        if (target != null) {
            target.abort("stop replay");
        }

        controlSession.sendOkResponse(correlationId);
    }

    /**
     * Abort every replay session for the given recording, or all replay sessions when the recording id
     * is -1, then send an OK response.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    id of the recording whose replays are stopped, or -1 for all replays.
     * @param controlSession session to respond on.
     */
    void stopAllReplays(long correlationId, long recordingId, ControlSession controlSession) {
        final boolean matchEveryRecording = -1L == recordingId;

        for (final ReplaySession candidate : replaySessionByIdMap.values()) {
            if (matchEveryRecording || candidate.recordingId() == recordingId) {
                candidate.abort("stop all replays");
            }
        }

        controlSession.sendOkResponse(correlationId);
    }

    /**
     * Handle a request to extend an existing, inactive recording by subscribing to a channel and stream.
     * <p>
     * The request is rejected with an error response when: the concurrent recording limit has been reached, the
     * recording is not in the catalog, the stream id differs from the recorded one, the recording is already
     * active, a pending delete session covers the recording's stop position, storage space is low (as reported by
     * {@code isLowStorageSpace}), or a recording subscription already exists for the same key.
     * Otherwise a subscription is added whose available-image handler queues a task that calls
     * {@code extendRecordingSession}, and an OK response carrying the subscription registration id is sent.
     *
     * @param correlationId   correlation id of the request.
     * @param recordingId     recording to extend.
     * @param streamId        stream id to subscribe to; must equal the recording's stream id.
     * @param sourceLocation  when {@link SourceLocation#LOCAL} and the channel text contains "udp" a spy
     *                        subscription is used.
     * @param autoStop        passed through to {@code extendRecordingSession}.
     * @param originalChannel channel as supplied by the client.
     * @param controlSession  session on which responses are sent.
     * @return the new {@link Subscription} on success, otherwise the error message {@link String} that was sent.
     */
    Object extendRecording(long correlationId, long recordingId, int streamId, SourceLocation sourceLocation, boolean autoStop, String originalChannel, ControlSession controlSession) {
        if (this.recordingSessionByIdMap.size() >= this.ctx.maxConcurrentRecordings()) {
            String error = "max concurrent recordings reached at " + this.ctx.maxConcurrentRecordings();
            controlSession.sendErrorResponse(correlationId, 8L, error);
            return error;
        }

        if (!this.catalog.hasRecording(recordingId)) {
            String error = "unknown recording id: " + recordingId;
            controlSession.sendErrorResponse(correlationId, 5L, error);
            return error;
        }

        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        if (streamId != this.recordingSummary.streamId) {
            String error = "cannot extend recording " + this.recordingSummary.recordingId + " with streamId=" + streamId + " for existing streamId=" + this.recordingSummary.streamId;
            controlSession.sendErrorResponse(correlationId, 5L, error);
            return error;
        }

        if (this.recordingSessionByIdMap.containsKey(recordingId)) {
            String error = "cannot extend active recording " + recordingId;
            controlSession.sendErrorResponse(correlationId, 2L, error);
            return error;
        }

        DeleteSegmentsSession pendingDelete = this.deleteSegmentsSessionByIdMap.get(recordingId);
        if (null != pendingDelete && pendingDelete.maxDeletePosition() >= this.recordingSummary.stopPosition) {
            String error = "cannot extend recording " + recordingId + " due to an outstanding delete operation";
            controlSession.sendErrorResponse(correlationId, error);
            return error;
        }

        // isLowStorageSpace returns a non-null message when the request must be refused.
        String lowStorageError = this.isLowStorageSpace(correlationId, controlSession);
        if (null != lowStorageError) {
            return lowStorageError;
        }

        try {
            ChannelUri uri = ChannelUri.parse(originalChannel);
            String subscriptionKey = ArchiveConductor.makeKey(streamId, uri);

            if (null != this.recordingSubscriptionByKeyMap.get(subscriptionKey)) {
                String error = "recording exists for streamId=" + streamId + " channel=" + originalChannel;
                controlSession.sendErrorResponse(correlationId, 3L, error);
                return error;
            }

            String strippedChannel = ArchiveConductor.strippedChannelBuilder(uri).build();
            String channel;
            if (originalChannel.contains("udp") && sourceLocation == SourceLocation.LOCAL) {
                channel = "aeron-spy:" + strippedChannel;
            } else {
                channel = strippedChannel;
            }

            // The image callback only enqueues work; the session is extended later from the task queue.
            AvailableImageHandler onAvailableImage = image -> {
                this.taskQueue.addLast(() -> this.extendRecordingSession(controlSession, correlationId, recordingId, strippedChannel, originalChannel, image, autoStop));
            };

            Subscription subscription = this.aeron.addSubscription(channel, streamId, onAvailableImage, null);
            long registrationId = subscription.registrationId();
            this.recordingSubscriptionByKeyMap.put(subscriptionKey, subscription);
            this.subscriptionRefCountMap.incrementAndGet(registrationId);
            controlSession.sendOkResponse(correlationId, registrationId);
            return subscription;
        }
        catch (Exception ex) {
            this.errorHandler.onError(ex);
            controlSession.sendErrorResponse(correlationId, ex.getMessage());
            return ex.getMessage();
        }
    }

    /**
     * Respond with the catalog start position of a recording.
     * When the recording is unknown, {@code hasRecording} has already sent the error response.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording to query.
     * @param controlSession session on which the response is sent.
     */
    void getStartPosition(long correlationId, long recordingId, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)) {
            return;
        }
        long startPosition = this.catalog.startPosition(recordingId);
        controlSession.sendOkResponse(correlationId, startPosition);
    }

    /**
     * Respond with the current position of an active recording session, or {@code -1} when the recording exists
     * in the catalog but has no active session.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording to query.
     * @param controlSession session on which the response is sent.
     */
    void getRecordingPosition(long correlationId, long recordingId, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)) {
            return;
        }

        RecordingSession activeSession = this.recordingSessionByIdMap.get(recordingId);
        long position;
        if (null == activeSession) {
            position = -1L;
        } else {
            position = activeSession.recordingPosition().get();
        }
        controlSession.sendOkResponse(correlationId, position);
    }

    /**
     * Respond with the catalog stop position of a recording.
     * When the recording is unknown, {@code hasRecording} has already sent the error response.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording to query.
     * @param controlSession session on which the response is sent.
     */
    void getStopPosition(long correlationId, long recordingId, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)) {
            return;
        }
        long stopPosition = this.catalog.stopPosition(recordingId);
        controlSession.sendOkResponse(correlationId, stopPosition);
    }

    /**
     * Respond with the furthest recorded position of a recording: the live position of its active session when
     * there is one, otherwise the stop position stored in the catalog.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording to query.
     * @param controlSession session on which the response is sent.
     */
    void getMaxRecordedPosition(long correlationId, long recordingId, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)) {
            return;
        }

        RecordingSession activeSession = this.recordingSessionByIdMap.get(recordingId);
        long maxPosition;
        if (null != activeSession) {
            maxPosition = activeSession.recordingPosition().get();
        } else {
            maxPosition = this.catalog.stopPosition(recordingId);
        }
        controlSession.sendOkResponse(correlationId, maxPosition);
    }

    /**
     * Truncate a recording to {@code position}: update the catalog stop position and schedule deletion of the
     * segment files that lie beyond it.
     * <p>
     * The method reads {@code this.recordingSummary} without loading it here; presumably
     * {@code isValidTruncate} populates it for this recording (confirm against that method).
     * <ul>
     *   <li>When {@code position} equals the start position, every segment file of the recording is deleted.</li>
     *   <li>When {@code position} falls inside a segment and differs from the old stop position, the remainder
     *       of that segment is erased via {@code eraseRemainingSegment}; a {@code false} result ends the
     *       operation.</li>
     *   <li>When {@code position} is on a segment boundary, that segment is deleted as well.</li>
     * </ul>
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording to truncate.
     * @param position       new stop position.
     * @param controlSession session on which responses are sent.
     */
    void truncateRecording(long correlationId, long recordingId, long position, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)
            || !this.isValidTruncate(correlationId, controlSession, recordingId, position)
            || !this.isDeleteAllowed(recordingId, correlationId, controlSession)) {
            return;
        }

        long oldStopPosition = this.recordingSummary.stopPosition;
        int segmentLength = this.recordingSummary.segmentFileLength;
        int termLength = this.recordingSummary.termBufferLength;
        long startPosition = this.recordingSummary.startPosition;
        long segmentBasePosition = AeronArchive.segmentFileBasePosition(startPosition, position, termLength, segmentLength);
        int segmentOffset = (int)(position - segmentBasePosition);

        this.catalog.stopPosition(recordingId, position);

        ArrayDeque<String> toDelete = new ArrayDeque<String>();
        if (startPosition == position) {
            this.listSegmentFiles(recordingId, toDelete::addLast);
        } else {
            if (segmentOffset > 0) {
                if (oldStopPosition != position) {
                    File segmentFile = new File(this.archiveDir, Archive.segmentFileName(recordingId, segmentBasePosition));
                    boolean erased = this.eraseRemainingSegment(correlationId, controlSession, position, segmentLength, segmentOffset, termLength, segmentFile);
                    if (!erased) {
                        return;
                    }
                }
            } else {
                toDelete.addLast(Archive.segmentFileName(recordingId, segmentBasePosition));
            }

            // Every later segment up to the old stop position is removed in full.
            long segmentPosition = segmentBasePosition + (long)segmentLength;
            while (segmentPosition <= oldStopPosition) {
                toDelete.addLast(Archive.segmentFileName(recordingId, segmentPosition));
                segmentPosition += (long)segmentLength;
            }
        }
        this.deleteSegments(correlationId, recordingId, controlSession, toDelete);
    }

    /**
     * Purge a recording: mark it {@link RecordingState#DELETED} in the catalog and schedule deletion of all of
     * its segment files. Nothing happens beyond the error response sent by the failing check when the recording
     * is unknown, the purge is not valid, or a delete is not currently allowed.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording to purge.
     * @param controlSession session on which responses are sent.
     */
    void purgeRecording(long correlationId, long recordingId, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)
            || !this.isValidPurge(correlationId, controlSession, recordingId)
            || !this.isDeleteAllowed(recordingId, correlationId, controlSession)) {
            return;
        }

        this.catalog.changeState(recordingId, RecordingState.DELETED);

        ArrayDeque<String> segmentFiles = new ArrayDeque<String>();
        this.listSegmentFiles(recordingId, segmentFiles::addLast);
        this.deleteSegments(correlationId, recordingId, controlSession, segmentFiles);
    }

    /**
     * Start a listing of recording subscriptions for a control session.
     * <p>
     * An error response is sent when the control session already has an active listing. A
     * "subscription unknown" response is sent when {@code pseudoIndex} is outside the range of current recording
     * subscriptions or {@code subscriptionCount} is not positive. Otherwise a
     * {@link ListRecordingSubscriptionsSession} is added and registered as the control session's active listing.
     *
     * @param correlationId     correlation id of the request.
     * @param pseudoIndex       index at which the listing starts.
     * @param subscriptionCount maximum number of subscriptions to list.
     * @param applyStreamId     passed through to the listing session together with {@code streamId}.
     * @param streamId          stream id passed to the listing session.
     * @param channelFragment   channel fragment passed to the listing session.
     * @param controlSession    session on which responses are sent.
     */
    void listRecordingSubscriptions(long correlationId, int pseudoIndex, int subscriptionCount, boolean applyStreamId, int streamId, String channelFragment, ControlSession controlSession) {
        if (controlSession.hasActiveListing()) {
            String error = "active listing already in progress";
            controlSession.sendErrorResponse(correlationId, 1L, error);
            return;
        }

        boolean indexInRange = pseudoIndex >= 0 && pseudoIndex < this.recordingSubscriptionByKeyMap.size();
        if (!indexInRange || subscriptionCount <= 0) {
            controlSession.sendSubscriptionUnknown(correlationId);
            return;
        }

        ListRecordingSubscriptionsSession listing = new ListRecordingSubscriptionsSession(this.recordingSubscriptionByKeyMap, pseudoIndex, subscriptionCount, streamId, applyStreamId, channelFragment, correlationId, controlSession);
        this.addSession(listing);
        controlSession.activeListing(listing);
    }

    /**
     * Stop the active recording session for {@code recordingId}, if any, and release its subscription.
     * <p>
     * The OK response carries {@code 1} when a recording subscription was found and removed, otherwise
     * {@code 0}. The subscription is closed only when its reference count drops to zero.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording whose session is to be stopped.
     * @param controlSession session on which responses are sent.
     */
    void stopRecordingByIdentity(long correlationId, long recordingId, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)) {
            return;
        }

        int found = 0;
        RecordingSession activeSession = this.recordingSessionByIdMap.get(recordingId);
        if (activeSession != null) {
            activeSession.abort("stop recording by identity");

            long subscriptionId = activeSession.subscription().registrationId();
            Subscription removed = this.removeRecordingSubscription(subscriptionId);
            if (removed != null) {
                found = 1;
                long remainingRefs = this.subscriptionRefCountMap.decrementAndGet(subscriptionId);
                if (0L == remainingRefs) {
                    removed.close();
                }
            }
        }
        controlSession.sendOkResponse(correlationId, found);
    }

    /**
     * Finalise a recording session that has ended.
     * <p>
     * When the conductor is aborting, only {@code session.abortClose()} is called. Otherwise the catalog is told
     * the recording stopped, the session is removed from the active map, any pending error is sent and a
     * {@link RecordingSignal#STOP} signal is emitted; exceptions from those steps are reported to the error
     * handler and do not prevent the remaining clean-up. The subscription is closed and removed when its reference
     * count reaches zero or below, or when the session is auto-stop.
     *
     * @param session the recording session being closed.
     */
    void closeRecordingSession(RecordingSession session) {
        if (this.isAbort) {
            session.abortClose();
            return;
        }

        Subscription subscription = session.subscription();
        long recordedPosition = session.recordedPosition();
        long recordingId = session.sessionId();
        long subscriptionId = subscription.registrationId();

        try {
            this.catalog.recordingStopped(recordingId, recordedPosition, this.epochClock.time());
            this.recordingSessionByIdMap.remove(recordingId);
            session.sendPendingError();
            session.controlSession().sendSignal(session.correlationId(), recordingId, subscriptionId, recordedPosition, RecordingSignal.STOP);
        }
        catch (Exception ex) {
            this.errorHandler.onError(ex);
        }

        // The reference count is always decremented; auto-stop is only consulted if references remain.
        boolean noReferencesLeft = this.subscriptionRefCountMap.decrementAndGet(subscriptionId) <= 0L;
        if (noReferencesLeft || session.isAutoStop()) {
            this.closeAndRemoveRecordingSubscription(subscription, "close recording session");
        }

        this.closeSession(session);
        this.ctx.recordingSessionCounter().decrementRelease();
    }

    /**
     * Finalise a replay session: unless the conductor is aborting, send any pending error (reporting failures to
     * the error handler), then unregister the session, close it and decrement the replay session counter.
     *
     * @param session the replay session being closed.
     */
    void closeReplaySession(ReplaySession session) {
        boolean aborting = this.isAbort;
        if (!aborting) {
            try {
                session.sendPendingError();
            }
            catch (Exception ex) {
                this.errorHandler.onError(ex);
            }
        }

        long replaySessionId = session.sessionId();
        this.replaySessionByIdMap.remove(replaySessionId);
        this.closeSession(session);
        this.ctx.replaySessionCounter().decrementRelease();
    }

    /**
     * Start replicating a recording from a source archive into this archive.
     * <p>
     * The effective replication channel is {@code replicationChannel}, or the context default when that is
     * empty. The request is rejected with an error response when a response-mode replication channel is combined
     * with a live destination or with channel/subscription tags, when a destination recording id other than
     * {@code -1} is not in the catalog, or when the destination recording is active (stop position {@code -1} or
     * an active recording session). Otherwise a {@link ReplicationSession} is created, registered and added, and
     * an OK response carrying the new replication id is sent.
     *
     * @param correlationId        correlation id of the request.
     * @param srcRecordingId       recording id in the source archive.
     * @param dstRecordingId       destination recording id, or {@code -1} for none.
     * @param stopPosition         passed through to the replication session.
     * @param channelTagId         channel tag, {@code -1} when unused.
     * @param subscriptionTagId    subscription tag, {@code -1} when unused.
     * @param srcControlStreamId   control request stream id of the source archive.
     * @param srcControlChannel    control request channel of the source archive.
     * @param liveDestination      live destination, may be empty.
     * @param replicationChannel   replication channel, may be empty to use the context default.
     * @param fileIoMaxLength      passed through to the replication session.
     * @param replicationSessionId passed through to the replication session.
     * @param encodedCredentials   credentials for the source archive; ignored when null or empty.
     * @param srcResponseChannel   control response channel for the source archive; ignored when empty.
     * @param controlSession       session on which responses are sent.
     */
    void replicate(long correlationId, long srcRecordingId, long dstRecordingId, long stopPosition, long channelTagId, long subscriptionTagId, int srcControlStreamId, String srcControlChannel, String liveDestination, String replicationChannel, int fileIoMaxLength, int replicationSessionId, byte[] encodedCredentials, String srcResponseChannel, ControlSession controlSession) {
        String effectiveChannel;
        if (Strings.isEmpty(replicationChannel)) {
            effectiveChannel = this.ctx.replicationChannel();
        } else {
            effectiveChannel = replicationChannel;
        }
        ChannelUri effectiveChannelUri = ChannelUri.parse(effectiveChannel);

        if (effectiveChannelUri.hasControlModeResponse() && !Strings.isEmpty(liveDestination)) {
            String error = "response channels can't be used with live destinations";
            controlSession.sendErrorResponse(correlationId, 0L, error);
            return;
        }

        boolean isTagged = -1L != channelTagId || -1L != subscriptionTagId;
        if (effectiveChannelUri.hasControlModeResponse() && isTagged) {
            String error = "response channels can't be used with tagged replication";
            controlSession.sendErrorResponse(correlationId, 0L, error);
            return;
        }

        boolean dstExists = this.catalog.hasRecording(dstRecordingId);
        if (!dstExists && -1L != dstRecordingId) {
            String error = "unknown destination recording id " + dstRecordingId;
            controlSession.sendErrorResponse(correlationId, 5L, error);
            return;
        }

        if (dstExists) {
            this.catalog.recordingSummary(dstRecordingId, this.recordingSummary);
            boolean dstIsActive = -1L == this.recordingSummary.stopPosition || this.recordingSessionByIdMap.containsKey(dstRecordingId);
            if (dstIsActive) {
                String error = "cannot replicate to active recording " + dstRecordingId;
                controlSession.sendErrorResponse(correlationId, 2L, error);
                return;
            }
        }

        // Client context for talking to the source archive, cloned from this archive's client configuration.
        AeronArchive.Context srcArchiveCtx = this.ctx.archiveClientContext().clone().controlRequestChannel(srcControlChannel).controlRequestStreamId(srcControlStreamId);
        if (null != encodedCredentials && 0 < encodedCredentials.length) {
            srcArchiveCtx.credentialsSupplier(new ReplicationCredentialsSupplier(encodedCredentials));
        }
        if (!Strings.isEmpty(srcResponseChannel)) {
            srcArchiveCtx.controlResponseChannel(srcResponseChannel);
        }

        long replicationId = this.nextSessionId++;
        srcArchiveCtx.clientName(srcArchiveCtx.clientName() + " replicationSessionId=" + replicationId);

        RecordingSummary dstSummary = dstExists ? this.recordingSummary : null;
        ReplicationSession session = new ReplicationSession(srcRecordingId, dstRecordingId, channelTagId, subscriptionTagId, replicationId, stopPosition, liveDestination, effectiveChannel, fileIoMaxLength, replicationSessionId, dstSummary, srcArchiveCtx, this.cachedEpochClock, this.catalog, controlSession);
        this.replicationSessionByIdMap.put(replicationId, session);
        this.addSession(session);
        controlSession.sendOkResponse(correlationId, replicationId);
    }

    /**
     * Stop a replication: remove the session registered under {@code replicationId}, abort it and send an OK
     * response. When no such session exists an error response is sent instead.
     *
     * @param correlationId  correlation id of the request.
     * @param replicationId  id of the replication to stop.
     * @param controlSession session on which responses are sent.
     */
    void stopReplication(long correlationId, long replicationId, ControlSession controlSession) {
        ReplicationSession removed = this.replicationSessionByIdMap.remove(replicationId);
        if (null != removed) {
            removed.abort("stop replication");
            controlSession.sendOkResponse(correlationId);
            return;
        }

        String error = "unknown replication id " + replicationId;
        controlSession.sendErrorResponse(correlationId, 12L, error);
    }

    /**
     * Detach segments from the start of a recording by moving its catalog start position forward to
     * {@code newStartPosition}. No files are deleted here. The checks send their own error responses on failure.
     *
     * @param correlationId    correlation id of the request.
     * @param recordingId      recording to modify.
     * @param newStartPosition new start position for the recording.
     * @param controlSession   session on which responses are sent.
     */
    void detachSegments(long correlationId, long recordingId, long newStartPosition, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)) {
            return;
        }
        if (!this.isValidDetach(correlationId, controlSession, recordingId, newStartPosition)) {
            return;
        }

        this.catalog.startPosition(recordingId, newStartPosition);
        controlSession.sendOkResponse(correlationId);
    }

    /**
     * Delete the segment files of a recording that precede its current start position.
     * <p>
     * The lowest position among the recording's segment files on disk is found by parsing the position out of
     * each file name (the digits between the {@code "<recordingId>-"} prefix and the first {@code '.'}). The
     * files from that position up to the current start segment are then collected by
     * {@code findDetachedSegments} and handed to {@code deleteSegments}. When no segment files exist the delete
     * list is empty.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording whose detached segments are deleted.
     * @param controlSession session on which responses are sent.
     */
    void deleteDetachedSegments(long correlationId, long recordingId, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession) || !this.isDeleteAllowed(recordingId, correlationId, controlSession)) {
            return;
        }

        // Long.MAX_VALUE acts as the "no file seen" sentinel.
        MutableLong lowestPosition = new MutableLong(Long.MAX_VALUE);
        int prefixLength = AsciiEncoding.digitCount(recordingId) + 1;
        this.listSegmentFiles(recordingId, fileName -> {
            int dotIndex = fileName.indexOf('.');
            long filePosition = AsciiEncoding.parseLongAscii(fileName, prefixLength, dotIndex - prefixLength);
            if (filePosition < lowestPosition.get()) {
                lowestPosition.set(filePosition);
            }
        });

        ArrayDeque<String> detached = new ArrayDeque<String>();
        long lowest = lowestPosition.get();
        if (Long.MAX_VALUE != lowest) {
            this.findDetachedSegments(recordingId, detached, lowest);
        }
        this.deleteSegments(correlationId, recordingId, controlSession, detached);
    }

    /**
     * Detach and delete segments in one step: move the catalog start position to {@code newStartPosition} and
     * schedule deletion of the segment files between the previous start position and the new one.
     *
     * @param correlationId    correlation id of the request.
     * @param recordingId      recording to modify.
     * @param newStartPosition new start position for the recording.
     * @param controlSession   session on which responses are sent.
     */
    void purgeSegments(long correlationId, long recordingId, long newStartPosition, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)
            || !this.isValidDetach(correlationId, controlSession, recordingId, newStartPosition)
            || !this.isDeleteAllowed(recordingId, correlationId, controlSession)) {
            return;
        }

        // Capture the old start before the catalog is updated; it bounds the files to delete.
        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        long previousStartPosition = this.recordingSummary.startPosition;
        this.catalog.startPosition(recordingId, newStartPosition);

        ArrayDeque<String> detached = new ArrayDeque<String>();
        this.findDetachedSegments(recordingId, detached, previousStartPosition);
        this.deleteSegments(correlationId, recordingId, controlSession, detached);
    }

    /**
     * Attach segment files that immediately precede the start of a recording, moving the catalog start position
     * backwards one segment at a time.
     * <p>
     * Starting one segment before the current start position, each existing segment file is checked:
     * its length must equal the recording's segment length, and {@code findTermOffsetForStart} must locate a
     * first fragment with matching term id and stream id. A segment whose first fragment is at offset 0 is
     * attached whole and the walk continues with the previous segment; a segment whose first fragment is at a
     * later offset is attached from that offset and the walk stops. The walk also stops at the first missing file
     * or when the position would become negative. The OK response carries the number of segments attached.
     * <p>
     * The file channel is managed with try-with-resources so it is always closed, and a failure while closing
     * can no longer replace an exception thrown from the body or interfere with the {@code return}/{@code break}
     * paths.
     *
     * @param correlationId  correlation id of the request.
     * @param recordingId    recording to which segments are attached.
     * @param controlSession session on which responses are sent.
     */
    void attachSegments(long correlationId, long recordingId, ControlSession controlSession) {
        if (!this.hasRecording(recordingId, correlationId, controlSession)) {
            return;
        }

        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        int segmentLength = this.recordingSummary.segmentFileLength;
        int termLength = this.recordingSummary.termBufferLength;
        int bitsToShift = LogBufferDescriptor.positionBitsToShift(termLength);
        int streamId = this.recordingSummary.streamId;
        long position = this.recordingSummary.startPosition - (long)segmentLength;
        long count = 0L;

        while (position >= 0L) {
            File file = new File(this.archiveDir, Archive.segmentFileName(recordingId, position));
            if (!file.exists()) {
                break;
            }

            long fileLength = file.length();
            if (fileLength != (long)segmentLength) {
                String msg = "fileLength=" + fileLength + " not equal to segmentLength=" + segmentLength;
                controlSession.sendErrorResponse(correlationId, msg);
                return;
            }

            try (FileChannel fileChannel = FileChannel.open(file.toPath(), FILE_OPTIONS, new FileAttribute[0])) {
                int termCount = (int)(position >> bitsToShift);
                int termId = this.recordingSummary.initialTermId + termCount;
                int termOffset = this.findTermOffsetForStart(correlationId, controlSession, file, fileChannel, streamId, termId, termLength);
                if (termOffset < 0) {
                    // findTermOffsetForStart has already sent the error response.
                    return;
                }

                if (0 == termOffset) {
                    // Whole segment is usable: attach it and keep walking backwards.
                    this.catalog.startPosition(recordingId, position);
                    ++count;
                    position -= (long)segmentLength;
                } else {
                    // Data begins part-way into the segment, so nothing earlier can be contiguous.
                    this.catalog.startPosition(recordingId, position + (long)termOffset);
                    ++count;
                    break;
                }
            }
            catch (IOException ex) {
                controlSession.sendErrorResponse(correlationId, ex.getMessage());
                LangUtil.rethrowUnchecked(ex);
            }
        }

        controlSession.sendOkResponse(correlationId, count);
    }

    /**
     * Migrate all segments of a source recording into a contiguous destination recording.
     * <p>
     * Both recordings must exist, the source must not be active and the stream parameters must match (the
     * respective helpers send the error response). The recordings must be contiguous: either the source stops
     * where the destination starts, or the source starts where the destination stops; the join position must
     * pass the segment-alignment checks for both recordings. Segments are then moved, any empty following source
     * segment is scheduled for deletion, the destination's start or stop position is extended to cover the source
     * range and the source's stop position is set to its start position. The OK response carries the number of
     * moved segments; a {@link RecordingSignal#DELETE} signal is sent for the source when segments were moved and
     * nothing was scheduled for deletion.
     *
     * @param correlationId  correlation id of the request.
     * @param srcRecordingId recording whose segments are moved.
     * @param dstRecordingId recording that receives the segments.
     * @param controlSession session on which responses are sent.
     */
    void migrateSegments(long correlationId, long srcRecordingId, long dstRecordingId, ControlSession controlSession) {
        if (!this.hasRecording(srcRecordingId, correlationId, controlSession) || !this.hasRecording(dstRecordingId, correlationId, controlSession)) {
            return;
        }

        // The source summary is loaded into the shared instance; the destination gets its own.
        RecordingSummary srcSummary = this.catalog.recordingSummary(srcRecordingId, this.recordingSummary);
        RecordingSummary dstSummary = this.catalog.recordingSummary(dstRecordingId, new RecordingSummary());
        if (this.isActiveRecording(controlSession, correlationId, srcSummary) || !this.hasMatchingStreamParameters(controlSession, correlationId, srcSummary, dstSummary)) {
            return;
        }

        long joinPosition;
        if (srcSummary.stopPosition == dstSummary.startPosition) {
            joinPosition = srcSummary.stopPosition;
        } else if (srcSummary.startPosition == dstSummary.stopPosition) {
            joinPosition = srcSummary.startPosition;
        } else {
            String error = "invalid migrate: src and dst are not contiguous srcStartPosition=" + srcSummary.startPosition + " srcStopPosition=" + srcSummary.stopPosition + " dstStartPosition=" + dstSummary.startPosition + " dstStopPosition=" + dstSummary.stopPosition;
            controlSession.sendErrorResponse(correlationId, error);
            return;
        }

        if (this.isJoinPositionSegmentUnaligned(controlSession, correlationId, "src", srcSummary, joinPosition) || this.isJoinPositionSegmentUnaligned(controlSession, correlationId, "dst", dstSummary, joinPosition)) {
            return;
        }

        ArrayDeque<String> emptyFollowingSrcSegment = new ArrayDeque<String>();
        long movedSegmentCount = this.moveAllSegments(controlSession, correlationId, srcRecordingId, dstRecordingId, srcSummary, emptyFollowingSrcSegment);
        if (movedSegmentCount < 0L) {
            return;
        }

        int toBeDeletedSegmentCount = this.addDeleteSegmentsSession(correlationId, srcRecordingId, controlSession, emptyFollowingSrcSegment);
        if (toBeDeletedSegmentCount < 0) {
            return;
        }

        if (srcSummary.stopPosition == dstSummary.startPosition) {
            this.catalog.startPosition(dstRecordingId, srcSummary.startPosition);
        } else {
            this.catalog.stopPosition(dstRecordingId, srcSummary.stopPosition);
        }
        this.catalog.stopPosition(srcRecordingId, srcSummary.startPosition);
        controlSession.sendOkResponse(correlationId, movedSegmentCount);

        if (movedSegmentCount > 0L && toBeDeletedSegmentCount <= 0) {
            controlSession.sendSignal(correlationId, srcRecordingId, -1L, -1L, RecordingSignal.DELETE);
        }
    }

    /**
     * Unregister a replication session from {@code replicationSessionByIdMap} using its session id.
     *
     * @param replicationSession the session to unregister.
     */
    void removeReplicationSession(ReplicationSession replicationSession) {
        long replicationId = replicationSession.sessionId();
        this.replicationSessionByIdMap.remove(replicationId);
    }

    /**
     * Unregister a delete-segments session from {@code deleteSegmentsSessionByIdMap} using its session id.
     *
     * @param deleteSegmentsSession the session to unregister.
     */
    void removeDeleteSegmentsSession(DeleteSegmentsSession deleteSegmentsSession) {
        long deleteSessionId = deleteSegmentsSession.sessionId();
        this.deleteSegmentsSessionByIdMap.remove(deleteSessionId);
    }

    /**
     * Collect the names of the segment files lying between a previous start position and the recording's current
     * start position.
     * <p>
     * The recording summary is reloaded from the catalog. Walking backwards from the segment just before the
     * current start segment down to the segment containing {@code prevStartPosition}, each file name is pushed
     * onto the front of {@code files}, so the names end up in ascending position order ahead of anything already
     * in the deque.
     *
     * @param recordingId       recording whose segments are examined.
     * @param files             deque that receives the segment file names.
     * @param prevStartPosition start position before segments were detached.
     */
    private void findDetachedSegments(long recordingId, ArrayDeque<String> files, long prevStartPosition) {
        this.catalog.recordingSummary(recordingId, this.recordingSummary);

        int segmentLength = this.recordingSummary.segmentFileLength;
        int termLength = this.recordingSummary.termBufferLength;
        long currentStart = this.recordingSummary.startPosition;
        long oldestSegmentBase = AeronArchive.segmentFileBasePosition(prevStartPosition, prevStartPosition, termLength, segmentLength);
        long currentSegmentBase = AeronArchive.segmentFileBasePosition(currentStart, currentStart, termLength, segmentLength);

        long segmentBase = currentSegmentBase - (long)segmentLength;
        while (segmentBase >= oldestSegmentBase) {
            files.addFirst(Archive.segmentFileName(recordingId, segmentBase));
            segmentBase -= (long)segmentLength;
        }
    }

    /**
     * Create and add a {@link DeleteSegmentsSession} for the given segment file names.
     * Each name is resolved against the archive directory. No session is created when {@code files} is empty.
     *
     * @param correlationId  correlation id of the originating request.
     * @param recordingId    recording the files belong to.
     * @param controlSession session associated with the delete.
     * @param files          names of the segment files to delete.
     * @return the number of files scheduled for deletion, {@code 0} when there were none.
     */
    private int addDeleteSegmentsSession(long correlationId, long recordingId, ControlSession controlSession, ArrayDeque<String> files) {
        int fileCount = files.size();
        if (0 == fileCount) {
            return 0;
        }

        ArrayDeque<File> toDelete = new ArrayDeque<File>(fileCount);
        for (String fileName : files) {
            toDelete.add(new File(this.archiveDir, fileName));
        }

        DeleteSegmentsSession session = new DeleteSegmentsSession(recordingId, correlationId, toDelete, controlSession, this.errorHandler);
        this.addSession(session);
        return files.size();
    }

    /**
     * Abort every active recording session that uses the given subscription (compared by identity), then
     * decrement the subscription's reference count and close it when the count reaches zero.
     *
     * @param subscription the recording subscription being stopped.
     */
    private void abortRecordingSessionAndCloseSubscription(Subscription subscription) {
        for (RecordingSession recordingSession : this.recordingSessionByIdMap.values()) {
            if (recordingSession.subscription() == subscription) {
                recordingSession.abort("stop recording");
            }
        }

        long remainingRefs = this.subscriptionRefCountMap.decrementAndGet(subscription.registrationId());
        if (0L == remainingRefs) {
            subscription.close();
        }
    }

    /**
     * Locate the first fragment in the first term of a segment file and validate its term id and stream id.
     * <p>
     * The first 32 bytes of the file are read and checked for a positive fragment length. If none is found there,
     * the first term is scanned chunk by chunk (each chunk is read into the start of the shared data buffer) in
     * 32-byte steps until a header with a positive fragment length is seen or the end of the term is reached.
     * (32 is presumably the data header length / frame alignment; the literal comes from the compiled constants.)
     * <p>
     * Every failure sends an error response on {@code controlSession} and returns {@code -1}, which the caller
     * treats as "stop, the error has been reported". This includes a failed initial header read, which
     * previously returned {@code 0} and was therefore mistaken for success by the caller.
     * <p>
     * The header found by the scan is validated at its offset <em>within the buffer</em>. The term offset is
     * relative to the file, whereas the buffer is refilled from index 0 for every chunk, so the two only coincide
     * while the scan is still in the first chunk.
     *
     * @param correlationId  correlation id used for error responses.
     * @param controlSession session on which error responses are sent.
     * @param file           segment file, used in error messages.
     * @param fileChannel    open channel on {@code file}.
     * @param streamId       expected stream id of the first fragment.
     * @param termId         expected term id of the first fragment.
     * @param termLength     length of a term; bounds the scan.
     * @return the term offset of the first fragment (0 or greater), or {@code -1} after an error response was sent.
     * @throws IOException if reading from the channel fails.
     */
    private int findTermOffsetForStart(long correlationId, ControlSession controlSession, File file, FileChannel fileChannel, int streamId, int termId, int termLength) throws IOException {
        UnsafeBuffer buffer = this.ctx.dataBuffer();
        ByteBuffer byteBuffer = buffer.byteBuffer();

        byteBuffer.clear().limit(32);
        if (32 != fileChannel.read(byteBuffer, 0L)) {
            String msg = "failed to read segment file";
            controlSession.sendErrorResponse(correlationId, msg);
            // Must be negative: the caller interprets 0 as "fragment found at the start of the segment".
            return -1;
        }

        // File-relative offset of the first fragment within the term.
        int termOffset = 0;
        // Offset of that fragment's header inside the buffer as it was last filled.
        int headerOffset = 0;

        if (DataHeaderFlyweight.fragmentLength(buffer, 0) <= 0) {
            boolean found = false;
            do {
                byteBuffer.clear().limit(Math.min(termLength - termOffset, byteBuffer.capacity()));
                int bytesRead = fileChannel.read(byteBuffer, termOffset);
                if (bytesRead <= 0) {
                    String msg = "read failed on " + String.valueOf(file);
                    controlSession.sendErrorResponse(correlationId, msg);
                    return -1;
                }

                // Only whole 32-byte slots are examined.
                int limit = bytesRead - (bytesRead & 0x1F);
                int offset = 0;
                while (offset < limit) {
                    if (DataHeaderFlyweight.fragmentLength(buffer, offset) > 0) {
                        found = true;
                        break;
                    }
                    offset += 32;
                }

                headerOffset = offset;
                termOffset += offset;
            } while (termOffset < termLength && !found);
        }

        if (termOffset >= termLength) {
            String msg = "fragment not found in first term of segment " + String.valueOf(file);
            controlSession.sendErrorResponse(correlationId, msg);
            return -1;
        }

        int fileTermId = DataHeaderFlyweight.termId(buffer, headerOffset);
        if (fileTermId != termId) {
            String msg = "term id does not match: actual=" + fileTermId + " expected=" + termId;
            controlSession.sendErrorResponse(correlationId, msg);
            return -1;
        }

        int fileStreamId = DataHeaderFlyweight.streamId(buffer, headerOffset);
        if (fileStreamId != streamId) {
            String msg = "stream id does not match: actual=" + fileStreamId + " expected=" + streamId;
            controlSession.sendErrorResponse(correlationId, msg);
            return -1;
        }

        return termOffset;
    }

    /**
     * Drain the task queue, running each task in FIFO order. Tasks added while draining are run as well, since
     * the queue is polled until it is empty.
     *
     * @param taskQueue queue of tasks to run.
     * @return the number of tasks that were run.
     */
    private int runTasks(ArrayDeque<Runnable> taskQueue) {
        int executed = 0;
        for (Runnable task = taskQueue.pollFirst(); null != task; task = taskQueue.pollFirst()) {
            task.run();
            executed++;
        }
        return executed;
    }

    /**
     * Create a {@link ChannelUriStringBuilder} populated from a fixed set of parameters of the given channel URI.
     * Only the parameters copied by the calls below are carried over to the builder.
     *
     * @param channelUri parsed channel URI to copy parameters from.
     * @return a new builder populated from {@code channelUri}.
     */
    static ChannelUriStringBuilder strippedChannelBuilder(ChannelUri channelUri) {
        return new ChannelUriStringBuilder()
            .media(channelUri)
            .endpoint(channelUri)
            .networkInterface(channelUri)
            .controlEndpoint(channelUri)
            .controlMode(channelUri)
            .tags(channelUri)
            .rejoin(channelUri)
            .group(channelUri)
            .tether(channelUri)
            .flowControl(channelUri)
            .groupTag(channelUri)
            .congestionControl(channelUri)
            .socketRcvbufLength(channelUri)
            .socketSndbufLength(channelUri)
            .receiverWindowLength(channelUri)
            .channelSendTimestampOffset(channelUri)
            .channelReceiveTimestampOffset(channelUri)
            .mediaReceiveTimestampOffset(channelUri)
            .sessionId(channelUri)
            .alias(channelUri)
            .responseCorrelationId(channelUri)
            .responseEndpoint(channelUri)
            .ttl(channelUri)
            .streamId(channelUri);
    }

    /**
     * Build the key under which a recording subscription is stored.
     * <p>
     * The key has the form {@code <streamId>:<media>?name=value|name=value...}, containing, in this order and
     * only when present on the URI, the parameters {@code endpoint}, {@code interface}, {@code control},
     * {@code session-id} and {@code tags}. The final character (the trailing {@code '|'}, or the {@code '?'} when
     * no parameter is present) is dropped.
     *
     * @param streamId   stream id of the subscription.
     * @param channelUri parsed channel URI of the subscription.
     * @return the subscription key.
     */
    private static String makeKey(int streamId, ChannelUri channelUri) {
        StringBuilder key = new StringBuilder();
        key.append(streamId).append(':').append(channelUri.media()).append('?');

        String[] identifyingParams = {"endpoint", "interface", "control", "session-id", "tags"};
        for (String paramName : identifyingParams) {
            String paramValue = channelUri.get(paramName);
            if (null != paramValue) {
                key.append(paramName).append('=').append(paramValue).append('|');
            }
        }

        key.setLength(key.length() - 1);
        return key.toString();
    }

    /**
     * Check that a recording exists in the catalog, sending an error response when it does not.
     *
     * @param recordingId   recording to look up.
     * @param correlationId correlation id used for the error response.
     * @param session       session on which the error response is sent.
     * @return {@code true} if the catalog has the recording, otherwise {@code false}.
     */
    private boolean hasRecording(long recordingId, long correlationId, ControlSession session) {
        if (this.catalog.hasRecording(recordingId)) {
            return true;
        }

        String error = "unknown recording id: " + recordingId;
        session.sendErrorResponse(correlationId, 5L, error);
        return false;
    }

    /**
     * Check that no delete-segments session is registered for the recording, sending an error response when one
     * is.
     *
     * @param recordingId    recording to check.
     * @param correlationId  correlation id used for the error response.
     * @param controlSession session on which the error response is sent.
     * @return {@code true} if a delete may proceed, otherwise {@code false}.
     */
    private boolean isDeleteAllowed(long recordingId, long correlationId, ControlSession controlSession) {
        boolean deleteInProgress = this.deleteSegmentsSessionByIdMap.containsKey(recordingId);
        if (!deleteInProgress) {
            return true;
        }

        String error = "another delete operation in progress for recording id: " + recordingId;
        controlSession.sendErrorResponse(correlationId, error);
        return false;
    }

    /**
     * Pass the name of every segment file of a recording found in the archive directory to a consumer.
     * A file qualifies when its name starts with {@code "<recordingId>-"} and ends with {@code ".rec"} or with
     * {@code DELETE_SUFFIX}. Nothing is reported when the directory listing is {@code null}.
     *
     * @param recordingId         recording whose segment files are listed.
     * @param segmentFileConsumer receives each matching file name.
     */
    private void listSegmentFiles(long recordingId, Consumer<String> segmentFileConsumer) {
        String prefix = recordingId + "-";
        String[] names = this.archiveDir.list();
        if (null == names) {
            return;
        }

        for (String name : names) {
            if (name.startsWith(prefix) && (name.endsWith(".rec") || name.endsWith(DELETE_SUFFIX))) {
                segmentFileConsumer.accept(name);
            }
        }
    }

    /**
     * Begin recording a newly available image as a new recording.
     * <p>
     * A catalog entry is added using the image's properties, with the segment file length being the larger of the
     * configured segment length and the image's term buffer length. A recording position counter is allocated and
     * set to the image's join position, a {@link RecordingSession} is created, a {@link RecordingSignal#START}
     * signal is sent, the subscription's reference count is incremented, and the session is registered and handed
     * to the recorder.
     *
     * @param controlSession  session that requested the recording.
     * @param correlationId   correlation id of the originating request.
     * @param strippedChannel channel with only the retained parameters, stored in the catalog.
     * @param originalChannel channel as supplied by the client.
     * @param image           the image to record.
     * @param autoStop        passed through to the recording session.
     */
    private void startRecordingSession(ControlSession controlSession, long correlationId, String strippedChannel, String originalChannel, Image image, boolean autoStop) {
        int sessionId = image.sessionId();
        int streamId = image.subscription().streamId();
        String sourceIdentity = image.sourceIdentity();
        int termLength = image.termBufferLength();
        int mtuLength = image.mtuLength();
        int initialTermId = image.initialTermId();
        long joinPosition = image.joinPosition();
        int segmentLength = Math.max(this.ctx.segmentFileLength(), termLength);

        long recordingId = this.catalog.addNewRecording(joinPosition, this.cachedEpochClock.time(), initialTermId, segmentLength, termLength, mtuLength, sessionId, streamId, strippedChannel, originalChannel, sourceIdentity);

        Counter positionCounter = RecordingPos.allocate(this.aeron, this.counterMetadataBuffer, this.ctx.archiveId(), recordingId, sessionId, streamId, strippedChannel, sourceIdentity);
        positionCounter.setRelease(joinPosition);

        RecordingSession recordingSession = new RecordingSession(correlationId, recordingId, joinPosition, segmentLength, originalChannel, this.recordingEventsProxy, image, positionCounter, this.ctx, controlSession, autoStop, this.recorder);

        controlSession.sendSignal(correlationId, recordingId, image.subscription().registrationId(), image.joinPosition(), RecordingSignal.START);
        this.subscriptionRefCountMap.incrementAndGet(image.subscription().registrationId());
        this.recordingSessionByIdMap.put(recordingId, recordingSession);
        this.recorder.addSession(recordingSession);
        this.ctx.recordingSessionCounter().incrementRelease();
    }

    /**
     * Extends an existing, stopped recording with the supplied image and hands a new {@link RecordingSession}
     * to the recorder.
     * <p>
     * The extension is refused, with an error response sent to the control session, when the recording is
     * already active, when an outstanding delete operation reaches the recording's stop position, or when the
     * image does not line up with the recording (see {@code validateImageForExtendRecording}). Any failure is
     * reported to the error handler, and when {@code autoStop} is set the image's subscription is also closed
     * and removed.
     *
     * @param controlSession  which requested the extension.
     * @param correlationId   of the originating request.
     * @param recordingId     of the recording to extend.
     * @param strippedChannel channel used for the position counter.
     * @param originalChannel channel as supplied in the request.
     * @param image           to be appended to the recording.
     * @param autoStop        passed through to the recording session; also controls subscription cleanup on failure.
     */
    private void extendRecordingSession(ControlSession controlSession, long correlationId, long recordingId, String strippedChannel, String originalChannel, Image image, boolean autoStop) {
        long subscriptionId = image.subscription().registrationId();
        try {
            if (this.recordingSessionByIdMap.containsKey(recordingId)) {
                String msg = "cannot extend active recording " + recordingId + " streamId=" + image.subscription().streamId() + " channel=" + originalChannel;
                controlSession.sendErrorResponse(correlationId, 2L, msg);
                throw new ArchiveEvent(msg);
            }

            // this.recordingSummary is a shared scratch instance that other methods of this class also
            // populate, so it must be loaded for this recording BEFORE its stopPosition is consulted below.
            this.catalog.recordingSummary(recordingId, this.recordingSummary);

            DeleteSegmentsSession deleteSegmentsSession = this.deleteSegmentsSessionByIdMap.get(recordingId);
            if (null != deleteSegmentsSession && deleteSegmentsSession.maxDeletePosition() >= this.recordingSummary.stopPosition) {
                String msg = "cannot extend recording " + recordingId + " streamId=" + image.subscription().streamId() + " channel=" + originalChannel + " due to an outstanding delete operation";
                controlSession.sendErrorResponse(correlationId, 0L, msg);
                throw new ArchiveEvent(msg);
            }

            this.validateImageForExtendRecording(correlationId, controlSession, image, this.recordingSummary);

            Counter position = RecordingPos.allocate(this.aeron, this.counterMetadataBuffer, this.ctx.archiveId(), recordingId, image.sessionId(), image.subscription().streamId(), strippedChannel, image.sourceIdentity());
            position.setRelease(image.joinPosition());

            RecordingSession session = new RecordingSession(correlationId, recordingId, this.recordingSummary.startPosition, this.recordingSummary.segmentFileLength, originalChannel, this.recordingEventsProxy, image, position, this.ctx, controlSession, autoStop, this.recorder);

            this.catalog.extendRecording(recordingId, controlSession.sessionId(), correlationId, image.sessionId());
            controlSession.sendSignal(correlationId, recordingId, subscriptionId, image.joinPosition(), RecordingSignal.EXTEND);

            this.subscriptionRefCountMap.incrementAndGet(subscriptionId);
            this.recordingSessionByIdMap.put(recordingId, session);
            this.recorder.addSession(session);
            this.ctx.recordingSessionCounter().incrementRelease();
        }
        catch (Exception ex) {
            this.errorHandler.onError(ex);
            if (autoStop) {
                this.closeAndRemoveRecordingSubscription(image.subscription(), ex.getMessage());
            }
        }
    }

    /**
     * Removes the recording subscription with the given registration id from the subscription map.
     *
     * @param subscriptionId registration id of the subscription to remove.
     * @return the removed subscription, or {@code null} if no subscription has that registration id.
     */
    private Subscription removeRecordingSubscription(long subscriptionId) {
        for (Object2ObjectHashMap.ValueIterator it = this.recordingSubscriptionByKeyMap.values().iterator(); it.hasNext(); ) {
            Subscription candidate = (Subscription)it.next();
            if (candidate.registrationId() == subscriptionId) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }

    /**
     * Checks that an image can be appended to an existing recording.
     * <p>
     * The image's join position must equal the recording's stop position, and its initial term id, term
     * buffer length and MTU length must match those of the recording. The checks run in that order and the
     * first mismatch is reported.
     *
     * @param correlationId    of the originating request.
     * @param controlSession   to which an error response is sent on mismatch.
     * @param image            which is a candidate for extending the recording.
     * @param recordingSummary describing the recording to be extended.
     * @throws ArchiveEvent if any of the checks fails, after the error response has been sent.
     */
    private void validateImageForExtendRecording(long correlationId, ControlSession controlSession, Image image, RecordingSummary recordingSummary) {
        String mismatch;
        if (image.joinPosition() != recordingSummary.stopPosition) {
            mismatch = "cannot extend recording " + recordingSummary.recordingId + " image.joinPosition=" + image.joinPosition() + " != rec.stopPosition=" + recordingSummary.stopPosition;
        }
        else if (image.initialTermId() != recordingSummary.initialTermId) {
            mismatch = "cannot extend recording " + recordingSummary.recordingId + " image.initialTermId=" + image.initialTermId() + " != rec.initialTermId=" + recordingSummary.initialTermId;
        }
        else if (image.termBufferLength() != recordingSummary.termBufferLength) {
            mismatch = "cannot extend recording " + recordingSummary.recordingId + " image.termBufferLength=" + image.termBufferLength() + " != rec.termBufferLength=" + recordingSummary.termBufferLength;
        }
        else if (image.mtuLength() != recordingSummary.mtuLength) {
            mismatch = "cannot extend recording " + recordingSummary.recordingId + " image.mtuLength=" + image.mtuLength() + " != rec.mtuLength=" + recordingSummary.mtuLength;
        }
        else {
            return;
        }

        // Single reporting point: respond to the client first, then abort the caller via the exception.
        controlSession.sendErrorResponse(correlationId, 9L, mismatch);
        throw new ArchiveEvent(mismatch);
    }

    /**
     * Checks whether a recording may be truncated to the given position, sending an error response if not.
     * <p>
     * Truncation is refused while a replay of the recording is active, while the recording has no stop
     * position (stop position of -1), or when the position lies outside {@code [startPosition, stopPosition]}
     * or is not a multiple of 32.
     * <p>
     * Side effect: loads the summary of the recording into {@code this.recordingSummary}.
     *
     * @param correlationId  of the originating request.
     * @param controlSession to which an error response is sent on failure.
     * @param recordingId    of the recording to truncate.
     * @param position       to truncate to.
     * @return {@code true} if the truncate may proceed, otherwise {@code false}.
     */
    private boolean isValidTruncate(long correlationId, ControlSession controlSession, long recordingId, long position) {
        for (ReplaySession replay : this.replaySessionByIdMap.values()) {
            if (replay.recordingId() == recordingId) {
                String msg = "cannot truncate recording with active replay " + recordingId;
                controlSession.sendErrorResponse(correlationId, 2L, msg);
                return false;
            }
        }

        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        long recordingStart = this.recordingSummary.startPosition;
        long recordingStop = this.recordingSummary.stopPosition;

        if (-1L == recordingStop) {
            String msg = "cannot truncate active recording";
            controlSession.sendErrorResponse(correlationId, 2L, msg);
            return false;
        }

        boolean isWithinRecording = recordingStart <= position && position <= recordingStop;
        boolean isFrameAligned = 0L == (position & 0x1FL);
        if (!isWithinRecording || !isFrameAligned) {
            String msg = "invalid position " + position + ": start=" + recordingStart + " stop=" + recordingStop + " alignment=32";
            controlSession.sendErrorResponse(correlationId, msg);
            return false;
        }

        return true;
    }

    /**
     * Checks whether a recording may be purged, sending an error response if not.
     * <p>
     * A purge is refused while a replay of the recording is active or while the recording has no stop
     * position (stop position of -1).
     * <p>
     * Side effect: loads the summary of the recording into {@code this.recordingSummary}.
     *
     * @param correlationId  of the originating request.
     * @param controlSession to which an error response is sent on failure.
     * @param recordingId    of the recording to purge.
     * @return {@code true} if the purge may proceed, otherwise {@code false}.
     */
    private boolean isValidPurge(long correlationId, ControlSession controlSession, long recordingId) {
        for (ReplaySession replay : this.replaySessionByIdMap.values()) {
            if (replay.recordingId() == recordingId) {
                String msg = "cannot purge recording with active replay " + recordingId;
                controlSession.sendErrorResponse(correlationId, 2L, msg);
                return false;
            }
        }

        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        boolean isStopped = -1L != this.recordingSummary.stopPosition;
        if (!isStopped) {
            String msg = "cannot purge active recording " + recordingId;
            controlSession.sendErrorResponse(correlationId, 2L, msg);
        }

        return isStopped;
    }

    /**
     * Checks a requested replay start position against a recording, sending an error response if it is
     * unusable.
     * <p>
     * The position is invalid when it is not a multiple of 32, when it precedes the recording's start
     * position, or when the recording has a stop position (not -1) and the position is at or beyond it.
     *
     * @param correlationId    of the originating request.
     * @param controlSession   to which an error response is sent when the position is invalid.
     * @param recordingId      used in the error message.
     * @param position         requested replay start position.
     * @param recordingSummary describing the recording to be replayed.
     * @return {@code true} if the position is INVALID, otherwise {@code false}.
     */
    private boolean isInvalidReplayPosition(long correlationId, ControlSession controlSession, long recordingId, long position, RecordingSummary recordingSummary) {
        long recordingStart = recordingSummary.startPosition;
        long recordingStop = recordingSummary.stopPosition;

        String problem;
        if (0L != (position & 0x1FL)) {
            problem = "requested replay start position=" + position + " is not a multiple of FRAME_ALIGNMENT (32) for recording " + recordingId;
        }
        else if (position - recordingStart < 0L) {
            problem = "requested replay start position=" + position + " is less than recording start position=" + recordingStart + " for recording " + recordingId;
        }
        else if (-1L != recordingStop && position >= recordingStop) {
            problem = "requested replay start position=" + position + " must be less than highest recorded position=" + recordingStop + " for recording " + recordingId;
        }
        else {
            return false;
        }

        controlSession.sendErrorResponse(correlationId, problem);
        return true;
    }

    /**
     * Checks whether segments of a recording may be detached up to a new start position, sending an error
     * response if not.
     * <p>
     * The new start position must be the base position of a segment file, must be at least one segment
     * beyond the first segment of the recording, must not be beyond the segment containing the end of the
     * recording (the stop position, or the recorded position of the live session when the stop position is
     * -1), and must not be beyond the segment any replay of this recording is currently positioned in.
     * <p>
     * Side effect: loads the summary of the recording into {@code this.recordingSummary}.
     *
     * @param correlationId  of the originating request.
     * @param controlSession to which an error response is sent on failure.
     * @param recordingId    of the recording to detach segments from.
     * @param position       the requested new start position.
     * @return {@code true} if the detach may proceed, otherwise {@code false}.
     */
    private boolean isValidDetach(long correlationId, ControlSession controlSession, long recordingId, long position) {
        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        long startPosition = this.recordingSummary.startPosition;
        int termLength = this.recordingSummary.termBufferLength;
        int segmentLength = this.recordingSummary.segmentFileLength;

        long segmentBaseOfPosition = AeronArchive.segmentFileBasePosition(startPosition, position, termLength, segmentLength);
        if (position != segmentBaseOfPosition) {
            String msg = "invalid segment start: newStartPosition=" + position;
            controlSession.sendErrorResponse(correlationId, msg);
            return false;
        }

        // The earliest allowed new start is the beginning of the second segment.
        long lowerBound = AeronArchive.segmentFileBasePosition(startPosition, startPosition, termLength, segmentLength) + (long)segmentLength;
        if (position < lowerBound) {
            String msg = "invalid detach: newStartPosition=" + position + " lowerBound=" + lowerBound;
            controlSession.sendErrorResponse(correlationId, msg);
            return false;
        }

        long stopPosition = this.recordingSummary.stopPosition;
        long recordedEnd;
        if (-1L == stopPosition) {
            recordedEnd = this.recordingSessionByIdMap.get(recordingId).recordedPosition();
        }
        else {
            recordedEnd = stopPosition;
        }
        long upperBound = AeronArchive.segmentFileBasePosition(startPosition, recordedEnd, termLength, segmentLength);
        if (position > upperBound) {
            String msg = "invalid detach: in use, newStartPosition=" + position + " upperBound=" + upperBound;
            controlSession.sendErrorResponse(correlationId, msg);
            return false;
        }

        // Find the replay of this recording sitting in the lowest segment below the new start position.
        ReplaySession lowestBlockingReplay = null;
        for (ReplaySession replay : this.replaySessionByIdMap.values()) {
            long replaySegmentBase = replay.segmentFileBasePosition();
            boolean isBlocking = recordingId == replay.recordingId() && position > replaySegmentBase;
            if (isBlocking && (null == lowestBlockingReplay || replaySegmentBase < lowestBlockingReplay.segmentFileBasePosition())) {
                lowestBlockingReplay = replay;
            }
        }

        if (null == lowestBlockingReplay) {
            return true;
        }

        String msg = "invalid detach: replay in progress -  state=" + lowestBlockingReplay.state() + " newStartPosition=" + position + " upperBound=" + lowestBlockingReplay.segmentFileBasePosition() + " sessionId=" + (int)lowestBlockingReplay.sessionId() + " streamId=" + lowestBlockingReplay.replayStreamId() + " channel=" + lowestBlockingReplay.replayChannel();
        controlSession.sendErrorResponse(correlationId, msg);
        return false;
    }

    /**
     * Checks whether the seam position of a migrate operation falls on a segment boundary of a recording,
     * sending an error response if it does not.
     *
     * @param controlSession   to which an error response is sent when the position is unaligned.
     * @param correlationId    of the originating request.
     * @param label            identifying the recording in the error message.
     * @param recordingSummary describing the recording to check against.
     * @param seamPosition     position at which the recordings are to be joined.
     * @return {@code true} if the position is NOT on a segment boundary, otherwise {@code false}.
     */
    private boolean isJoinPositionSegmentUnaligned(ControlSession controlSession, long correlationId, String label, RecordingSummary recordingSummary, long seamPosition) {
        long segmentBase = AeronArchive.segmentFileBasePosition(recordingSummary.startPosition, seamPosition, recordingSummary.termBufferLength, recordingSummary.segmentFileLength);
        if (segmentBase == seamPosition) {
            return false;
        }

        String error = "invalid migrate: join position is not on segment boundary of " + label + " recording seamPosition=" + seamPosition + " startPosition=" + recordingSummary.startPosition + " stopPosition=" + recordingSummary.stopPosition + " termBufferLength=" + recordingSummary.termBufferLength + " segmentFileLength=" + recordingSummary.segmentFileLength;
        controlSession.sendErrorResponse(correlationId, error);
        return true;
    }

    /**
     * Checks whether a recording has no stop position (stop position of -1), sending an error response if so.
     *
     * @param controlSession      to which an error response is sent when the recording is active.
     * @param correlationId       of the originating request.
     * @param srcRecordingSummary describing the recording to check.
     * @return {@code true} if the recording is still active, otherwise {@code false}.
     */
    private boolean isActiveRecording(ControlSession controlSession, long correlationId, RecordingSummary srcRecordingSummary) {
        boolean isActive = -1L == srcRecordingSummary.stopPosition;
        if (isActive) {
            String message = "recording " + srcRecordingSummary.recordingId + " is still active";
            controlSession.sendErrorResponse(correlationId, message);
        }
        return isActive;
    }

    /**
     * Checks that the source and destination recordings of a migrate operation agree on segment file length,
     * term buffer length, initial term id, stream id and MTU length.
     * <p>
     * The properties are compared in that order and an error response is sent for the first mismatch found.
     *
     * @param controlSession      to which an error response is sent on mismatch.
     * @param correlationId       of the originating request.
     * @param srcRecordingSummary describing the source recording.
     * @param dstRecordingSummary describing the destination recording.
     * @return {@code true} if all compared properties match, otherwise {@code false}.
     */
    private boolean hasMatchingStreamParameters(ControlSession controlSession, long correlationId, RecordingSummary srcRecordingSummary, RecordingSummary dstRecordingSummary) {
        String mismatch;
        if (dstRecordingSummary.segmentFileLength != srcRecordingSummary.segmentFileLength) {
            mismatch = "invalid migrate: srcSegmentFileLength=" + srcRecordingSummary.segmentFileLength + " dstSegmentFileLength=" + dstRecordingSummary.segmentFileLength;
        }
        else if (dstRecordingSummary.termBufferLength != srcRecordingSummary.termBufferLength) {
            mismatch = "invalid migrate: srcTermBufferLength=" + srcRecordingSummary.termBufferLength + " dstTermBufferLength=" + dstRecordingSummary.termBufferLength;
        }
        else if (dstRecordingSummary.initialTermId != srcRecordingSummary.initialTermId) {
            mismatch = "invalid migrate: srcInitialTermId=" + srcRecordingSummary.initialTermId + " dstInitialTermId=" + dstRecordingSummary.initialTermId;
        }
        else if (dstRecordingSummary.streamId != srcRecordingSummary.streamId) {
            mismatch = "invalid migrate: srcStreamId=" + srcRecordingSummary.streamId + " dstStreamId=" + dstRecordingSummary.streamId;
        }
        else if (dstRecordingSummary.mtuLength != srcRecordingSummary.mtuLength) {
            mismatch = "invalid migrate: srcMtuLength=" + srcRecordingSummary.mtuLength + " dstMtuLength=" + dstRecordingSummary.mtuLength;
        }
        else {
            return true;
        }

        controlSession.sendErrorResponse(correlationId, mismatch);
        return false;
    }

    /**
     * Renames every segment file of the source recording so that it belongs to the destination recording.
     * <p>
     * Works in two passes over the segment positions of the source recording. The first pass only validates:
     * each source file must exist and no destination file may already exist. The second pass performs the
     * renames. A segment whose base position equals the source stop position is not moved; its file name is
     * added to the front of {@code emptyFollowingSrcSegment} instead.
     * <p>
     * Note that a rename failure in the second pass returns immediately and leaves earlier renames in place.
     *
     * @param controlSession           to which an error response is sent on failure.
     * @param correlationId            of the originating request.
     * @param srcRecordingId           recording whose segments are moved.
     * @param dstRecordingId           recording which receives the segments.
     * @param srcRecordingSummary      describing the source recording.
     * @param emptyFollowingSrcSegment receives the name of the source segment starting at the stop position.
     * @return the number of segment files renamed, or -1 if an error response was sent.
     */
    private long moveAllSegments(ControlSession controlSession, long correlationId, long srcRecordingId, long dstRecordingId, RecordingSummary srcRecordingSummary, ArrayDeque<String> emptyFollowingSrcSegment) {
        long srcStartPosition = srcRecordingSummary.startPosition;
        long srcStopPosition = srcRecordingSummary.stopPosition;
        int termLength = srcRecordingSummary.termBufferLength;
        int segmentLength = srcRecordingSummary.segmentFileLength;
        long firstSegmentPos = AeronArchive.segmentFileBasePosition(srcStartPosition, srcStartPosition, termLength, segmentLength);
        long lastSegmentPos = AeronArchive.segmentFileBasePosition(srcStartPosition, srcStopPosition, termLength, segmentLength);

        // Pass 1: validate everything up front so that no file is touched if the move cannot complete.
        for (long segmentPos = firstSegmentPos; segmentPos <= lastSegmentPos; segmentPos += segmentLength) {
            if (segmentPos == srcStopPosition) {
                continue;
            }

            File srcFile = new File(this.archiveDir, Archive.segmentFileName(srcRecordingId, segmentPos));
            if (!srcFile.exists()) {
                String msg = "missing src segment file " + srcFile;
                controlSession.sendErrorResponse(correlationId, msg);
                return -1L;
            }

            File dstFile = new File(this.archiveDir, Archive.segmentFileName(dstRecordingId, segmentPos));
            if (dstFile.exists()) {
                String msg = "preexisting dst segment file " + dstFile;
                controlSession.sendErrorResponse(correlationId, msg);
                return -1L;
            }
        }

        // Pass 2: perform the renames.
        long movedSegmentCount = 0L;
        for (long segmentPos = firstSegmentPos; segmentPos <= lastSegmentPos; segmentPos += segmentLength) {
            String srcFileName = Archive.segmentFileName(srcRecordingId, segmentPos);
            if (segmentPos == srcStopPosition) {
                emptyFollowingSrcSegment.addFirst(srcFileName);
                continue;
            }

            File srcFile = new File(this.archiveDir, srcFileName);
            File dstFile = new File(this.archiveDir, Archive.segmentFileName(dstRecordingId, segmentPos));
            if (!srcFile.renameTo(dstFile)) {
                String msg = "failed to rename " + srcFile + " to " + dstFile;
                controlSession.sendErrorResponse(correlationId, msg);
                return -1L;
            }
            ++movedSegmentCount;
        }

        return movedSegmentCount;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Erases the content of a segment file from {@code segmentOffset} onwards.
     * <p>
     * After verifying that the position is aligned to a data header, the file is truncated to
     * {@code segmentOffset} and a single zero byte is then written at offset {@code segmentLength - 1}.
     * Term id and stream id for the header check are taken from {@code this.recordingSummary}.
     *
     * @param correlationId  of the originating request.
     * @param controlSession to which an error response is sent on failure.
     * @param position       in the recording from which content is erased.
     * @param segmentLength  length of the segment file.
     * @param segmentOffset  offset within the segment file corresponding to {@code position}.
     * @param termLength     term buffer length of the recording.
     * @param file           the segment file to modify.
     * @return {@code false} if the position is not aligned to a data header, otherwise {@code true}. An
     *         {@link IOException} is reported to the control session and rethrown unchecked.
     */
    private boolean eraseRemainingSegment(long correlationId, ControlSession controlSession, long position, int segmentLength, int segmentOffset, int termLength, File file) {
        try (FileChannel channel = FileChannel.open(file.toPath(), FILE_OPTIONS, new FileAttribute[0])) {
            int termOffset = (int)(position & (long)(termLength - 1));
            int termCount = (int)(position >> LogBufferDescriptor.positionBitsToShift(termLength));
            int termId = this.recordingSummary.initialTermId + termCount;
            UnsafeBuffer dataBuffer = this.ctx.dataBuffer();

            if (ReplaySession.notHeaderAligned(channel, dataBuffer, segmentOffset, termOffset, termId, this.recordingSummary.streamId)) {
                String msg = position + " position not aligned to a data header";
                controlSession.sendErrorResponse(correlationId, msg);
                return false;
            }

            channel.truncate(segmentOffset);

            // Prepare a one byte buffer holding zero and keep writing it at the last offset of the segment
            // until the channel reports exactly one byte written.
            dataBuffer.byteBuffer().put(0, (byte)0).limit(1).position(0);
            int written;
            do {
                written = channel.write(dataBuffer.byteBuffer(), segmentLength - 1);
            }
            while (1 != written);

            return true;
        }
        catch (IOException ex) {
            controlSession.sendErrorResponse(correlationId, ex.getMessage());
            LangUtil.rethrowUnchecked(ex);
        }
        return true;
    }

    /**
     * Tears down a recording subscription: drops its reference count entry, aborts every recording session
     * using it, removes it from the subscription map and closes it.
     *
     * @param subscription to be closed and removed.
     * @param reason       passed to each aborted recording session.
     */
    private void closeAndRemoveRecordingSubscription(Subscription subscription, String reason) {
        long registrationId = subscription.registrationId();
        this.subscriptionRefCountMap.remove(registrationId);

        for (RecordingSession recordingSession : this.recordingSessionByIdMap.values()) {
            // Identity comparison: only sessions bound to this exact subscription instance are aborted.
            if (subscription == recordingSession.subscription()) {
                recordingSession.abort(reason);
            }
        }

        this.removeRecordingSubscription(registrationId);
        CloseHelper.close(this.errorHandler, subscription);
    }

    /**
     * Checks the usable space of the archive file store against the configured low storage threshold.
     * <p>
     * When the usable space is at or below the threshold an error response is sent to the control session.
     * An {@link IOException} raised while querying the file store is rethrown unchecked.
     *
     * @param correlationId  of the originating request.
     * @param controlSession to which an error response is sent when storage is low.
     * @return the error message if storage space is low, otherwise {@code null}.
     */
    private String isLowStorageSpace(long correlationId, ControlSession controlSession) {
        String lowStorageMsg = null;
        try {
            long threshold = this.ctx.lowStorageSpaceThreshold();
            long usableSpace = this.ctx.archiveFileStore().getUsableSpace();
            if (usableSpace <= threshold) {
                lowStorageMsg = "low storage threshold=" + threshold + " <= usableSpace=" + usableSpace;
                controlSession.sendErrorResponse(correlationId, 11L, lowStorageMsg);
            }
        }
        catch (IOException ex) {
            LangUtil.rethrowUnchecked(ex);
        }
        return lowStorageMsg;
    }

    /**
     * Schedules deletion of the given segment files and acknowledges the request.
     * <p>
     * When {@code addDeleteSegmentsSession} returns a negative count nothing further is sent from here.
     * Otherwise an OK response carrying the count is sent, and when the count is zero a
     * {@link RecordingSignal#DELETE} signal is sent immediately as well.
     *
     * @param correlationId  of the originating request.
     * @param recordingId    of the recording whose segments are deleted.
     * @param controlSession which requested the delete.
     * @param files          names of the segment files to delete.
     */
    private void deleteSegments(long correlationId, long recordingId, ControlSession controlSession, ArrayDeque<String> files) {
        int scheduledCount = this.addDeleteSegmentsSession(correlationId, recordingId, controlSession, files);
        if (scheduledCount < 0) {
            return;
        }

        controlSession.sendOkResponse(correlationId, scheduledCount);
        if (0 == scheduledCount) {
            controlSession.sendSignal(correlationId, recordingId, -1L, -1L, RecordingSignal.DELETE);
        }
    }

    /**
     * Generates a replay token for a control session and recording and registers it.
     * <p>
     * The token is a random {@code long} which is never -1 and which is not currently registered. It is
     * stored with a deadline of the current nano time plus the connect timeout.
     *
     * @param session     the control session the token is issued for.
     * @param recordingId the recording the token is valid for.
     * @return the newly generated token.
     */
    public long generateReplayToken(ControlSession session, long recordingId) {
        long token;
        do {
            token = this.random.nextLong();
        }
        while (-1L == token || this.controlSessionByReplayToken.containsKey(token));

        long deadlineNs = this.nanoClock.nanoTime() + TimeUnit.MILLISECONDS.toNanos(this.connectTimeoutMs);
        this.controlSessionByReplayToken.put(token, new SessionForReplay(recordingId, session, deadlineNs));
        return token;
    }

    /**
     * Looks up the control session a replay token was issued for.
     * <p>
     * The lookup succeeds only when the token is registered, was issued for the given recording and its
     * deadline has not yet been reached. The token is not removed by this call.
     *
     * @param replayToken to look up.
     * @param recordingId the recording the token must have been issued for.
     * @return the associated control session, or {@code null} if the token is unknown, was issued for a
     *         different recording, or has expired.
     */
    public ControlSession getReplaySession(long replayToken, long recordingId) {
        SessionForReplay entry = this.controlSessionByReplayToken.get(replayToken);
        long nowNs = this.nanoClock.nanoTime();
        if (null == entry) {
            return null;
        }

        boolean isUsable = recordingId == entry.recordingId && nowNs < entry.deadlineNs;
        return isUsable ? entry.controlSession : null;
    }

    /**
     * Removes every registered replay token that was issued for the given control session.
     *
     * @param sessionId id of the control session whose tokens are removed.
     */
    void removeReplayTokensForSession(long sessionId) {
        for (Long2ObjectHashMap.ValueIterator tokens = this.controlSessionByReplayToken.values().iterator(); tokens.hasNext(); ) {
            SessionForReplay entry = (SessionForReplay)tokens.next();
            if (entry.controlSession.sessionId() == sessionId) {
                tokens.remove();
            }
        }
    }

    /**
     * Removes every registered replay token whose deadline is at or before the given time.
     *
     * @param nowNs current time in nanoseconds.
     * @return always 0; removing expired tokens is not counted as work.
     */
    private int checkReplayTokens(long nowNs) {
        for (Long2ObjectHashMap.ValueIterator tokens = this.controlSessionByReplayToken.values().iterator(); tokens.hasNext(); ) {
            SessionForReplay entry = (SessionForReplay)tokens.next();
            if (entry.deadlineNs <= nowNs) {
                tokens.remove();
            }
        }
        return 0;
    }

    /**
     * Session worker for {@link RecordingSession}s which also accumulates write statistics.
     * <p>
     * Totals are accumulated in plain fields via {@link #bytesWritten(long)} and {@link #writeTimeNs(long)}
     * and are published to the context's counters at the end of a duty cycle, but only when that cycle
     * performed work.
     */
    abstract static class Recorder extends SessionWorker<RecordingSession> {
        private final Counter totalWriteBytesCounter;
        private final Counter totalWriteTimeCounter;
        private final Counter maxWriteTimeCounter;
        private long totalWriteBytes;
        private long totalWriteTimeNs;
        private long maxWriteTimeNs;

        /**
         * @param errorHandler passed to the session worker.
         * @param context      supplying the counters the statistics are published to.
         */
        Recorder(CountedErrorHandler errorHandler, Archive.Context context) {
            super("archive-recorder", errorHandler);
            this.totalWriteBytesCounter = context.totalWriteBytesCounter();
            this.totalWriteTimeCounter = context.totalWriteTimeCounter();
            this.maxWriteTimeCounter = context.maxWriteTimeCounter();
        }

        /**
         * Adds to the running total of bytes written.
         *
         * @param bytes number of bytes written.
         */
        final void bytesWritten(long bytes) {
            this.totalWriteBytes += bytes;
        }

        /**
         * Adds to the running total of write time and tracks the longest single write.
         *
         * @param nanos duration of a write in nanoseconds.
         */
        final void writeTimeNs(long nanos) {
            this.totalWriteTimeNs += nanos;
            this.maxWriteTimeNs = Math.max(this.maxWriteTimeNs, nanos);
        }

        /**
         * Runs the session worker's duty cycle and publishes the statistics if any work was done.
         *
         * @return the work count reported by the session worker.
         */
        @Override
        public int doWork() {
            int workCount = super.doWork();
            if (workCount <= 0) {
                return workCount;
            }

            this.totalWriteBytesCounter.setRelease(this.totalWriteBytes);
            this.totalWriteTimeCounter.setRelease(this.totalWriteTimeNs);
            this.maxWriteTimeCounter.setRelease(this.maxWriteTimeNs);
            return workCount;
        }
    }

    /**
     * Session worker for {@link ReplaySession}s which also accumulates read statistics.
     * <p>
     * Totals are accumulated in plain fields via {@link #bytesRead(long)} and {@link #readTimeNs(long)} and
     * are published to the context's counters at the end of a duty cycle, but only when that cycle performed
     * work.
     */
    abstract static class Replayer extends SessionWorker<ReplaySession> {
        private final Counter totalReadBytesCounter;
        private final Counter totalReadTimeCounter;
        private final Counter maxReadTimeCounter;
        private long totalReadBytes;
        private long totalReadTimeNs;
        private long maxReadTimeNs;

        /**
         * @param errorHandler passed to the session worker.
         * @param context      supplying the counters the statistics are published to.
         */
        Replayer(CountedErrorHandler errorHandler, Archive.Context context) {
            super("archive-replayer", errorHandler);
            this.totalReadBytesCounter = context.totalReadBytesCounter();
            this.totalReadTimeCounter = context.totalReadTimeCounter();
            this.maxReadTimeCounter = context.maxReadTimeCounter();
        }

        /**
         * Adds to the running total of bytes read.
         *
         * @param bytes number of bytes read.
         */
        final void bytesRead(long bytes) {
            this.totalReadBytes += bytes;
        }

        /**
         * Adds to the running total of read time and tracks the longest single read.
         *
         * @param nanos duration of a read in nanoseconds.
         */
        final void readTimeNs(long nanos) {
            this.totalReadTimeNs += nanos;
            this.maxReadTimeNs = Math.max(this.maxReadTimeNs, nanos);
        }

        /**
         * Runs the session worker's duty cycle and publishes the statistics if any work was done.
         *
         * @return the work count reported by the session worker.
         */
        @Override
        public int doWork() {
            int workCount = super.doWork();
            if (workCount <= 0) {
                return workCount;
            }

            this.totalReadBytesCounter.setRelease(this.totalReadBytes);
            this.totalReadTimeCounter.setRelease(this.totalReadTimeNs);
            this.maxReadTimeCounter.setRelease(this.maxReadTimeNs);
            return workCount;
        }
    }

    /**
     * Immutable entry stored against a replay token: the control session and recording the token was issued
     * for, and the nano time after which the token is no longer honoured.
     */
    private static final class SessionForReplay {
        /** Control session the token was issued for. */
        private final ControlSession controlSession;
        /** Recording the token is valid for. */
        private final long recordingId;
        /** Nano time at which the token expires. */
        private final long deadlineNs;

        private SessionForReplay(long recordingId, ControlSession controlSession, long deadlineNs) {
            this.controlSession = controlSession;
            this.recordingId = recordingId;
            this.deadlineNs = deadlineNs;
        }
    }
}

