/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveConductor;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSession;
import io.aeron.archive.Session;
import io.aeron.archive.checksum.Checksum;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.logbuffer.FrameDescriptor;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.EnumSet;
import org.agrona.BitUtil;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;

/**
 * Replays a range of a recording from its segment files in the archive directory through an
 * {@link ExclusivePublication}.
 * <p>
 * The session is driven by repeated calls to {@link #doWork()} and moves through the {@link State}s
 * {@code INIT -> REPLAY -> INACTIVE -> DONE}. Any error, abort, loss of connection or reaching the
 * replay limit moves the session to {@code INACTIVE}; the next pass of {@link #doWork()} then closes
 * the open segment file and marks the session {@code DONE}.
 * <p>
 * NOTE(review): only {@code isAborted} and {@code segmentFileBasePosition} are volatile; presumably
 * all other state is confined to the thread calling {@link #doWork()} - confirm against callers.
 */
class ReplaySession implements Session, AutoCloseable {
    private static final EnumSet<StandardOpenOption> FILE_OPTIONS = EnumSet.of(StandardOpenOption.READ);

    /*
     * Wire-format constants used when parsing frames out of the replay buffer. The values are kept as
     * local literals; they are believed to match the same-named constants on Aeron's
     * DataHeaderFlyweight / FrameDescriptor / LogBufferDescriptor / Publication - TODO confirm.
     */
    /** Number of bytes read as a frame header and excluded from checksum and padding payload lengths. */
    private static final int FRAME_HEADER_LENGTH = 32;
    /** Frames are laid out on this byte alignment within a term. */
    private static final int FRAME_ALIGNMENT = 32;
    /** Frame type value treated as padding. */
    private static final int FRAME_TYPE_PAD = 0;
    /** Frame type value treated as data. */
    private static final int FRAME_TYPE_DATA = 1;
    /** Offset within a frame at which the replay publication's session id is written. */
    private static final int SESSION_ID_FIELD_OFFSET = 12;
    /** Offset within a frame at which the replay publication's stream id is written. */
    private static final int STREAM_ID_FIELD_OFFSET = 16;
    /** Upper bound applied to the remaining replay length so that it fits in an {@code int}. */
    private static final long MAX_REPLAY_BATCH_LENGTH = 0x40000000L;
    /** Publication result treated as "not connected". */
    private static final long PUBLICATION_NOT_CONNECTED = -1L;
    /** Publication result treated as "closed". */
    private static final long PUBLICATION_CLOSED = -4L;

    private final long connectDeadlineMs;
    private final long correlationId;
    private final long sessionId;
    private final long recordingId;
    private final long startPosition;
    private long replayPosition;
    private long stopPosition;
    private long replayLimit;
    private volatile long segmentFileBasePosition;
    private int termBaseSegmentOffset;
    private int termOffset;
    private final int streamId;
    private final int termLength;
    private final int segmentLength;
    private final long replayBufferAddress;
    private final Checksum checksum;
    private final ExclusivePublication publication;
    private final ControlSession controlSession;
    private final CachedEpochClock epochClock;
    private final NanoClock nanoClock;
    final ArchiveConductor.Replayer replayer;
    private final File archiveDir;
    private final CountersReader countersReader;
    private final Counter limitPosition;
    private final UnsafeBuffer replayBuffer;
    private FileChannel fileChannel;
    private File segmentFile;
    private State state = State.INIT;
    private String errorMessage = null;
    private volatile boolean isAborted;

