/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.archive.Archive;
import io.aeron.archive.Catalog;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSession;
import io.aeron.archive.RecordingSummary;
import io.aeron.archive.Session;
import io.aeron.archive.checksum.Checksum;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.logbuffer.FrameDescriptor;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.EnumSet;
import org.agrona.BitUtil;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.UnsafeBuffer;

class ReplaySession
implements Session,
AutoCloseable {
    private static final EnumSet<StandardOpenOption> FILE_OPTIONS = EnumSet.of(StandardOpenOption.READ);
    private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];
    private final long connectDeadlineMs;
    private final long correlationId;
    private final long sessionId;
    private final long recordingId;
    private final long startPosition;
    private long replayPosition;
    private long stopPosition;
    private long replayLimit;
    private volatile long segmentFileBasePosition;
    private int termBaseSegmentOffset;
    private int termOffset;
    private final int streamId;
    private final int termLength;
    private final int segmentLength;
    private final long replayBufferAddress;
    private final Checksum checksum;
    private final ExclusivePublication publication;
    private final ControlSession controlSession;
    private final CachedEpochClock epochClock;
    private final File archiveDir;
    private final Catalog catalog;
    private final Counter limitPosition;
    private final UnsafeBuffer replayBuffer;
    private FileChannel fileChannel;
    private File segmentFile;
    private State state = State.INIT;
    private String errorMessage = null;
    private volatile boolean isAborted;

    ReplaySession(long position, long length, long replaySessionId, long connectTimeoutMs, long correlationId, ControlSession controlSession, ControlResponseProxy controlResponseProxy, UnsafeBuffer replayBuffer, Catalog catalog, File archiveDir, CachedEpochClock epochClock, ExclusivePublication publication, RecordingSummary recordingSummary, Counter replayLimitPosition, Checksum checksum) {
        long currentPosition;
        long replayLength;
        this.controlSession = controlSession;
        this.sessionId = replaySessionId;
        this.correlationId = correlationId;
        this.recordingId = recordingSummary.recordingId;
        this.segmentLength = recordingSummary.segmentFileLength;
        this.termLength = recordingSummary.termBufferLength;
        this.streamId = recordingSummary.streamId;
        this.epochClock = epochClock;
        this.archiveDir = archiveDir;
        this.publication = publication;
        this.limitPosition = replayLimitPosition;
        this.replayBuffer = replayBuffer;
        this.replayBufferAddress = replayBuffer.addressOffset();
        this.catalog = catalog;
        this.checksum = checksum;
        this.startPosition = recordingSummary.startPosition;
        this.stopPosition = null == this.limitPosition ? recordingSummary.stopPosition : this.limitPosition.get();
        long fromPosition = position == -1L ? this.startPosition : position;
        long maxLength = null == this.limitPosition ? this.stopPosition - fromPosition : Long.MAX_VALUE - fromPosition;
        long l = replayLength = length == -1L ? maxLength : Math.min(length, maxLength);
        if (replayLength < 0L) {
            this.close();
            String msg = "replay recording " + this.recordingId + " - length must be positive: " + replayLength;
            controlSession.attemptErrorResponse(correlationId, msg, controlResponseProxy);
            throw new ArchiveException(msg);
        }
        if (null != this.limitPosition && (currentPosition = this.limitPosition.get()) < fromPosition) {
            this.close();
            String msg = "replay recording " + this.recordingId + " - " + fromPosition + " after current position of " + currentPosition;
            controlSession.attemptErrorResponse(correlationId, msg, controlResponseProxy);
            throw new ArchiveException(msg);
        }
        this.segmentFileBasePosition = AeronArchive.segmentFileBasePosition(this.startPosition, fromPosition, this.termLength, this.segmentLength);
        this.replayPosition = fromPosition;
        this.replayLimit = fromPosition + replayLength;
        String segmentFileName = Archive.segmentFileName(this.recordingId, this.segmentFileBasePosition);
        this.segmentFile = new File(archiveDir, segmentFileName);
        controlSession.sendOkResponse(correlationId, replaySessionId, controlResponseProxy);
        this.connectDeadlineMs = epochClock.time() + connectTimeoutMs;
    }

    @Override
    public void close() {
        CountedErrorHandler errorHandler = this.controlSession.archiveConductor().context().countedErrorHandler();
        CloseHelper.close((ErrorHandler)errorHandler, (AutoCloseable)this.publication);
        CloseHelper.close((ErrorHandler)errorHandler, (AutoCloseable)this.fileChannel);
    }

    @Override
    public long sessionId() {
        return this.sessionId;
    }

    @Override
    public int doWork() {
        int workCount = 0;
        if (this.isAborted) {
            this.state(State.INACTIVE);
        }
        try {
            if (State.INIT == this.state) {
                workCount += this.init();
            }
            if (State.REPLAY == this.state) {
                workCount += this.replay();
            }
        }
        catch (IOException ex) {
            this.onError("IOException - " + ex.getMessage() + " - " + this.segmentFile.getName());
            LangUtil.rethrowUnchecked((Throwable)ex);
        }
        if (State.INACTIVE == this.state) {
            this.closeRecordingSegment();
            this.state(State.DONE);
        }
        return workCount;
    }

    @Override
    public void abort() {
        this.isAborted = true;
    }

    @Override
    public boolean isDone() {
        return this.state == State.DONE;
    }

    long recordingId() {
        return this.recordingId;
    }

    State state() {
        return this.state;
    }

    long segmentFileBasePosition() {
        return this.segmentFileBasePosition;
    }

    Counter limitPosition() {
        return this.limitPosition;
    }

    void sendPendingError(ControlResponseProxy controlResponseProxy) {
        if (null != this.errorMessage && !this.controlSession.isDone()) {
            this.onPendingError(this.sessionId, this.recordingId, this.errorMessage);
            this.controlSession.attemptErrorResponse(this.correlationId, this.errorMessage, controlResponseProxy);
        }
    }

    void onPendingError(long sessionId, long recordingId, String errorMessage) {
    }

    private int init() throws IOException {
        if (null == this.fileChannel) {
            if (!this.segmentFile.exists()) {
                if (this.epochClock.time() > this.connectDeadlineMs) {
                    this.onError("recording segment not found " + this.segmentFile);
                }
                return 0;
            }
            long startTermBasePosition = this.startPosition - (this.startPosition & (long)(this.termLength - 1));
            int segmentOffset = (int)(this.replayPosition - startTermBasePosition & (long)(this.segmentLength - 1));
            int termId = LogBufferDescriptor.computeTermIdFromPosition((long)this.replayPosition, (int)this.publication.positionBitsToShift(), (int)this.publication.initialTermId());
            this.openRecordingSegment();
            this.termOffset = (int)(this.replayPosition & (long)(this.termLength - 1));
            this.termBaseSegmentOffset = segmentOffset - this.termOffset;
            if (this.replayPosition > this.startPosition && this.replayPosition != this.stopPosition && ReplaySession.notHeaderAligned(this.fileChannel, this.replayBuffer, segmentOffset, this.termOffset, termId, this.streamId)) {
                this.onError(this.replayPosition + " position not aligned to a data header");
                return 0;
            }
        }
        if (!this.publication.isConnected()) {
            if (this.epochClock.time() > this.connectDeadlineMs) {
                this.onError("no connection established for replay");
            }
            return 0;
        }
        this.state(State.REPLAY);
        return 1;
    }

    private int replay() throws IOException {
        if (!this.publication.isConnected()) {
            this.state(State.INACTIVE);
            return 0;
        }
        if (this.replayPosition >= this.stopPosition && null != this.limitPosition && this.noNewData(this.replayPosition, this.stopPosition)) {
            return 0;
        }
        if (this.termOffset == this.termLength) {
            this.nextTerm();
        }
        int workCount = 0;
        int bytesRead = this.readRecording(this.stopPosition - this.replayPosition);
        if (bytesRead > 0) {
            long position;
            int batchOffset = 0;
            int paddingFrameLength = 0;
            int sessionId = this.publication.sessionId();
            int streamId = this.publication.streamId();
            long remaining = this.replayLimit - this.replayPosition;
            Checksum checksum = this.checksum;
            UnsafeBuffer replayBuffer = this.replayBuffer;
            while (batchOffset < bytesRead && (long)batchOffset < remaining) {
                int frameLength = FrameDescriptor.frameLength((UnsafeBuffer)replayBuffer, (int)batchOffset);
                if (frameLength <= 0) {
                    this.raiseError(frameLength, bytesRead, batchOffset, remaining);
                }
                int frameType = FrameDescriptor.frameType((UnsafeBuffer)replayBuffer, (int)batchOffset);
                int alignedLength = BitUtil.align((int)frameLength, (int)32);
                if (1 == frameType) {
                    if (batchOffset + alignedLength > bytesRead) break;
                    if (null != checksum) {
                        this.verifyChecksum(checksum, batchOffset, alignedLength);
                    }
                    replayBuffer.putInt(batchOffset + 12, sessionId, ByteOrder.LITTLE_ENDIAN);
                    replayBuffer.putInt(batchOffset + 16, streamId, ByteOrder.LITTLE_ENDIAN);
                    batchOffset += alignedLength;
                    continue;
                }
                if (0 != frameType) continue;
                paddingFrameLength = frameLength;
                break;
            }
            if (batchOffset > 0) {
                long position2 = this.publication.offerBlock((MutableDirectBuffer)replayBuffer, 0, batchOffset);
                if (this.hasPublicationAdvanced(position2, batchOffset)) {
                    ++workCount;
                } else {
                    paddingFrameLength = 0;
                }
            }
            if (paddingFrameLength > 0 && this.hasPublicationAdvanced(position = this.publication.appendPadding(paddingFrameLength - 32), BitUtil.align((int)paddingFrameLength, (int)32))) {
                ++workCount;
            }
        }
        return workCount;
    }

    private void raiseError(int frameLength, int bytesRead, int batchOffset, long remaining) {
        throw new IllegalStateException("unexpected end of recording " + this.recordingId + " frameLength=" + frameLength + " replayPosition=" + this.replayPosition + " remaining=" + remaining + " limitPosition=" + this.limitPosition + " batchOffset=" + batchOffset + " bytesRead=" + bytesRead);
    }

    private boolean hasPublicationAdvanced(long position, int alignedLength) {
        if (position > 0L) {
            this.termOffset += alignedLength;
            this.replayPosition += (long)alignedLength;
            if (this.replayPosition >= this.replayLimit) {
                this.state(State.INACTIVE);
            }
            return true;
        }
        if (-4L == position || -1L == position) {
            this.onError("stream closed before replay complete");
        }
        return false;
    }

    private void verifyChecksum(Checksum checksum, int frameOffset, int alignedLength) {
        int recordedChecksum;
        int computedChecksum = checksum.compute(this.replayBufferAddress, frameOffset + 32, alignedLength - 32);
        if (computedChecksum != (recordedChecksum = FrameDescriptor.frameSessionId((UnsafeBuffer)this.replayBuffer, (int)frameOffset))) {
            String message = "CRC checksum mismatch at offset=" + frameOffset + ": recorded checksum=" + recordedChecksum + ", computed checksum=" + computedChecksum;
            throw new ArchiveException(message);
        }
    }

    private int readRecording(long availableReplay) throws IOException {
        if (this.publication.availableWindow() > 0L) {
            int bytesRead;
            int limit = Math.min((int)Math.min(availableReplay, (long)this.replayBuffer.capacity()), this.termLength - this.termOffset);
            ByteBuffer byteBuffer = this.replayBuffer.byteBuffer();
            byteBuffer.clear().limit(limit);
            int position = this.termBaseSegmentOffset + this.termOffset;
            while ((bytesRead = this.fileChannel.read(byteBuffer, position)) > 0) {
                position += bytesRead;
                if (byteBuffer.remaining() > 0) continue;
            }
            return byteBuffer.limit();
        }
        return 0;
    }

    private void onError(String errorMessage) {
        this.errorMessage = errorMessage;
        this.state(State.INACTIVE);
    }

    private boolean noNewData(long replayPosition, long oldStopPosition) {
        long newStopPosition;
        Counter limitPosition = this.limitPosition;
        long currentLimitPosition = limitPosition.get();
        boolean isCounterClosed = limitPosition.isClosed();
        long l = newStopPosition = isCounterClosed ? this.catalog.stopPosition(this.recordingId) : currentLimitPosition;
        if (isCounterClosed) {
            if (-1L == newStopPosition) {
                this.replayLimit = oldStopPosition;
            } else if (newStopPosition < this.replayLimit) {
                this.replayLimit = newStopPosition;
            }
        }
        if (replayPosition >= this.replayLimit) {
            this.state(State.INACTIVE);
        } else if (newStopPosition > oldStopPosition) {
            this.stopPosition = newStopPosition;
            return false;
        }
        return true;
    }

    private void nextTerm() throws IOException {
        this.termOffset = 0;
        this.termBaseSegmentOffset += this.termLength;
        if (this.termBaseSegmentOffset == this.segmentLength) {
            this.closeRecordingSegment();
            this.segmentFileBasePosition += (long)this.segmentLength;
            this.openRecordingSegment();
            this.termBaseSegmentOffset = 0;
        }
    }

    private void closeRecordingSegment() {
        CloseHelper.close((AutoCloseable)this.fileChannel);
        this.fileChannel = null;
        this.segmentFile = null;
    }

    private void openRecordingSegment() throws IOException {
        if (null == this.segmentFile) {
            String segmentFileName = Archive.segmentFileName(this.recordingId, this.segmentFileBasePosition);
            this.segmentFile = new File(this.archiveDir, segmentFileName);
            if (!this.segmentFile.exists()) {
                String msg = "recording segment not found " + segmentFileName;
                this.onError(msg);
                throw new ArchiveException(msg);
            }
        }
        this.fileChannel = FileChannel.open(this.segmentFile.toPath(), FILE_OPTIONS, NO_ATTRIBUTES);
    }

    static boolean notHeaderAligned(FileChannel channel, UnsafeBuffer buffer, int segmentOffset, int termOffset, int termId, int streamId) throws IOException {
        ByteBuffer byteBuffer = buffer.byteBuffer();
        byteBuffer.clear().limit(32);
        if (32 != channel.read(byteBuffer, segmentOffset)) {
            throw new ArchiveException("failed to read fragment header");
        }
        return ReplaySession.isInvalidHeader(buffer, streamId, termId, termOffset);
    }

    private void state(State newState) {
        this.state = newState;
    }

    static boolean isInvalidHeader(UnsafeBuffer buffer, int streamId, int termId, int termOffset) {
        return DataHeaderFlyweight.termOffset((UnsafeBuffer)buffer, (int)0) != termOffset || DataHeaderFlyweight.termId((UnsafeBuffer)buffer, (int)0) != termId || DataHeaderFlyweight.streamId((UnsafeBuffer)buffer, (int)0) != streamId;
    }

    static enum State {
        INIT,
        REPLAY,
        INACTIVE,
        DONE;

    }
}

