/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.ChannelUri;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ControlResponsePoller;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.cluster.ClusterMember;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.ConsensusModuleAgent;
import io.aeron.cluster.ConsensusPublisher;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.SnapshotRetrieveMonitor;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.SnapshotRecordingsDecoder;
import java.util.ArrayList;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.collections.LongArrayList;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.NoOpLock;

class DynamicJoin
implements AutoCloseable {
    private final AeronArchive localArchive;
    private final ConsensusPublisher consensusPublisher;
    private final ConsensusModule.Context ctx;
    private final ConsensusModuleAgent consensusModuleAgent;
    private final String[] clusterConsensusEndpoints;
    private final String consensusEndpoints;
    private final String consensusEndpoint;
    private final String catchupEndpoint;
    private final String archiveEndpoint;
    private final ArrayList<RecordingLog.Snapshot> leaderSnapshots = new ArrayList();
    private final LongArrayList snapshotLengths = new LongArrayList();
    private final long intervalNs;
    private ExclusivePublication consensusPublication;
    private State state = State.INIT;
    private ClusterMember[] clusterMembers;
    private ClusterMember leaderMember;
    private AeronArchive.AsyncConnect leaderArchiveAsyncConnect;
    private AeronArchive leaderArchive;
    private SnapshotRetrieveMonitor snapshotRetrieveMonitor;
    private Counter recoveryStateCounter;
    private long timeOfLastActivityNs = 0L;
    private long correlationId = -1L;
    private int memberId = -1;
    private int clusterConsensusEndpointsCursor = -1;
    private int snapshotCursor = 0;

    DynamicJoin(String consensusEndpoints, AeronArchive localArchive, ConsensusPublisher consensusPublisher, ConsensusModule.Context ctx, ConsensusModuleAgent consensusModuleAgent) {
        ClusterMember thisMember = ClusterMember.parseEndpoints(-1, ctx.memberEndpoints());
        this.localArchive = localArchive;
        this.consensusPublisher = consensusPublisher;
        this.ctx = ctx;
        this.consensusModuleAgent = consensusModuleAgent;
        this.intervalNs = ctx.dynamicJoinIntervalNs();
        this.consensusEndpoints = ctx.memberEndpoints();
        this.consensusEndpoint = thisMember.consensusEndpoint();
        this.catchupEndpoint = thisMember.catchupEndpoint();
        this.archiveEndpoint = thisMember.archiveEndpoint();
        this.clusterConsensusEndpoints = consensusEndpoints.split(",");
    }

    @Override
    public void close() {
        CountedErrorHandler countedErrorHandler = this.ctx.countedErrorHandler();
        CloseHelper.closeAll((ErrorHandler)countedErrorHandler, this.consensusPublication, this.leaderArchive, this.leaderArchiveAsyncConnect);
    }

    ClusterMember[] clusterMembers() {
        return this.clusterMembers;
    }

    ClusterMember leader() {
        return this.leaderMember;
    }

    int memberId() {
        return this.memberId;
    }

    int doWork(long nowNs) {
        int workCount = 0;
        switch (this.state) {
            case INIT: {
                workCount += this.init(nowNs);
                break;
            }
            case PASSIVE_FOLLOWER: {
                workCount += this.passiveFollower(nowNs);
                break;
            }
            case SNAPSHOT_LENGTH_RETRIEVE: {
                workCount += this.snapshotLengthRetrieve();
                break;
            }
            case SNAPSHOT_RETRIEVE: {
                workCount += this.snapshotRetrieve();
                break;
            }
            case SNAPSHOT_LOAD: {
                workCount += this.snapshotLoad(nowNs);
                break;
            }
            case JOIN_CLUSTER: {
                workCount += this.joinCluster();
            }
        }
        return workCount;
    }

    void onClusterMembersChange(long correlationId, int leaderMemberId, String activeMembers, String passiveMembers) {
        if (State.INIT == this.state && correlationId == this.correlationId) {
            ClusterMember[] passiveFollowers;
            for (ClusterMember follower : passiveFollowers = ClusterMember.parse(passiveMembers)) {
                if (!this.consensusEndpoint.equals(follower.consensusEndpoint())) continue;
                this.memberId = follower.id();
                this.clusterMembers = ClusterMember.parse(activeMembers);
                this.leaderMember = ClusterMember.findMember(this.clusterMembers, leaderMemberId);
                if (null == this.leaderMember) break;
                if (!this.leaderMember.consensusEndpoint().equals(this.clusterConsensusEndpoints[this.clusterConsensusEndpointsCursor])) {
                    CloseHelper.close(this.ctx.countedErrorHandler(), this.consensusPublication);
                    ChannelUri consensusUri = ChannelUri.parse(this.ctx.consensusChannel());
                    consensusUri.put("endpoint", this.leaderMember.consensusEndpoint());
                    this.consensusPublication = this.ctx.aeron().addExclusivePublication(consensusUri.toString(), this.ctx.consensusStreamId());
                }
                this.timeOfLastActivityNs = 0L;
                this.state(State.PASSIVE_FOLLOWER);
                break;
            }
        }
    }

    void onSnapshotRecordings(long correlationId, SnapshotRecordingsDecoder snapshotRecordingsDecoder) {
        if (State.PASSIVE_FOLLOWER == this.state && correlationId == this.correlationId) {
            SnapshotRecordingsDecoder.SnapshotsDecoder snapshotsDecoder = snapshotRecordingsDecoder.snapshots();
            if (snapshotsDecoder.count() > 0) {
                for (SnapshotRecordingsDecoder.SnapshotsDecoder snapshot : snapshotsDecoder) {
                    if (snapshot.serviceId() > this.ctx.serviceCount()) continue;
                    this.leaderSnapshots.add(new RecordingLog.Snapshot(snapshot.recordingId(), snapshot.leadershipTermId(), snapshot.termBaseLogPosition(), snapshot.logPosition(), snapshot.timestamp(), snapshot.serviceId()));
                }
            }
            this.timeOfLastActivityNs = 0L;
            this.snapshotCursor = 0;
            this.correlationId = -1L;
            if (this.leaderSnapshots.isEmpty()) {
                this.state(State.SNAPSHOT_LOAD);
            } else {
                AeronArchive.Context leaderArchiveCtx = new AeronArchive.Context().aeron(this.ctx.aeron()).lock(NoOpLock.INSTANCE).controlRequestChannel("aeron:udp?term-length=64k|endpoint=" + this.leaderMember.archiveEndpoint()).controlRequestStreamId(this.ctx.archiveContext().controlRequestStreamId()).controlResponseChannel("aeron:udp?endpoint=" + this.archiveEndpoint).controlResponseStreamId(this.ctx.archiveContext().controlResponseStreamId());
                this.leaderArchiveAsyncConnect = AeronArchive.asyncConnect(leaderArchiveCtx);
                this.state(State.SNAPSHOT_LENGTH_RETRIEVE);
            }
        }
    }

    private int init(long nowNs) {
        long correlationId;
        if (nowNs > this.timeOfLastActivityNs + this.intervalNs) {
            int cursor;
            if ((cursor = ++this.clusterConsensusEndpointsCursor) >= this.clusterConsensusEndpoints.length) {
                this.clusterConsensusEndpointsCursor = 0;
                cursor = 0;
            }
            CloseHelper.close(this.ctx.countedErrorHandler(), this.consensusPublication);
            ChannelUri uri = ChannelUri.parse(this.ctx.consensusChannel());
            uri.put("endpoint", this.clusterConsensusEndpoints[cursor]);
            this.consensusPublication = this.ctx.aeron().addExclusivePublication(uri.toString(), this.ctx.consensusStreamId());
            this.correlationId = -1L;
            this.timeOfLastActivityNs = nowNs;
            return 1;
        }
        if (-1L == this.correlationId && this.consensusPublication.isConnected() && this.consensusPublisher.addPassiveMember(this.consensusPublication, correlationId = this.ctx.aeron().nextCorrelationId(), this.consensusEndpoints)) {
            this.timeOfLastActivityNs = nowNs;
            this.correlationId = correlationId;
            return 1;
        }
        return 0;
    }

    private int passiveFollower(long nowNs) {
        if (nowNs > this.timeOfLastActivityNs + this.intervalNs) {
            this.correlationId = this.ctx.aeron().nextCorrelationId();
            if (this.consensusPublisher.snapshotRecordingQuery(this.consensusPublication, this.correlationId, this.memberId)) {
                this.timeOfLastActivityNs = nowNs;
                return 1;
            }
        }
        return 0;
    }

    private int snapshotLengthRetrieve() {
        int workCount = 0;
        if (null == this.leaderArchive) {
            this.leaderArchive = this.leaderArchiveAsyncConnect.poll();
            return null == this.leaderArchive ? 0 : 1;
        }
        if (-1L == this.correlationId) {
            long stopPositionCorrelationId = this.ctx.aeron().nextCorrelationId();
            RecordingLog.Snapshot snapshot = this.leaderSnapshots.get(this.snapshotCursor);
            if (this.leaderArchive.archiveProxy().getStopPosition(snapshot.recordingId, stopPositionCorrelationId, this.leaderArchive.controlSessionId())) {
                this.correlationId = stopPositionCorrelationId;
                ++workCount;
            }
        } else if (DynamicJoin.pollForResponse(this.leaderArchive, this.correlationId)) {
            this.correlationId = -1L;
            long snapshotStopPosition = this.leaderArchive.controlResponsePoller().relevantId();
            if (-1L == snapshotStopPosition) {
                throw new ClusterException("snapshot stopPosition is NULL_POSITION");
            }
            this.snapshotLengths.addLong(this.snapshotCursor, snapshotStopPosition);
            if (++this.snapshotCursor >= this.leaderSnapshots.size()) {
                this.snapshotCursor = 0;
                this.state(State.SNAPSHOT_RETRIEVE);
            }
            ++workCount;
        }
        return workCount;
    }

    private int snapshotRetrieve() {
        int workCount = 0;
        if (null == this.leaderArchive) {
            this.leaderArchive = this.leaderArchiveAsyncConnect.poll();
            return null == this.leaderArchive ? 0 : 1;
        }
        if (null != this.snapshotRetrieveMonitor) {
            workCount += this.snapshotRetrieveMonitor.poll();
            if (this.snapshotRetrieveMonitor.isDone()) {
                this.consensusModuleAgent.retrievedSnapshot(this.snapshotRetrieveMonitor.recordingId(), this.leaderSnapshots.get(this.snapshotCursor));
                this.snapshotRetrieveMonitor = null;
                this.correlationId = -1L;
                if (++this.snapshotCursor >= this.leaderSnapshots.size()) {
                    this.state(State.SNAPSHOT_LOAD);
                    ++workCount;
                }
            }
        } else if (-1L == this.correlationId) {
            long replayId = this.ctx.aeron().nextCorrelationId();
            RecordingLog.Snapshot snapshot = this.leaderSnapshots.get(this.snapshotCursor);
            String catchupChannel = "aeron:udp?endpoint=" + this.catchupEndpoint;
            if (this.leaderArchive.archiveProxy().replay(snapshot.recordingId, 0L, -1L, catchupChannel, this.ctx.replayStreamId(), replayId, this.leaderArchive.controlSessionId())) {
                this.correlationId = replayId;
                ++workCount;
            }
        } else if (DynamicJoin.pollForResponse(this.leaderArchive, this.correlationId)) {
            int replaySessionId = (int)this.leaderArchive.controlResponsePoller().relevantId();
            String catchupChannel = "aeron:udp?endpoint=" + this.catchupEndpoint + "|session-id=" + replaySessionId;
            this.snapshotRetrieveMonitor = new SnapshotRetrieveMonitor(this.localArchive, this.snapshotLengths.get(this.snapshotCursor));
            this.localArchive.archiveProxy().startRecording(catchupChannel, this.ctx.replayStreamId(), SourceLocation.REMOTE, true, this.localArchive.context().aeron().nextCorrelationId(), this.localArchive.controlSessionId());
            ++workCount;
        }
        return workCount;
    }

    private int snapshotLoad(long nowNs) {
        int workCount = 0;
        if (null == this.recoveryStateCounter) {
            this.recoveryStateCounter = this.consensusModuleAgent.loadSnapshotsForDynamicJoin();
            ++workCount;
        } else if (this.consensusModuleAgent.pollForSnapshotLoadAck(this.recoveryStateCounter, nowNs)) {
            CloseHelper.close(this.ctx.countedErrorHandler(), this.recoveryStateCounter);
            this.recoveryStateCounter = null;
            this.state(State.JOIN_CLUSTER);
            ++workCount;
        }
        return workCount;
    }

    private int joinCluster() {
        long leadershipTermId;
        int workCount = 0;
        long l = leadershipTermId = this.leaderSnapshots.isEmpty() ? -1L : this.leaderSnapshots.get((int)0).leadershipTermId;
        if (this.consensusPublisher.joinCluster(this.consensusPublication, leadershipTermId, this.memberId) && this.consensusModuleAgent.dynamicJoinComplete()) {
            this.state(State.DONE);
            this.close();
            ++workCount;
        }
        return workCount;
    }

    private void state(State newState) {
        this.state = newState;
    }

    private static boolean pollForResponse(AeronArchive archive, long correlationId) {
        ControlResponsePoller poller = archive.controlResponsePoller();
        if (poller.poll() > 0 && poller.isPollComplete() && poller.controlSessionId() == archive.controlSessionId() && poller.correlationId() == correlationId) {
            if (poller.code() == ControlResponseCode.ERROR) {
                throw new ClusterException("archive response for correlationId=" + correlationId + ", error: " + poller.errorMessage());
            }
            return true;
        }
        return false;
    }

    static enum State {
        INIT,
        PASSIVE_FOLLOWER,
        SNAPSHOT_LENGTH_RETRIEVE,
        SNAPSHOT_RETRIEVE,
        SNAPSHOT_LOAD,
        JOIN_CLUSTER,
        DONE;

    }
}