    /**
     * Create a session in the {@link State#INIT} state. No file is opened here; the first segment file
     * is only resolved to a path and is opened later by {@link #doWork()}.
     *
     * @param correlationId       id used when responding to the control session.
     * @param recordingId         id of the recording being replayed.
     * @param replayPosition      position at which the replay begins.
     * @param replayLength        number of bytes to replay; the replay limit is
     *                            {@code replayPosition + replayLength}.
     * @param startPosition       start position of the recording.
     * @param stopPosition        initial position up to which data is read from the recording.
     * @param segmentFileLength   length of each segment file.
     * @param termBufferLength    length of a term within the recording.
     * @param streamId            stream id expected in the recorded frame headers.
     * @param replaySessionId     id of this replay session.
     * @param connectTimeoutMs    time allowed from construction for the segment file to appear and the
     *                            publication to connect.
     * @param controlSession      control session to which responses are sent.
     * @param replayBuffer        buffer into which recorded data is read before being offered.
     * @param archiveDir          directory containing the segment files.
     * @param epochClock          clock used for the connect deadline.
     * @param nanoClock           clock used to time reads.
     * @param publication         publication onto which the recording is replayed.
     * @param countersReader      used to validate {@code replayLimitPosition} once it is closed.
     * @param replayLimitPosition optional (nullable) counter that can extend the stop position.
     * @param checksum            optional (nullable) checksum used to verify data frames.
     * @param replayer            receives read statistics.
     */
    ReplaySession(long correlationId, long recordingId, long replayPosition, long replayLength, long startPosition, long stopPosition, int segmentFileLength, int termBufferLength, int streamId, long replaySessionId, long connectTimeoutMs, ControlSession controlSession, UnsafeBuffer replayBuffer, File archiveDir, CachedEpochClock epochClock, NanoClock nanoClock, ExclusivePublication publication, CountersReader countersReader, Counter replayLimitPosition, Checksum checksum, ArchiveConductor.Replayer replayer) {
        this.controlSession = controlSession;
        this.sessionId = replaySessionId;
        this.correlationId = correlationId;
        this.recordingId = recordingId;
        this.segmentLength = segmentFileLength;
        this.termLength = termBufferLength;
        this.streamId = streamId;
        this.epochClock = epochClock;
        this.nanoClock = nanoClock;
        this.archiveDir = archiveDir;
        this.publication = publication;
        this.countersReader = countersReader;
        this.limitPosition = replayLimitPosition;
        this.replayBuffer = replayBuffer;
        this.replayBufferAddress = replayBuffer.addressOffset();
        this.checksum = checksum;
        this.startPosition = startPosition;
        this.stopPosition = stopPosition;
        this.replayer = replayer;
        this.segmentFileBasePosition = AeronArchive.segmentFileBasePosition(startPosition, replayPosition, this.termLength, this.segmentLength);
        this.replayPosition = replayPosition;
        this.replayLimit = replayPosition + replayLength;
        this.segmentFile = new File(archiveDir, Archive.segmentFileName(recordingId, this.segmentFileBasePosition));
        this.connectDeadlineMs = epochClock.time() + connectTimeoutMs;
    }

    /**
     * Close the replay publication and any open segment file, reporting close failures to the archive
     * conductor's counted error handler.
     */
    @Override
    public void close() {
        CountedErrorHandler errorHandler = this.controlSession.archiveConductor().context().countedErrorHandler();
        CloseHelper.close((ErrorHandler)errorHandler, (AutoCloseable)this.publication);
        CloseHelper.close((ErrorHandler)errorHandler, (AutoCloseable)this.fileChannel);
    }

    /**
     * @return the id of this replay session.
     */
    @Override
    public long sessionId() {
        return this.sessionId;
    }

    /**
     * Perform one unit of work: honour an abort request, run the {@code INIT} and/or {@code REPLAY}
     * step, and finish an {@code INACTIVE} session by closing its segment and marking it {@code DONE}.
     * <p>
     * An {@link IOException} is recorded as the session error (moving it to {@code INACTIVE}) and then
     * rethrown unchecked.
     *
     * @return the amount of work done in this call.
     */
    @Override
    public int doWork() {
        int workCount = 0;
        if (this.isAborted) {
            this.state(State.INACTIVE, "replay aborted");
        }
        try {
            // Deliberately two independent checks: a successful init() switches to REPLAY and the
            // first batch is attempted within the same call.
            if (State.INIT == this.state) {
                workCount += this.init();
            }
            if (State.REPLAY == this.state) {
                workCount += this.replay();
            }
        }
        catch (IOException ex) {
            this.onError("IOException - " + ex.getMessage() + " - " + this.segmentFile.getName());
            LangUtil.rethrowUnchecked((Throwable)ex);
        }
        if (State.INACTIVE == this.state) {
            this.closeRecordingSegment();
            this.state(State.DONE, "");
        }
        return workCount;
    }

    /**
     * Request that the replay stop. Only sets a flag; the state change happens on the next
     * {@link #doWork()} call.
     */
    @Override
    public void abort() {
        this.isAborted = true;
    }

    /**
     * @return true once the session has reached {@link State#DONE}.
     */
    @Override
    public boolean isDone() {
        return this.state == State.DONE;
    }

