/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive.client;

import io.aeron.ChannelUri;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.ControlResponsePoller;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.exceptions.TimeoutException;
import io.aeron.logbuffer.FragmentHandler;
import java.util.concurrent.TimeUnit;
import org.agrona.concurrent.EpochClock;

public final class ReplayMerge
implements AutoCloseable {
    public static final int LIVE_ADD_MAX_WINDOW = 0x2000000;
    private static final int REPLAY_REMOVE_THRESHOLD = 0;
    private static final long MERGE_PROGRESS_TIMEOUT_DEFAULT_MS = TimeUnit.SECONDS.toMillis(10L);
    private final long recordingId;
    private final long startPosition;
    private final long mergeProgressTimeoutMs;
    private long replaySessionId = -1L;
    private long activeCorrelationId = -1L;
    private long nextTargetPosition = -1L;
    private long positionOfLastProgress = -1L;
    private long timeOfLastProgressMs;
    private boolean isLiveAdded = false;
    private boolean isReplayActive = false;
    private State state;
    private Image image;
    private final AeronArchive archive;
    private final Subscription subscription;
    private final EpochClock epochClock;
    private final String replayDestination;
    private final String liveDestination;
    private final String replayEndpoint;
    private final ChannelUri replayChannelUri;

    public ReplayMerge(Subscription subscription, AeronArchive archive, String replayChannel, String replayDestination, String liveDestination, long recordingId, long startPosition, EpochClock epochClock, long mergeProgressTimeoutMs) {
        if (subscription.channel().startsWith("aeron:ipc") || replayChannel.startsWith("aeron:ipc") || replayDestination.startsWith("aeron:ipc") || liveDestination.startsWith("aeron:ipc")) {
            throw new IllegalArgumentException("IPC merging is not supported");
        }
        if (!subscription.channel().contains("control-mode=manual")) {
            throw new IllegalArgumentException("Subscription URI must have 'control-mode=manual' uri=" + subscription.channel());
        }
        this.archive = archive;
        this.subscription = subscription;
        this.epochClock = epochClock;
        this.replayDestination = replayDestination;
        this.liveDestination = liveDestination;
        this.recordingId = recordingId;
        this.startPosition = startPosition;
        this.mergeProgressTimeoutMs = mergeProgressTimeoutMs;
        this.replayChannelUri = ChannelUri.parse(replayChannel);
        this.replayChannelUri.put("linger", "0");
        this.replayChannelUri.put("eos", "false");
        this.replayEndpoint = ChannelUri.parse(replayDestination).get("endpoint");
        if (this.replayEndpoint.endsWith(":0")) {
            this.state = State.RESOLVE_REPLAY_PORT;
        } else {
            this.replayChannelUri.put("endpoint", this.replayEndpoint);
            this.state = State.GET_RECORDING_POSITION;
        }
        subscription.asyncAddDestination(replayDestination);
        this.timeOfLastProgressMs = epochClock.time();
    }

    public ReplayMerge(Subscription subscription, AeronArchive archive, String replayChannel, String replayDestination, String liveDestination, long recordingId, long startPosition) {
        this(subscription, archive, replayChannel, replayDestination, liveDestination, recordingId, startPosition, archive.context().aeron().context().epochClock(), MERGE_PROGRESS_TIMEOUT_DEFAULT_MS);
    }

    @Override
    public void close() {
        State state = this.state;
        if (State.CLOSED != state) {
            if (!this.archive.context().aeron().isClosed()) {
                if (State.MERGED != state) {
                    this.subscription.asyncRemoveDestination(this.replayDestination);
                }
                if (this.isReplayActive && this.archive.archiveProxy().publication().isConnected()) {
                    this.stopReplay();
                }
            }
            this.state(State.CLOSED);
        }
    }

    public Subscription subscription() {
        return this.subscription;
    }

    public int doWork() {
        int workCount = 0;
        long nowMs = this.epochClock.time();
        try {
            switch (this.state) {
                case RESOLVE_REPLAY_PORT: {
                    workCount += this.resolveReplayPort(nowMs);
                    this.checkProgress(nowMs);
                    break;
                }
                case GET_RECORDING_POSITION: {
                    workCount += this.getRecordingPosition(nowMs);
                    this.checkProgress(nowMs);
                    break;
                }
                case REPLAY: {
                    workCount += this.replay(nowMs);
                    this.checkProgress(nowMs);
                    break;
                }
                case CATCHUP: {
                    workCount += this.catchup(nowMs);
                    this.checkProgress(nowMs);
                    break;
                }
                case ATTEMPT_LIVE_JOIN: {
                    workCount += this.attemptLiveJoin(nowMs);
                    this.checkProgress(nowMs);
                }
            }
        }
        catch (Exception ex) {
            this.state(State.FAILED);
            throw ex;
        }
        return workCount;
    }

    public int poll(FragmentHandler fragmentHandler, int fragmentLimit) {
        this.doWork();
        return null == this.image ? 0 : this.image.poll(fragmentHandler, fragmentLimit);
    }

    public boolean isMerged() {
        return this.state == State.MERGED;
    }

    public boolean hasFailed() {
        return this.state == State.FAILED;
    }

    public Image image() {
        return this.image;
    }

    public boolean isLiveAdded() {
        return this.isLiveAdded;
    }

    private int resolveReplayPort(long nowMs) {
        int workCount = 0;
        String resolvedEndpoint = this.subscription.resolvedEndpoint();
        if (null != resolvedEndpoint) {
            int i = resolvedEndpoint.lastIndexOf(58);
            this.replayChannelUri.put("endpoint", this.replayEndpoint.substring(0, this.replayEndpoint.length() - 2) + resolvedEndpoint.substring(i));
            this.timeOfLastProgressMs = nowMs;
            this.state(State.GET_RECORDING_POSITION);
            ++workCount;
        }
        return workCount;
    }

    private int getRecordingPosition(long nowMs) {
        int workCount = 0;
        if (-1L == this.activeCorrelationId) {
            long correlationId = this.archive.context().aeron().nextCorrelationId();
            if (this.archive.archiveProxy().getRecordingPosition(this.recordingId, correlationId, this.archive.controlSessionId())) {
                this.activeCorrelationId = correlationId;
                this.timeOfLastProgressMs = nowMs;
                ++workCount;
            }
        } else if (ReplayMerge.pollForResponse(this.archive, this.activeCorrelationId)) {
            this.nextTargetPosition = ReplayMerge.polledRelevantId(this.archive);
            this.activeCorrelationId = -1L;
            if (-1L == this.nextTargetPosition) {
                long correlationId = this.archive.context().aeron().nextCorrelationId();
                if (this.archive.archiveProxy().getStopPosition(this.recordingId, correlationId, this.archive.controlSessionId())) {
                    this.activeCorrelationId = correlationId;
                    this.timeOfLastProgressMs = nowMs;
                    ++workCount;
                }
            } else {
                this.timeOfLastProgressMs = nowMs;
                this.state(State.REPLAY);
            }
            ++workCount;
        }
        return workCount;
    }

    private int replay(long nowMs) {
        int workCount = 0;
        if (-1L == this.activeCorrelationId) {
            long correlationId = this.archive.context().aeron().nextCorrelationId();
            if (this.archive.archiveProxy().replay(this.recordingId, this.startPosition, Long.MAX_VALUE, this.replayChannelUri.toString(), this.subscription.streamId(), correlationId, this.archive.controlSessionId())) {
                this.activeCorrelationId = correlationId;
                this.timeOfLastProgressMs = nowMs;
                ++workCount;
            }
        } else if (ReplayMerge.pollForResponse(this.archive, this.activeCorrelationId)) {
            this.isReplayActive = true;
            this.replaySessionId = ReplayMerge.polledRelevantId(this.archive);
            this.timeOfLastProgressMs = nowMs;
            this.state(State.CATCHUP);
            ++workCount;
        }
        return workCount;
    }

    private int catchup(long nowMs) {
        int workCount = 0;
        if (null == this.image && this.subscription.isConnected()) {
            this.timeOfLastProgressMs = nowMs;
            this.image = this.subscription.imageBySessionId((int)this.replaySessionId);
            long l = this.positionOfLastProgress = null == this.image ? -1L : this.image.position();
        }
        if (null != this.image) {
            long position = this.image.position();
            if (position >= this.nextTargetPosition) {
                this.timeOfLastProgressMs = nowMs;
                this.positionOfLastProgress = position;
                this.state(State.ATTEMPT_LIVE_JOIN);
                ++workCount;
            } else if (position > this.positionOfLastProgress) {
                this.timeOfLastProgressMs = nowMs;
                this.positionOfLastProgress = position;
            } else if (this.image.isClosed()) {
                throw new IllegalStateException("ReplayMerge Image closed unexpectedly.");
            }
        }
        return workCount;
    }

    private int attemptLiveJoin(long nowMs) {
        int workCount = 0;
        if (-1L == this.activeCorrelationId) {
            long correlationId = this.archive.context().aeron().nextCorrelationId();
            if (this.archive.archiveProxy().getRecordingPosition(this.recordingId, correlationId, this.archive.controlSessionId())) {
                this.activeCorrelationId = correlationId;
                ++workCount;
            }
        } else if (ReplayMerge.pollForResponse(this.archive, this.activeCorrelationId)) {
            this.nextTargetPosition = ReplayMerge.polledRelevantId(this.archive);
            this.activeCorrelationId = -1L;
            if (-1L == this.nextTargetPosition) {
                long correlationId = this.archive.context().aeron().nextCorrelationId();
                if (this.archive.archiveProxy().getRecordingPosition(this.recordingId, correlationId, this.archive.controlSessionId())) {
                    this.activeCorrelationId = correlationId;
                }
            } else {
                State nextState = State.CATCHUP;
                if (null != this.image) {
                    long position = this.image.position();
                    if (this.shouldAddLiveDestination(position)) {
                        this.subscription.asyncAddDestination(this.liveDestination);
                        this.timeOfLastProgressMs = nowMs;
                        this.positionOfLastProgress = position;
                        this.isLiveAdded = true;
                    } else if (this.shouldStopAndRemoveReplay(position)) {
                        this.subscription.asyncRemoveDestination(this.replayDestination);
                        this.stopReplay();
                        this.timeOfLastProgressMs = nowMs;
                        this.positionOfLastProgress = position;
                        nextState = State.MERGED;
                    }
                }
                this.state(nextState);
            }
            ++workCount;
        }
        return workCount;
    }

    private void stopReplay() {
        long correlationId = this.archive.context().aeron().nextCorrelationId();
        if (this.archive.archiveProxy().stopReplay(this.replaySessionId, correlationId, this.archive.controlSessionId())) {
            this.isReplayActive = false;
        }
    }

    private void state(State newState) {
        this.state = newState;
        this.activeCorrelationId = -1L;
    }

    private boolean shouldAddLiveDestination(long position) {
        return !this.isLiveAdded && this.nextTargetPosition - position <= (long)Math.min(this.image.termBufferLength() >> 2, 0x2000000);
    }

    private boolean shouldStopAndRemoveReplay(long position) {
        return this.isLiveAdded && this.nextTargetPosition - position <= 0L && this.image.activeTransportCount() >= 2;
    }

    private boolean hasProgressStalled(long nowMs) {
        return nowMs > this.timeOfLastProgressMs + this.mergeProgressTimeoutMs;
    }

    private void checkProgress(long nowMs) {
        if (this.hasProgressStalled(nowMs)) {
            throw new TimeoutException("ReplayMerge no progress: state=" + (Object)((Object)this.state));
        }
    }

    private static boolean pollForResponse(AeronArchive archive, long correlationId) {
        ControlResponsePoller poller = archive.controlResponsePoller();
        if (poller.poll() > 0 && poller.isPollComplete() && poller.controlSessionId() == archive.controlSessionId()) {
            if (poller.code() == ControlResponseCode.ERROR) {
                throw new ArchiveException("archive response for correlationId=" + poller.correlationId() + ", error: " + poller.errorMessage(), (int)poller.relevantId(), poller.correlationId());
            }
            return poller.correlationId() == correlationId;
        }
        return false;
    }

    private static long polledRelevantId(AeronArchive archive) {
        return archive.controlResponsePoller().relevantId();
    }

    public String toString() {
        return "ReplayMerge{state=" + (Object)((Object)this.state) + ", nextTargetPosition=" + this.nextTargetPosition + ", timeOfLastProgressMs=" + this.timeOfLastProgressMs + ", positionOfLastProgress=" + this.positionOfLastProgress + ", isLiveAdded=" + this.isLiveAdded + ", isReplayActive=" + this.isReplayActive + ", replayChannelUri=" + this.replayChannelUri + '}';
    }

    static enum State {
        RESOLVE_REPLAY_PORT,
        GET_RECORDING_POSITION,
        REPLAY,
        CATCHUP,
        ATTEMPT_LIVE_JOIN,
        MERGED,
        FAILED,
        CLOSED;

    }
}

