/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.archive.ArchiveConductor;
import io.aeron.archive.Catalog;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSession;
import io.aeron.archive.RecordingSummary;
import io.aeron.archive.Session;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.ControlResponsePoller;
import io.aeron.archive.client.RecordingDescriptorConsumer;
import io.aeron.archive.client.RecordingDescriptorPoller;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.exceptions.TimeoutException;
import java.util.concurrent.TimeUnit;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.CountedErrorHandler;

class ReplicationSession
implements Session,
RecordingDescriptorConsumer {
    private static final int REPLAY_REMOVE_THRESHOLD = 0;
    private static final int RETRY_ATTEMPTS = 3;
    private long activeCorrelationId = -1L;
    private long srcReplaySessionId = -1L;
    private long replayPosition = -1L;
    private long srcStopPosition = -1L;
    private long srcRecordingPosition = -1L;
    private long timeOfLastActionMs;
    private final long actionTimeoutMs;
    private final long replicationId;
    private final long channelTagId;
    private final long subscriptionTagId;
    private final long srcRecordingId;
    private long dstRecordingId;
    private int replayStreamId;
    private int replaySessionId;
    private int retryAttempts = 3;
    private boolean isLiveAdded;
    private final boolean isTagged;
    private final String replicationChannel;
    private final String liveDestination;
    private String replayDestination;
    private final CachedEpochClock epochClock;
    private final ArchiveConductor conductor;
    private final ControlSession controlSession;
    private final ControlResponseProxy controlResponseProxy;
    private final Catalog catalog;
    private final Aeron aeron;
    private final AeronArchive.Context context;
    private AeronArchive.AsyncConnect asyncConnect;
    private AeronArchive srcArchive;
    private Subscription recordingSubscription;
    private Image image;
    private State state = State.CONNECT;

    ReplicationSession(long srcRecordingId, long dstRecordingId, long channelTagId, long subscriptionTagId, long replicationId, String liveDestination, String replicationChannel, RecordingSummary recordingSummary, AeronArchive.Context context, CachedEpochClock epochClock, Catalog catalog, ControlResponseProxy controlResponseProxy, ControlSession controlSession) {
        this.replicationId = replicationId;
        this.srcRecordingId = srcRecordingId;
        this.dstRecordingId = dstRecordingId;
        this.liveDestination = "".equals(liveDestination) ? null : liveDestination;
        this.replicationChannel = replicationChannel;
        this.aeron = context.aeron();
        this.context = context;
        this.catalog = catalog;
        this.controlResponseProxy = controlResponseProxy;
        this.epochClock = epochClock;
        this.conductor = controlSession.archiveConductor();
        this.controlSession = controlSession;
        this.actionTimeoutMs = TimeUnit.NANOSECONDS.toMillis(context.messageTimeoutNs());
        this.isTagged = -1L != channelTagId || -1L != subscriptionTagId;
        this.channelTagId = -1L == channelTagId ? replicationId : channelTagId;
        long l = this.subscriptionTagId = -1L == subscriptionTagId ? replicationId : subscriptionTagId;
        if (null != recordingSummary) {
            this.replayPosition = recordingSummary.stopPosition;
            this.replayStreamId = recordingSummary.streamId;
        }
    }

    @Override
    public long sessionId() {
        return this.replicationId;
    }

    @Override
    public boolean isDone() {
        return this.state == State.DONE;
    }

    @Override
    public void abort() {
        this.state(State.DONE);
    }

    @Override
    public void close() {
        ArchiveConductor archiveConductor = this.controlSession.archiveConductor();
        CountedErrorHandler countedErrorHandler = archiveConductor.context().countedErrorHandler();
        this.stopRecording(countedErrorHandler);
        this.stopReplaySession(countedErrorHandler);
        CloseHelper.close((ErrorHandler)countedErrorHandler, (AutoCloseable)this.asyncConnect);
        CloseHelper.close((ErrorHandler)countedErrorHandler, (AutoCloseable)this.srcArchive);
        archiveConductor.removeReplicationSession(this);
    }

    @Override
    public int doWork() {
        int workCount = 0;
        try {
            if (null != this.recordingSubscription && this.recordingSubscription.isClosed()) {
                this.state(State.DONE);
                return 1;
            }
            switch (this.state) {
                case CONNECT: {
                    workCount += this.connect();
                    break;
                }
                case REPLICATE_DESCRIPTOR: {
                    workCount += this.replicateDescriptor();
                    break;
                }
                case SRC_RECORDING_POSITION: {
                    workCount += this.srcRecordingPosition();
                    break;
                }
                case EXTEND: {
                    workCount += this.extend();
                    break;
                }
                case REPLAY: {
                    workCount += this.replay();
                    break;
                }
                case AWAIT_IMAGE: {
                    workCount += this.awaitImage();
                    break;
                }
                case REPLICATE: {
                    workCount += this.replicate();
                    break;
                }
                case CATCHUP: {
                    workCount += this.catchup();
                    break;
                }
                case ATTEMPT_LIVE_JOIN: {
                    workCount += this.attemptLiveJoin();
                }
            }
        }
        catch (Throwable ex) {
            this.state(State.DONE);
            this.error(ex.getMessage(), 0);
            throw ex;
        }
        return workCount;
    }

    @Override
    public void onRecordingDescriptor(long controlSessionId, long correlationId, long recordingId, long startTimestamp, long stopTimestamp, long startPosition, long stopPosition, int initialTermId, int segmentFileLength, int termBufferLength, int mtuLength, int sessionId, int streamId, String strippedChannel, String originalChannel, String sourceIdentity) {
        this.srcStopPosition = stopPosition;
        this.replayStreamId = streamId;
        this.replaySessionId = sessionId;
        if (-1L == this.dstRecordingId) {
            this.replayPosition = startPosition;
            this.dstRecordingId = this.catalog.addNewRecording(startPosition, startPosition, startTimestamp, startTimestamp, initialTermId, segmentFileLength, termBufferLength, mtuLength, sessionId, streamId, strippedChannel, originalChannel, sourceIdentity);
            this.signal(startPosition, RecordingSignal.REPLICATE);
        }
        State nextState = State.EXTEND;
        if (null != this.liveDestination) {
            if (-1L != stopPosition) {
                this.state(State.DONE);
                this.error("cannot live merge without active source recording", 0);
                return;
            }
            nextState = State.SRC_RECORDING_POSITION;
        }
        if (startPosition == stopPosition) {
            this.signal(stopPosition, RecordingSignal.SYNC);
            nextState = State.DONE;
        }
        this.state(nextState);
    }

    private int connect() {
        int workCount = 0;
        if (null == this.asyncConnect) {
            this.asyncConnect = AeronArchive.asyncConnect(this.context);
            ++workCount;
        } else {
            int step = this.asyncConnect.step();
            AeronArchive archive = this.asyncConnect.poll();
            if (null == archive) {
                if (this.asyncConnect.step() != step) {
                    ++workCount;
                }
            } else {
                this.srcArchive = archive;
                this.asyncConnect = null;
                this.state(State.REPLICATE_DESCRIPTOR);
                ++workCount;
            }
        }
        return workCount;
    }

    private int replicateDescriptor() {
        int workCount = 0;
        if (-1L == this.activeCorrelationId) {
            long correlationId = this.aeron.nextCorrelationId();
            if (this.srcArchive.archiveProxy().listRecording(this.srcRecordingId, correlationId, this.srcArchive.controlSessionId())) {
                workCount += this.trackAction(correlationId);
                this.srcArchive.recordingDescriptorPoller().reset(correlationId, 1, this);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to list remote recording descriptor");
            }
        } else {
            RecordingDescriptorPoller poller = this.srcArchive.recordingDescriptorPoller();
            int fragments = poller.poll();
            if (poller.isDispatchComplete() && poller.remainingRecordCount() > 0) {
                this.error("unknown src recording id " + this.srcRecordingId, 5);
                this.state(State.DONE);
            }
            if (0 == fragments && this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to fetch remote recording descriptor");
            }
            workCount += fragments;
        }
        return workCount;
    }

    private int srcRecordingPosition() {
        int workCount = 0;
        if (-1L == this.activeCorrelationId) {
            long correlationId = this.aeron.nextCorrelationId();
            if (this.srcArchive.archiveProxy().getRecordingPosition(this.srcRecordingId, correlationId, this.srcArchive.controlSessionId())) {
                workCount += this.trackAction(correlationId);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to send recording position request");
            }
        } else {
            ControlResponsePoller poller = this.srcArchive.controlResponsePoller();
            workCount += poller.poll();
            if (this.hasResponse(poller)) {
                this.srcRecordingPosition = poller.relevantId();
                if (-1L == this.srcRecordingPosition && null != this.liveDestination) {
                    throw new ArchiveException("cannot live merge without active source recording");
                }
                this.state(State.EXTEND);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to get recording position");
            }
        }
        return workCount;
    }

    private int extend() {
        boolean isMds = this.isTagged || null != this.liveDestination;
        ChannelUri channelUri = ChannelUri.parse((CharSequence)this.replicationChannel);
        String endpoint = channelUri.get("endpoint");
        ChannelUriStringBuilder builder = new ChannelUriStringBuilder().media(channelUri).sessionId(Integer.valueOf(this.replaySessionId)).alias(channelUri).rejoin(Boolean.FALSE);
        if (isMds) {
            builder.tags(this.channelTagId + "," + this.subscriptionTagId).controlMode("manual");
        } else {
            builder.endpoint(endpoint);
        }
        String channel = builder.build();
        this.recordingSubscription = this.conductor.extendRecording(this.replicationId, this.dstRecordingId, this.replayStreamId, SourceLocation.REMOTE, true, channel, this.controlSession);
        if (null == this.recordingSubscription) {
            this.state(State.DONE);
        } else {
            if (isMds) {
                this.replayDestination = "aeron:udp?endpoint=" + endpoint;
                this.recordingSubscription.asyncAddDestination(this.replayDestination);
            }
            this.state(State.REPLAY);
        }
        return 1;
    }

    private int replay() {
        int workCount = 0;
        if (-1L == this.activeCorrelationId) {
            String resolvedEndpoint = this.recordingSubscription.resolvedEndpoint();
            if (null == resolvedEndpoint) {
                return workCount;
            }
            ChannelUri channelUri = ChannelUri.parse((CharSequence)this.replicationChannel);
            channelUri.put("session-id", Integer.toString(this.replaySessionId));
            String endpoint = channelUri.get("endpoint");
            if (null != endpoint && endpoint.endsWith(":0")) {
                int i = resolvedEndpoint.lastIndexOf(58);
                channelUri.put("endpoint", endpoint.substring(0, endpoint.length() - 2) + resolvedEndpoint.substring(i));
            }
            if (null != this.liveDestination) {
                channelUri.put("linger", "0");
                channelUri.put("eos", "false");
            }
            long correlationId = this.aeron.nextCorrelationId();
            if (this.srcArchive.archiveProxy().replay(this.srcRecordingId, this.replayPosition, Long.MAX_VALUE, channelUri.toString(), this.replayStreamId, correlationId, this.srcArchive.controlSessionId())) {
                workCount += this.trackAction(correlationId);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to send replay request");
            }
        } else {
            ControlResponsePoller poller = this.srcArchive.controlResponsePoller();
            workCount += poller.poll();
            if (this.hasResponse(poller)) {
                this.srcReplaySessionId = poller.relevantId();
                this.state(State.AWAIT_IMAGE);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed get acknowledgement of replay request to: " + this.replicationChannel);
            }
        }
        return workCount;
    }

    private int awaitImage() {
        int workCount = 0;
        this.image = this.recordingSubscription.imageBySessionId(this.replaySessionId);
        if (null != this.image) {
            this.state(null == this.liveDestination ? State.REPLICATE : State.CATCHUP);
            ++workCount;
        } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
            throw new TimeoutException("failed get replay image for sessionId " + this.replaySessionId + " on channel " + this.recordingSubscription.channel());
        }
        return workCount;
    }

    private int replicate() {
        int workCount = 0;
        long position = this.image.position();
        if (this.image.isEndOfStream() || this.image.isClosed() || -1L != this.srcStopPosition && position >= this.srcStopPosition) {
            if (-1L != this.srcStopPosition && position >= this.srcStopPosition || -1L == this.srcStopPosition && this.image.isEndOfStream()) {
                this.srcReplaySessionId = -1L;
                this.signal(position, RecordingSignal.SYNC);
            }
            this.state(State.DONE);
            ++workCount;
        }
        return workCount;
    }

    private int catchup() {
        int workCount = 0;
        if (this.image.position() >= this.srcRecordingPosition) {
            this.state(State.ATTEMPT_LIVE_JOIN);
            ++workCount;
        } else if (this.image.isClosed()) {
            throw new ArchiveException("replication image closed unexpectedly");
        }
        return workCount;
    }

    private int attemptLiveJoin() {
        int workCount = 0;
        if (-1L == this.activeCorrelationId) {
            long correlationId = this.aeron.nextCorrelationId();
            if (this.srcArchive.archiveProxy().getRecordingPosition(this.srcRecordingId, correlationId, this.srcArchive.controlSessionId())) {
                workCount += this.trackAction(correlationId);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to send recording position request");
            }
        } else {
            ControlResponsePoller poller = this.srcArchive.controlResponsePoller();
            workCount += poller.poll();
            if (this.hasResponse(poller)) {
                this.trackAction(-1L);
                this.retryAttempts = 3;
                this.srcRecordingPosition = poller.relevantId();
                if (-1L == this.srcRecordingPosition && null != this.liveDestination) {
                    throw new ArchiveException("cannot live merge without active source recording");
                }
                long position = this.image.position();
                if (this.shouldAddLiveDestination(position)) {
                    this.recordingSubscription.asyncAddDestination(this.liveDestination);
                    this.isLiveAdded = true;
                } else if (this.shouldStopReplay(position)) {
                    this.recordingSubscription.asyncRemoveDestination(this.replayDestination);
                    this.replayDestination = null;
                    this.recordingSubscription = null;
                    this.signal(position, RecordingSignal.MERGE);
                    this.state(State.DONE);
                }
                ++workCount;
            } else {
                if (this.image.isClosed()) {
                    throw new ArchiveException("replication image closed unexpectedly");
                }
                if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                    if (--this.retryAttempts == 0) {
                        throw new TimeoutException("failed to get recording position");
                    }
                    this.trackAction(-1L);
                }
            }
        }
        return workCount;
    }

    private boolean hasResponse(ControlResponsePoller poller) {
        if (poller.isPollComplete() && poller.controlSessionId() == this.srcArchive.controlSessionId()) {
            ControlResponseCode code = poller.code();
            if (ControlResponseCode.ERROR == code) {
                throw new ArchiveException(poller.errorMessage(), code.value());
            }
            return poller.correlationId() == this.activeCorrelationId && ControlResponseCode.OK == code;
        }
        return false;
    }

    private void error(String msg, int errorCode) {
        if (this.controlSession.controlPublication().isConnected()) {
            this.controlSession.sendErrorResponse(this.replicationId, errorCode, msg, this.controlResponseProxy);
        }
    }

    private void signal(long position, RecordingSignal recordingSignal) {
        long subscriptionId = null != this.recordingSubscription ? this.recordingSubscription.registrationId() : -1L;
        this.controlSession.attemptSignal(this.replicationId, this.dstRecordingId, subscriptionId, position, recordingSignal);
    }

    private void stopReplaySession(CountedErrorHandler countedErrorHandler) {
        if (-1L != this.srcReplaySessionId) {
            try {
                this.srcArchive.archiveProxy().stopReplay(this.srcReplaySessionId, this.aeron.nextCorrelationId(), this.srcArchive.controlSessionId());
            }
            catch (Exception ex) {
                countedErrorHandler.onError((Throwable)ex);
            }
            this.srcReplaySessionId = -1L;
        }
    }

    private void stopRecording(CountedErrorHandler countedErrorHandler) {
        if (null != this.recordingSubscription) {
            this.conductor.removeRecordingSubscription(this.recordingSubscription.registrationId());
            CloseHelper.close((ErrorHandler)countedErrorHandler, (AutoCloseable)this.recordingSubscription);
            this.recordingSubscription = null;
        }
    }

    private boolean shouldAddLiveDestination(long position) {
        return !this.isLiveAdded && this.srcRecordingPosition - position <= (long)Math.min(this.image.termBufferLength() >> 2, 0x2000000);
    }

    private boolean shouldStopReplay(long position) {
        return this.isLiveAdded && this.srcRecordingPosition - position <= 0L && this.image.activeTransportCount() >= 2;
    }

    private int trackAction(long correlationId) {
        this.timeOfLastActionMs = this.epochClock.time();
        this.activeCorrelationId = correlationId;
        return 1;
    }

    private void state(State newState) {
        this.timeOfLastActionMs = this.epochClock.time();
        this.stateChange(this.state, newState, this.replicationId);
        this.state = newState;
        this.activeCorrelationId = -1L;
    }

    void stateChange(State oldState, State newState, long replicationId) {
    }

    static enum State {
        CONNECT,
        REPLICATE_DESCRIPTOR,
        SRC_RECORDING_POSITION,
        EXTEND,
        REPLAY,
        AWAIT_IMAGE,
        REPLICATE,
        CATCHUP,
        ATTEMPT_LIVE_JOIN,
        DONE;

    }
}