    /**
     * @return the id of the recording being replayed.
     */
    long recordingId() {
        return this.recordingId;
    }

    /**
     * @return the current state of the session.
     */
    State state() {
        return this.state;
    }

    /**
     * @return the channel of the replay publication.
     */
    String replayChannel() {
        return this.publication.channel();
    }

    /**
     * @return the stream id of the replay publication.
     */
    int replayStreamId() {
        return this.publication.streamId();
    }

    /**
     * @return the base position of the segment file currently being replayed.
     */
    long segmentFileBasePosition() {
        return this.segmentFileBasePosition;
    }

    /**
     * If this session recorded an error and the control session is still active, invoke the
     * {@link #onPendingError} hook and attempt to send the error to the control session.
     *
     * @param controlResponseProxy proxy through which the error response is attempted.
     */
    void sendPendingError(ControlResponseProxy controlResponseProxy) {
        if (null != this.errorMessage && !this.controlSession.isDone()) {
            this.onPendingError(this.sessionId, this.recordingId, this.errorMessage);
            this.controlSession.attemptErrorResponse(this.correlationId, this.errorMessage, controlResponseProxy);
        }
    }

    /**
     * Hook invoked just before a pending error is sent. Intentionally empty here.
     * NOTE(review): presumably an interception point for external logging/instrumentation - confirm
     * before changing the name or signature.
     *
     * @param sessionId    id of this replay session.
     * @param recordingId  id of the recording being replayed.
     * @param errorMessage the error about to be sent.
     */
    void onPendingError(long sessionId, long recordingId, String errorMessage) {
    }

    /**
     * {@code INIT} step. On the first successful pass it opens the segment file, validates that the
     * replay position lands on a frame header and sends the "replay OK" response; thereafter it waits
     * for the publication to connect. Missing file or missing connection after the connect deadline
     * becomes a session error.
     *
     * @return 1 if the session moved to {@code REPLAY}, otherwise 0.
     * @throws IOException if the segment file cannot be opened or read.
     */
    private int init() throws IOException {
        if (null == this.fileChannel) {
            if (!this.segmentFile.exists()) {
                if (this.epochClock.time() > this.connectDeadlineMs) {
                    this.onError("recording segment not found " + this.segmentFile);
                }
                return 0;
            }
            // Mask arithmetic below assumes termLength and segmentLength are powers of two.
            long startTermBasePosition = this.startPosition - (this.startPosition & (long)(this.termLength - 1));
            int segmentOffset = (int)((this.replayPosition - startTermBasePosition) & (long)(this.segmentLength - 1));
            int termId = LogBufferDescriptor.computeTermIdFromPosition(this.replayPosition, this.publication.positionBitsToShift(), this.publication.initialTermId());
            this.openRecordingSegment();
            this.termOffset = (int)(this.replayPosition & (long)(this.termLength - 1));
            this.termBaseSegmentOffset = segmentOffset - this.termOffset;
            // The header check is skipped when replaying from the very start of the recording or from
            // exactly the stop position.
            if (this.replayPosition > this.startPosition && this.replayPosition != this.stopPosition && ReplaySession.notHeaderAligned(this.fileChannel, this.replayBuffer, segmentOffset, this.termOffset, termId, this.streamId)) {
                this.onError(this.replayPosition + " position not aligned to a data header");
                return 0;
            }
            this.controlSession.asyncSendReplayOkResponse(this.correlationId, this.sessionId);
        }
        if (!this.publication.isConnected()) {
            if (this.epochClock.time() > this.connectDeadlineMs) {
                this.onError("no connection established for replay");
            }
            return 0;
        }
        this.state(State.REPLAY, "");
        return 1;
    }

