/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.ExclusivePublication;
import io.aeron.Subscription;
import io.aeron.archive.ArchiveConductor;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSessionAdapter;
import io.aeron.archive.ControlSessionProxy;
import io.aeron.archive.Session;
import io.aeron.archive.client.ArchiveEvent;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.security.Authenticator;
import java.util.ArrayDeque;
import java.util.function.BooleanSupplier;
import org.agrona.CloseHelper;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue;
import org.agrona.concurrent.UnsafeBuffer;

final class ControlSession
implements Session {
    static final String SESSION_CLOSED_MSG = "session closed";
    private static final long RESEND_INTERVAL_MS = 200L;
    private static final String SESSION_REJECTED_MSG = "authentication rejected";
    private final Thread conductorThread;
    private long sessionLivenessCheckDeadlineMs;
    private String abortReason;
    private final long controlSessionId;
    private final long connectTimeoutMs;
    private final long sessionLivenessCheckIntervalMs;
    private final long controlPublicationId;
    private long correlationId;
    private long resendDeadlineMs;
    private long activityDeadlineMs;
    private Session activeListing = null;
    private ExclusivePublication controlPublication;
    private byte[] encodedPrincipal;
    private final Aeron aeron;
    private final ArchiveConductor conductor;
    private final CachedEpochClock cachedEpochClock;
    private final ControlResponseProxy controlResponseProxy;
    private final Authenticator authenticator;
    private final ControlSessionProxy controlSessionProxy;
    private final ArrayDeque<BooleanSupplier> syncResponseQueue = new ArrayDeque(8);
    private final ManyToOneConcurrentLinkedQueue<BooleanSupplier> asyncResponseQueue = new ManyToOneConcurrentLinkedQueue();
    private final ControlSessionAdapter controlSessionAdapter;
    private final String invalidVersionMessage;
    private State state = State.INIT;

    ControlSession(long controlSessionId, long correlationId, long connectTimeoutMs, long sessionLivenessCheckIntervalMs, long controlPublicationId, String invalidVersionMessage, ControlSessionAdapter controlSessionAdapter, Aeron aeron, ArchiveConductor conductor, CachedEpochClock cachedEpochClock, ControlResponseProxy controlResponseProxy, Authenticator authenticator, ControlSessionProxy controlSessionProxy) {
        this.controlSessionId = controlSessionId;
        this.correlationId = correlationId;
        this.connectTimeoutMs = connectTimeoutMs;
        this.sessionLivenessCheckIntervalMs = sessionLivenessCheckIntervalMs;
        this.invalidVersionMessage = invalidVersionMessage;
        this.controlSessionAdapter = controlSessionAdapter;
        this.aeron = aeron;
        this.controlPublicationId = controlPublicationId;
        this.conductor = conductor;
        this.cachedEpochClock = cachedEpochClock;
        this.controlResponseProxy = controlResponseProxy;
        this.authenticator = authenticator;
        this.controlSessionProxy = controlSessionProxy;
        this.activityDeadlineMs = cachedEpochClock.time() + connectTimeoutMs;
        this.sessionLivenessCheckDeadlineMs = cachedEpochClock.time() + sessionLivenessCheckIntervalMs;
        this.conductorThread = Thread.currentThread();
    }

    @Override
    public long sessionId() {
        return this.controlSessionId;
    }

    @Override
    public void abort(String reason) {
        this.abortReason = reason;
        this.state(State.DONE, reason);
        if (null != this.activeListing) {
            this.activeListing.abort(reason);
        }
    }

    @Override
    public void close() {
        if (null != this.activeListing) {
            this.activeListing.abort(this.abortReason);
        }
        if (null == this.controlPublication) {
            this.aeron.asyncRemovePublication(this.controlPublicationId);
        } else {
            CloseHelper.close(this.conductor.context().countedErrorHandler(), this.controlPublication);
        }
        this.controlSessionAdapter.removeControlSession(this.controlSessionId);
        if (!this.conductor.context().controlSessionsCounter().isClosed()) {
            this.conductor.context().controlSessionsCounter().decrementOrdered();
        }
        if (null != this.abortReason && !SESSION_CLOSED_MSG.equals(this.abortReason)) {
            this.conductor.errorHandler.onError(new ArchiveEvent("controlSessionId=" + this.controlSessionId + " terminated: " + this.abortReason));
        }
    }

    @Override
    public boolean isDone() {
        return this.state == State.DONE;
    }

    @Override
    public int doWork() {
        int workCount = 0;
        long nowMs = this.cachedEpochClock.time();
        if (this.hasNoActivity(nowMs)) {
            this.abortReason = State.ACTIVE == this.state ? "failed to send response for more than connectTimeoutMs=" + this.connectTimeoutMs : "failed to establish initial connection: state=" + String.valueOf((Object)this.state);
            this.state(State.INACTIVE, this.abortReason);
            ++workCount;
        }
        switch (this.state) {
            case INIT: {
                workCount += this.waitForPublication(nowMs);
                break;
            }
            case CONNECTING: {
                workCount += this.waitForConnection(nowMs);
                break;
            }
            case CONNECTED: {
                workCount += this.sendConnectResponse(nowMs);
                break;
            }
            case CHALLENGED: {
                workCount += this.waitForChallengeResponse(nowMs);
                break;
            }
            case AUTHENTICATED: {
                workCount += this.waitForRequest(nowMs);
                break;
            }
            case ACTIVE: {
                if (this.sessionLivenessCheckDeadlineMs - nowMs < 0L) {
                    this.sessionLivenessCheckDeadlineMs = nowMs + this.sessionLivenessCheckIntervalMs;
                    this.sendOkResponse(-1L, -1L);
                    ++workCount;
                }
                workCount += this.sendResponses(nowMs);
                break;
            }
            case REJECTED: {
                workCount += this.sendReject(nowMs);
                break;
            }
            case INACTIVE: {
                this.state(State.DONE, "inactive");
                break;
            }
        }
        return workCount;
    }

    byte[] encodedPrincipal() {
        return this.encodedPrincipal;
    }

    long correlationId() {
        return this.correlationId;
    }

    State state() {
        return this.state;
    }

    ArchiveConductor archiveConductor() {
        return this.conductor;
    }

    ExclusivePublication controlPublication() {
        return this.controlPublication;
    }

    boolean hasActiveListing() {
        return null != this.activeListing;
    }

    void activeListing(Session activeListing) {
        this.activeListing = activeListing;
    }

    void onChallengeResponse(long correlationId, byte[] encodedCredentials) {
        if (State.CHALLENGED == this.state) {
            this.correlationId = correlationId;
            this.authenticator.onChallengeResponse(this.controlSessionId, encodedCredentials, this.cachedEpochClock.time());
        }
    }

    void onKeepAlive(long correlationId) {
        this.attemptToActivate();
    }

    void onStopRecording(long correlationId, int streamId, String channel) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.stopRecording(correlationId, streamId, channel, this);
        }
    }

    void onStopRecordingSubscription(long correlationId, long subscriptionId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.stopRecordingSubscription(correlationId, subscriptionId, this);
        }
    }

    void onStartRecording(long correlationId, int streamId, SourceLocation sourceLocation, boolean autoStop, String channel) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.startRecording(correlationId, streamId, sourceLocation, autoStop, channel, this);
        }
    }

    void onListRecordingsForUri(long correlationId, long fromRecordingId, int recordCount, int streamId, byte[] channelFragment) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.newListRecordingsForUriSession(correlationId, fromRecordingId, recordCount, streamId, channelFragment, this);
        }
    }

    void onListRecordings(long correlationId, long fromRecordingId, int recordCount) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.newListRecordingsSession(correlationId, fromRecordingId, recordCount, this);
        }
    }

    void onListRecording(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.listRecording(correlationId, recordingId, this);
        }
    }

    void onFindLastMatchingRecording(long correlationId, long minRecordingId, int sessionId, int streamId, byte[] channelFragment) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.findLastMatchingRecording(correlationId, minRecordingId, sessionId, streamId, channelFragment, this);
        }
    }

    void onStartReplay(long correlationId, long recordingId, long position, long length, int fileIoMaxLength, int replayStreamId, String replayChannel) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.startReplay(correlationId, recordingId, position, length, fileIoMaxLength, replayStreamId, replayChannel, null, this);
        }
    }

    void onStartBoundedReplay(long correlationId, long recordingId, long position, long length, int limitCounterId, int fileIoMaxLength, int replayStreamId, String replayChannel) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.startBoundedReplay(correlationId, recordingId, position, length, limitCounterId, fileIoMaxLength, replayStreamId, replayChannel, this);
        }
    }

    void onStopReplay(long correlationId, long replaySessionId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.stopReplay(correlationId, replaySessionId, this);
        }
    }

    void onStopAllReplays(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.stopAllReplays(correlationId, recordingId, this);
        }
    }

    void onExtendRecording(long correlationId, long recordingId, int streamId, SourceLocation sourceLocation, boolean autoStop, String channel) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.extendRecording(correlationId, recordingId, streamId, sourceLocation, autoStop, channel, this);
        }
    }

    void onGetRecordingPosition(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.getRecordingPosition(correlationId, recordingId, this);
        }
    }

    void onTruncateRecording(long correlationId, long recordingId, long position) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.truncateRecording(correlationId, recordingId, position, this);
        }
    }

    void onPurgeRecording(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.purgeRecording(correlationId, recordingId, this);
        }
    }

    void onGetStopPosition(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.getStopPosition(correlationId, recordingId, this);
        }
    }

    void onGetMaxRecordedPosition(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.getMaxRecordedPosition(correlationId, recordingId, this);
        }
    }

    void onArchiveId(long correlationId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.archiveId(correlationId, this);
        }
    }

    void onListRecordingSubscriptions(long correlationId, int pseudoIndex, int subscriptionCount, boolean applyStreamId, int streamId, String channelFragment) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.listRecordingSubscriptions(correlationId, pseudoIndex, subscriptionCount, applyStreamId, streamId, channelFragment, this);
        }
    }

    void onStopRecordingByIdentity(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.stopRecordingByIdentity(correlationId, recordingId, this);
        }
    }

    void onReplicate(long correlationId, long srcRecordingId, long dstRecordingId, long stopPosition, long channelTagId, long subscriptionTagId, int srcControlStreamId, int fileIoMaxLength, int replicationSessionId, String srcControlChannel, String liveDestination, String replicationChannel, byte[] encodedCredentials, String srcResponseChannel) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.replicate(correlationId, srcRecordingId, dstRecordingId, stopPosition, channelTagId, subscriptionTagId, srcControlStreamId, srcControlChannel, liveDestination, replicationChannel, fileIoMaxLength, replicationSessionId, encodedCredentials, srcResponseChannel, this);
        }
    }

    void onStopReplication(long correlationId, long replicationId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.stopReplication(correlationId, replicationId, this);
        }
    }

    void onGetStartPosition(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.getStartPosition(correlationId, recordingId, this);
        }
    }

    void onDetachSegments(long correlationId, long recordingId, long newStartPosition) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.detachSegments(correlationId, recordingId, newStartPosition, this);
        }
    }

    void onDeleteDetachedSegments(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.deleteDetachedSegments(correlationId, recordingId, this);
        }
    }

    void onPurgeSegments(long correlationId, long recordingId, long newStartPosition) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.purgeSegments(correlationId, recordingId, newStartPosition, this);
        }
    }

    void onAttachSegments(long correlationId, long recordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.attachSegments(correlationId, recordingId, this);
        }
    }

    void onMigrateSegments(long correlationId, long srcRecordingId, long dstRecordingId) {
        this.attemptToActivate();
        if (State.ACTIVE == this.state) {
            this.conductor.migrateSegments(correlationId, srcRecordingId, dstRecordingId, this);
        }
    }

    void sendOkResponse(long correlationId) {
        this.sendResponse(correlationId, 0L, ControlResponseCode.OK, null);
    }

    void sendOkResponse(long correlationId, long relevantId) {
        this.sendResponse(correlationId, relevantId, ControlResponseCode.OK, null);
    }

    void sendErrorResponse(long correlationId, String errorMessage) {
        this.sendResponse(correlationId, 0L, ControlResponseCode.ERROR, errorMessage);
    }

    void sendErrorResponse(long correlationId, long relevantId, String errorMessage) {
        this.sendResponse(correlationId, relevantId, ControlResponseCode.ERROR, errorMessage);
    }

    void sendRecordingUnknown(long correlationId, long recordingId) {
        this.sendResponse(correlationId, recordingId, ControlResponseCode.RECORDING_UNKNOWN, null);
    }

    void sendSubscriptionUnknown(long correlationId) {
        this.sendResponse(correlationId, 0L, ControlResponseCode.SUBSCRIPTION_UNKNOWN, null);
    }

    void sendResponse(long correlationId, long relevantId, ControlResponseCode code, String errorMessage) {
        this.assertCalledOnConductorThread();
        if (!this.syncResponseQueue.isEmpty() || !this.controlResponseProxy.sendResponse(this.controlSessionId, correlationId, relevantId, code, errorMessage, this)) {
            this.updateActivityDeadline(this.cachedEpochClock.time());
            this.queueResponse(correlationId, relevantId, code, errorMessage);
        } else {
            this.activityDeadlineMs = -1L;
        }
    }

    void asyncSendOkResponse(long correlationId, long replaySessionId) {
        if (!this.asyncResponseQueue.offer(() -> this.controlResponseProxy.sendResponse(this.controlSessionId, correlationId, replaySessionId, ControlResponseCode.OK, null, this))) {
            throw new IllegalStateException("failed to offer async replay response");
        }
    }

    boolean sendDescriptor(long correlationId, UnsafeBuffer descriptorBuffer) {
        this.assertCalledOnConductorThread();
        boolean sent = this.controlResponseProxy.sendDescriptor(this.controlSessionId, correlationId, descriptorBuffer, this);
        if (sent) {
            this.activityDeadlineMs = -1L;
        }
        return sent;
    }

    boolean sendSubscriptionDescriptor(long correlationId, Subscription subscription) {
        this.assertCalledOnConductorThread();
        boolean sent = this.controlResponseProxy.sendSubscriptionDescriptor(this.controlSessionId, correlationId, subscription, this);
        if (sent) {
            this.activityDeadlineMs = -1L;
        }
        return sent;
    }

    void sendSignal(long correlationId, long recordingId, long subscriptionId, long position, RecordingSignal recordingSignal) {
        this.assertCalledOnConductorThread();
        if (!this.syncResponseQueue.isEmpty() || !this.controlResponseProxy.sendSignal(this.controlSessionId, correlationId, recordingId, subscriptionId, position, recordingSignal, this.controlPublication)) {
            this.updateActivityDeadline(this.cachedEpochClock.time());
            this.syncResponseQueue.offer(() -> this.controlResponseProxy.sendSignal(this.controlSessionId, correlationId, recordingId, subscriptionId, position, recordingSignal, this.controlPublication));
        } else {
            this.activityDeadlineMs = -1L;
        }
    }

    int maxPayloadLength() {
        return this.controlPublication.maxPayloadLength();
    }

    void challenged() {
        this.state(State.CHALLENGED, "challenged");
    }

    void authenticate(byte[] encodedPrincipal) {
        this.encodedPrincipal = encodedPrincipal;
        this.activityDeadlineMs = -1L;
        this.state(State.AUTHENTICATED, "authenticated");
    }

    void reject() {
        this.state(State.REJECTED, SESSION_REJECTED_MSG);
    }

    private void assertCalledOnConductorThread() {
        if (Thread.currentThread() != this.conductorThread) {
            throw new IllegalStateException("Invalid concurrent access detected: " + String.valueOf(Thread.currentThread()) + " != " + String.valueOf(this.conductorThread));
        }
    }

    private void queueResponse(long correlationId, long relevantId, ControlResponseCode code, String message) {
        this.syncResponseQueue.offer(() -> this.controlResponseProxy.sendResponse(this.controlSessionId, correlationId, relevantId, code, message, this));
    }

    private int waitForPublication(long nowMs) {
        int workCount = 0;
        ExclusivePublication publication = this.aeron.getExclusivePublication(this.controlPublicationId);
        if (null != publication) {
            this.controlPublication = publication;
            this.activityDeadlineMs = nowMs + this.connectTimeoutMs;
            this.state(State.CONNECTING, "connecting");
            ++workCount;
        }
        return workCount;
    }

    private int waitForConnection(long nowMs) {
        int workCount = 0;
        if (this.controlPublication.isConnected()) {
            this.state(State.CONNECTED, "connected");
            ++workCount;
        }
        return workCount;
    }

    private int sendConnectResponse(long nowMs) {
        int workCount = 0;
        if (nowMs > this.resendDeadlineMs) {
            this.resendDeadlineMs = nowMs + 200L;
            if (null != this.invalidVersionMessage) {
                this.controlResponseProxy.sendResponse(this.controlSessionId, this.correlationId, this.controlSessionId, ControlResponseCode.ERROR, this.invalidVersionMessage, this);
            } else {
                this.authenticator.onConnectedSession(this.controlSessionProxy.controlSession(this), nowMs);
            }
            ++workCount;
        }
        return workCount;
    }

    private int waitForChallengeResponse(long nowMs) {
        this.authenticator.onChallengedSession(this.controlSessionProxy.controlSession(this), nowMs);
        return 1;
    }

    private int waitForRequest(long nowMs) {
        int workCount = 0;
        if (nowMs > this.resendDeadlineMs) {
            this.resendDeadlineMs = nowMs + 200L;
            if (this.controlResponseProxy.sendResponse(this.controlSessionId, this.correlationId, this.controlSessionId, ControlResponseCode.OK, null, this)) {
                this.activityDeadlineMs = -1L;
                ++workCount;
            }
        }
        return workCount;
    }

    private int sendResponses(long nowMs) {
        int workCount = 0;
        if (!this.controlPublication.isConnected()) {
            this.abortReason = "control publication not connected";
            this.state(State.INACTIVE, this.abortReason);
            ++workCount;
        } else {
            BooleanSupplier response;
            if (!this.syncResponseQueue.isEmpty() && this.syncResponseQueue.peekFirst().getAsBoolean()) {
                this.syncResponseQueue.pollFirst();
                this.activityDeadlineMs = -1L;
                ++workCount;
            }
            if (null != (response = this.asyncResponseQueue.peek())) {
                if (response.getAsBoolean()) {
                    this.asyncResponseQueue.poll();
                    this.activityDeadlineMs = -1L;
                    ++workCount;
                } else {
                    this.updateActivityDeadline(nowMs);
                }
            }
        }
        return workCount;
    }

    private int sendReject(long nowMs) {
        int workCount = 0;
        if (nowMs > this.resendDeadlineMs) {
            this.resendDeadlineMs = nowMs + 200L;
            this.controlResponseProxy.sendResponse(this.controlSessionId, this.correlationId, 10L, ControlResponseCode.ERROR, SESSION_REJECTED_MSG, this);
            ++workCount;
        }
        return workCount;
    }

    private boolean hasNoActivity(long nowMs) {
        return -1L != this.activityDeadlineMs && nowMs > this.activityDeadlineMs;
    }

    private void updateActivityDeadline(long nowMs) {
        if (-1L == this.activityDeadlineMs) {
            this.activityDeadlineMs = nowMs + this.connectTimeoutMs;
        }
    }

    private void attemptToActivate() {
        if (State.AUTHENTICATED == this.state && null == this.invalidVersionMessage) {
            this.state(State.ACTIVE, "active");
        }
    }

    private void state(State state, String reason) {
        this.logStateChange(this.state, state, this.controlSessionId, reason);
        this.state = state;
    }

    private void logStateChange(State oldState, State newState, long controlSessionId, String reason) {
    }

    public String toString() {
        return "ControlSession{controlSessionId=" + this.controlSessionId + ", correlationId=" + this.correlationId + ", state=" + String.valueOf((Object)this.state) + ", controlPublication=" + String.valueOf(this.controlPublication) + "}";
    }

    static enum State {
        INIT,
        CONNECTING,
        CONNECTED,
        CHALLENGED,
        AUTHENTICATED,
        ACTIVE,
        INACTIVE,
        REJECTED,
        DONE;

    }
}