    /**
     * {@code REPLAY} step. Reads the next chunk of the recording into the replay buffer, rewrites the
     * session id and stream id of each complete data frame to those of the replay publication
     * (verifying the checksum first when one is configured), offers the frames as a single block and
     * then appends any padding frame that terminated the batch.
     *
     * @return the number of successful publication operations (0, 1 or 2).
     * @throws IOException      if reading the segment file or rolling to the next segment fails.
     * @throws ArchiveException if a frame has an unrecognised type or fails checksum verification.
     */
    private int replay() throws IOException {
        if (!this.publication.isConnected()) {
            this.state(State.INACTIVE, "publication is not connected");
            return 0;
        }
        if (null != this.limitPosition && this.replayPosition >= this.stopPosition && this.notExtended(this.replayPosition, this.stopPosition)) {
            return 0;
        }
        if (this.termOffset == this.termLength) {
            this.nextTerm();
        }
        int workCount = 0;
        if (this.publication.availableWindow() > 0L) {
            long startNs = this.nanoClock.nanoTime();
            int bytesRead = this.readRecording(this.stopPosition - this.replayPosition);
            if (bytesRead > 0) {
                int sessionId = this.publication.sessionId();
                int streamId = this.publication.streamId();
                int remaining = (int)Math.min(this.replayLimit - this.replayPosition, MAX_REPLAY_BATCH_LENGTH);
                int batchOffset = 0;
                int paddingFrameLength = 0;
                while (batchOffset < bytesRead && batchOffset < remaining) {
                    int frameLength = FrameDescriptor.frameLength(this.replayBuffer, batchOffset);
                    if (frameLength <= 0) {
                        this.raiseError(frameLength, bytesRead, batchOffset, remaining);
                    }
                    int frameType = FrameDescriptor.frameType(this.replayBuffer, batchOffset);
                    int alignedLength = BitUtil.align(frameLength, FRAME_ALIGNMENT);
                    if (FRAME_TYPE_DATA == frameType) {
                        // A frame that was only partially read is left for the next call.
                        if (batchOffset + alignedLength > bytesRead) {
                            break;
                        }
                        if (null != this.checksum) {
                            this.verifyChecksum(this.checksum, batchOffset, alignedLength);
                        }
                        this.replayBuffer.putInt(batchOffset + SESSION_ID_FIELD_OFFSET, sessionId, ByteOrder.LITTLE_ENDIAN);
                        this.replayBuffer.putInt(batchOffset + STREAM_ID_FIELD_OFFSET, streamId, ByteOrder.LITTLE_ENDIAN);
                        batchOffset += alignedLength;
                    } else if (FRAME_TYPE_PAD == frameType) {
                        paddingFrameLength = frameLength;
                        break;
                    } else {
                        // batchOffset cannot advance past a frame of unknown type, so continuing would
                        // spin forever on the same frame. Fail the session instead.
                        String msg = "unexpected frame type " + frameType + " at batchOffset=" + batchOffset + " replayPosition=" + this.replayPosition;
                        this.onError(msg);
                        throw new ArchiveException(msg);
                    }
                }
                long readTimeNs = this.nanoClock.nanoTime() - startNs;
                this.replayer.bytesRead(bytesRead);
                this.replayer.readTimeNs(readTimeNs);
                if (batchOffset > 0) {
                    long position = this.publication.offerBlock(this.replayBuffer, 0, batchOffset);
                    if (this.hasPublicationAdvanced(position, batchOffset)) {
                        ++workCount;
                    } else {
                        // The data block was not accepted, so the padding that follows it must not be
                        // appended yet.
                        paddingFrameLength = 0;
                    }
                }
                if (paddingFrameLength > 0) {
                    long position = this.publication.appendPadding(paddingFrameLength - FRAME_HEADER_LENGTH);
                    if (this.hasPublicationAdvanced(position, BitUtil.align(paddingFrameLength, FRAME_ALIGNMENT))) {
                        ++workCount;
                    }
                }
            }
        }
        return workCount;
    }

    /**
     * Throw an {@link IllegalStateException} describing a non-positive frame length found while
     * scanning the replay buffer.
     */
    private void raiseError(int frameLength, int bytesRead, int batchOffset, long remaining) {
        throw new IllegalStateException("unexpected end of recording " + this.recordingId + " frameLength=" + frameLength + " replayPosition=" + this.replayPosition + " remaining=" + remaining + " limitPosition=" + this.limitPosition + " batchOffset=" + batchOffset + " bytesRead=" + bytesRead);
    }

    /**
     * Interpret the result of a publication operation and advance the replay on success.
     *
     * @param position      result returned by the publication.
     * @param alignedLength number of bytes the replay advances by if the operation succeeded.
     * @return true if the publication accepted the data. On a "closed" or "not connected" result the
     *         session is failed; any other non-positive result leaves the session unchanged so the
     *         same data is attempted again on a later call.
     */
    private boolean hasPublicationAdvanced(long position, int alignedLength) {
        if (position > 0L) {
            this.termOffset += alignedLength;
            this.replayPosition += (long)alignedLength;
            if (this.replayPosition >= this.replayLimit) {
                this.state(State.INACTIVE, "position (" + this.replayPosition + ") past limit (" + this.replayLimit + ")");
            }
            return true;
        }
        if (PUBLICATION_CLOSED == position || PUBLICATION_NOT_CONNECTED == position) {
            this.onError("stream closed before replay complete");
        }
        return false;
    }

    /**
     * Verify a data frame by computing the checksum over its payload (the aligned frame minus the
     * header) and comparing it with the value stored in the frame's session id field.
     *
     * @throws ArchiveException if the computed and recorded values differ.
     */
    private void verifyChecksum(Checksum checksum, int frameOffset, int alignedLength) {
        int computedChecksum = checksum.compute(this.replayBufferAddress, frameOffset + FRAME_HEADER_LENGTH, alignedLength - FRAME_HEADER_LENGTH);
        int recordedChecksum = FrameDescriptor.frameSessionId(this.replayBuffer, frameOffset);
        if (computedChecksum != recordedChecksum) {
            String message = "CRC checksum mismatch at offset=" + frameOffset + ": recorded checksum=" + recordedChecksum + ", computed checksum=" + computedChecksum;
            throw new ArchiveException(message);
        }
    }

    /**
     * Read from the current position in the open segment file into the start of the replay buffer.
     * The amount requested is capped by {@code availableReplay}, the replay buffer capacity and the
     * bytes left in the current term.
     *
     * @param availableReplay number of bytes between the replay position and the stop position.
     * @return the number of bytes actually read into the replay buffer, which is less than the
     *         requested amount if the file ends early.
     * @throws IOException if the read fails.
     */
    private int readRecording(long availableReplay) throws IOException {
        int limit = Math.min((int)Math.min(availableReplay, (long)this.replayBuffer.capacity()), this.termLength - this.termOffset);
        ByteBuffer byteBuffer = this.replayBuffer.byteBuffer();
        byteBuffer.clear().limit(limit);
        int position = this.termBaseSegmentOffset + this.termOffset;
        while (byteBuffer.hasRemaining()) {
            int bytesRead = this.fileChannel.read(byteBuffer, position);
            // Stop on end-of-file (-1) or a zero-length read rather than retrying indefinitely.
            if (bytesRead <= 0) {
                break;
            }
            position += bytesRead;
        }
        // Report only what was read so that stale buffer content is never parsed as frames.
        return byteBuffer.position();
    }

    /**
     * Record an error for later delivery via {@link #sendPendingError} and move to {@code INACTIVE}.
     * The stored message is suffixed with the recording and session ids; the state-change reason is
     * the bare message.
     */
    private void onError(String errorMessage) {
        this.errorMessage = errorMessage + ", recordingId=" + this.recordingId + ", sessionId=" + this.sessionId;
        this.state(State.INACTIVE, errorMessage);
    }

    /**
     * Called when the replay has caught up with the stop position and a limit counter is present, to
     * see whether more data has become available.
     * <ul>
     * <li>Counter open: its value becomes the candidate stop position.</li>
     * <li>Counter closed but still registered under the same registration id: its value becomes the
     * replay limit and the candidate stop position is the larger of it and the old stop position.</li>
     * <li>Counter closed and no longer registered: the replay limit is capped at the old stop
     * position.</li>
     * </ul>
     * If the replay position has reached the replay limit the session is moved to {@code INACTIVE}.
     *
     * @return false if the stop position was advanced (so replay can proceed), otherwise true.
     */
    private boolean notExtended(long replayPosition, long oldStopPosition) {
        Counter limitPosition = this.limitPosition;
        long currentLimitPosition = limitPosition.get();
        long newStopPosition = oldStopPosition;
        if (limitPosition.isClosed()) {
            if (this.countersReader.getCounterRegistrationId(limitPosition.id()) == limitPosition.registrationId()) {
                this.replayLimit = currentLimitPosition;
                newStopPosition = Math.max(oldStopPosition, currentLimitPosition);
            } else if (this.replayLimit >= oldStopPosition) {
                this.replayLimit = oldStopPosition;
            }
        } else {
            newStopPosition = currentLimitPosition;
        }
        if (replayPosition >= this.replayLimit) {
            this.state(State.INACTIVE, "position (" + replayPosition + ") past limit (" + this.replayLimit + ") (notExtended)");
        } else if (newStopPosition > oldStopPosition) {
            this.stopPosition = newStopPosition;
            return false;
        }
        return true;
    }

    /**
     * Move to the start of the next term, rolling over to the next segment file when the current
     * segment has been fully consumed.
     *
     * @throws IOException if the next segment file cannot be opened.
     */
    private void nextTerm() throws IOException {
        this.termOffset = 0;
        this.termBaseSegmentOffset += this.termLength;
        if (this.termBaseSegmentOffset == this.segmentLength) {
            this.closeRecordingSegment();
            this.segmentFileBasePosition += (long)this.segmentLength;
            this.openRecordingSegment();
            this.termBaseSegmentOffset = 0;
        }
    }

    /**
     * Close the current segment file channel (if any) and forget the current segment file so that
     * {@link #openRecordingSegment()} resolves a new one.
     */
    private void closeRecordingSegment() {
        CloseHelper.close((AutoCloseable)this.fileChannel);
        this.fileChannel = null;
        this.segmentFile = null;
    }

    /**
     * Open a read-only channel on the current segment file. If no segment file is set, it is resolved
     * from the recording id and the current segment base position and must already exist.
     *
     * @throws ArchiveException if a newly resolved segment file does not exist (also recorded as the
     *                          session error).
     * @throws IOException      if the file cannot be opened.
     */
    private void openRecordingSegment() throws IOException {
        if (null == this.segmentFile) {
            String segmentFileName = Archive.segmentFileName(this.recordingId, this.segmentFileBasePosition);
            this.segmentFile = new File(this.archiveDir, segmentFileName);
            if (!this.segmentFile.exists()) {
                String msg = "recording segment not found " + segmentFileName;
                this.onError(msg);
                throw new ArchiveException(msg);
            }
        }
        this.fileChannel = FileChannel.open(this.segmentFile.toPath(), FILE_OPTIONS, new FileAttribute[0]);
    }

    /**
     * Read one frame header from {@code channel} at {@code segmentOffset} into the start of
     * {@code buffer} and check it against the expected term offset, term id and stream id.
     *
     * @return true if the header does NOT match the expected values.
     * @throws ArchiveException if a full header could not be read in a single read.
     * @throws IOException      if the read fails.
     */
    static boolean notHeaderAligned(FileChannel channel, UnsafeBuffer buffer, int segmentOffset, int termOffset, int termId, int streamId) throws IOException {
        ByteBuffer byteBuffer = buffer.byteBuffer();
        byteBuffer.clear().limit(FRAME_HEADER_LENGTH);
        if (FRAME_HEADER_LENGTH != channel.read(byteBuffer, segmentOffset)) {
            throw new ArchiveException("failed to read fragment header");
        }
        return ReplaySession.isInvalidHeader(buffer, streamId, termId, termOffset);
    }

    /**
     * Change state, first passing the transition to the {@link #logStateChange} hook. A null reason is
     * passed on as an empty string.
     */
    private void state(State newState, String reason) {
        this.logStateChange(this.state, newState, this.sessionId, this.recordingId, this.replayPosition, null == reason ? "" : reason);
        this.state = newState;
    }

    /**
     * Hook invoked on every state change. Intentionally empty here.
     * NOTE(review): presumably an interception point for external logging/instrumentation - confirm
     * before changing the name or signature.
     */
    private void logStateChange(State oldState, State newState, long sessionId, long recordingId, long position, String reason) {
    }

    /**
     * Check the frame header at offset 0 of {@code buffer}.
     *
     * @return true if its term offset, term id or stream id differs from the expected value.
     */
    static boolean isInvalidHeader(UnsafeBuffer buffer, int streamId, int termId, int termOffset) {
        return DataHeaderFlyweight.termOffset(buffer, 0) != termOffset
            || DataHeaderFlyweight.termId(buffer, 0) != termId
            || DataHeaderFlyweight.streamId(buffer, 0) != streamId;
    }

    /**
     * Lifecycle states of a replay session.
     */
    enum State {
        /** Waiting for the segment file to exist and the publication to connect. */
        INIT,
        /** Actively reading the recording and publishing it. */
        REPLAY,
        /** Finished, failed or aborted; resources are released on the next {@code doWork()}. */
        INACTIVE,
        /** Segment file closed; the session can be removed. */
        DONE
    }
}

