/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.RecordingSignalConsumer;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.ClusterTool;
import io.aeron.cluster.ConsensusModuleSnapshotAdapter;
import io.aeron.cluster.ConsensusModuleSnapshotListener;
import io.aeron.cluster.PendingServiceMessageTracker;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ConsensusModuleEncoder;
import io.aeron.cluster.codecs.PendingMessageTrackerEncoder;
import io.aeron.cluster.codecs.SessionMessageHeaderEncoder;
import io.aeron.cluster.service.ClusterNodeControlProperties;
import io.aeron.samples.archive.RecordingSignalCapture;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.status.CountersReader;

public class ConsensusModuleSnapshotPendingServiceMessagesPatch {
    static final int SNAPSHOT_REPLAY_STREAM_ID = 103;
    static final int SNAPSHOT_RECORDING_STREAM_ID = 107;
    static final String PATCH_CHANNEL = "aeron:ipc?alias=consensus-module-snapshot-patch|term-length=64m";
    private final String archiveLocalRequestChannel;
    private final int archiveLocalRequestStreamId;

    public ConsensusModuleSnapshotPendingServiceMessagesPatch() {
        this(AeronArchive.Configuration.localControlChannel(), AeronArchive.Configuration.localControlStreamId());
    }

    ConsensusModuleSnapshotPendingServiceMessagesPatch(String archiveLocalRequestChannel, int archiveRequestStreamId) {
        this.archiveLocalRequestChannel = archiveLocalRequestChannel;
        this.archiveLocalRequestStreamId = archiveRequestStreamId;
    }

    public boolean execute(File clusterDir) {
        if (!clusterDir.exists() || !clusterDir.isDirectory()) {
            throw new IllegalArgumentException("invalid cluster directory: " + clusterDir.getAbsolutePath());
        }
        RecordingLog.Entry entry = ClusterTool.findLatestValidSnapshot((File)clusterDir);
        if (null == entry) {
            throw new ClusterException("no valid snapshot found");
        }
        long recordingId = entry.recordingId;
        ClusterNodeControlProperties properties = ClusterTool.loadControlProperties((File)clusterDir);
        RecordingSignalCapture recordingSignalCapture = new RecordingSignalCapture();
        try (Aeron aeron = Aeron.connect((Aeron.Context)new Aeron.Context().aeronDirectoryName(properties.aeronDirectoryName));
             AeronArchive archive = AeronArchive.connect((AeronArchive.Context)new AeronArchive.Context().controlRequestChannel(this.archiveLocalRequestChannel).controlRequestStreamId(this.archiveLocalRequestStreamId).controlResponseChannel("aeron:ipc").recordingSignalConsumer((RecordingSignalConsumer)recordingSignalCapture).aeron(aeron));){
            SnapshotReader snapshotReader = new SnapshotReader();
            ConsensusModuleSnapshotPendingServiceMessagesPatch.replayLocalSnapshotRecording(aeron, archive, recordingId, snapshotReader);
            TargetState[] targetStates = TargetState.compute(snapshotReader.serviceCount, snapshotReader.pendingMessageTrackers);
            if (ConsensusModuleSnapshotPendingServiceMessagesPatch.snapshotIsNotValid(snapshotReader, targetStates)) {
                long newStopPosition;
                long tempRecordingId = ConsensusModuleSnapshotPendingServiceMessagesPatch.createNewSnapshotRecording(aeron, archive, recordingId, targetStates);
                long stopPosition = ConsensusModuleSnapshotPendingServiceMessagesPatch.awaitRecordingStopPosition(archive, recordingId);
                if (stopPosition != (newStopPosition = ConsensusModuleSnapshotPendingServiceMessagesPatch.awaitRecordingStopPosition(archive, tempRecordingId))) {
                    throw new ClusterException("new snapshot recording incomplete: expectedStopPosition=" + stopPosition + ", actualStopPosition=" + newStopPosition);
                }
                recordingSignalCapture.reset();
                archive.truncateRecording(recordingId, 0L);
                recordingSignalCapture.awaitSignalForRecordingId(archive, recordingId, RecordingSignal.DELETE);
                long replicationId = archive.replicate(tempRecordingId, recordingId, archive.context().controlRequestStreamId(), "aeron:ipc", null);
                recordingSignalCapture.reset();
                recordingSignalCapture.awaitSignalForCorrelationId(archive, replicationId, RecordingSignal.SYNC);
                long replicatedStopPosition = recordingSignalCapture.position();
                if (stopPosition != replicatedStopPosition) {
                    throw new ClusterException("incomplete replication of the new recording: expectedStopPosition=" + stopPosition + ", replicatedStopPosition=" + replicatedStopPosition);
                }
                recordingSignalCapture.reset();
                archive.purgeRecording(tempRecordingId);
                recordingSignalCapture.awaitSignalForRecordingId(archive, tempRecordingId, RecordingSignal.DELETE);
                boolean bl = true;
                return bl;
            }
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void replayLocalSnapshotRecording(Aeron aeron, AeronArchive archive, long recordingId, ConsensusModuleSnapshotListener listener) {
        String channel = "aeron:ipc";
        int streamId = 103;
        int sessionId = (int)archive.startReplay(recordingId, 0L, -1L, "aeron:ipc", 103);
        try {
            String replayChannel = ChannelUri.addSessionId((String)"aeron:ipc", (int)sessionId);
            try (Subscription subscription = aeron.addSubscription(replayChannel, 103);){
                Image image;
                while (null == (image = subscription.imageBySessionId(sessionId))) {
                    archive.checkForErrorResponse();
                    Thread.yield();
                }
                ConsensusModuleSnapshotAdapter adapter = new ConsensusModuleSnapshotAdapter(image, listener);
                while (true) {
                    int fragments = adapter.poll();
                    if (adapter.isDone()) {
                        break;
                    }
                    if (0 != fragments) continue;
                    if (image.isClosed()) {
                        throw new ClusterException("snapshot ended unexpectedly: " + image);
                    }
                    archive.checkForErrorResponse();
                    Thread.yield();
                }
            }
        }
        finally {
            archive.stopAllReplays(recordingId);
        }
    }

    private static boolean snapshotIsNotValid(SnapshotReader snapshotReader, TargetState[] targetStates) {
        for (int i = 0; i < targetStates.length; ++i) {
            TargetState targetState = targetStates[i];
            PendingMessageTrackerState actualState = snapshotReader.pendingMessageTrackers[i];
            if (targetState.nextServiceSessionId == actualState.nextServiceSessionId && targetState.logServiceSessionId == actualState.logServiceSessionId && (0 == actualState.pendingServiceMessageCount || targetState.logServiceSessionId + 1L == actualState.minClusterSessionId && targetState.nextServiceSessionId - 1L == actualState.maxClusterSessionId)) continue;
            return true;
        }
        TargetState defaultTracker = targetStates[0];
        return defaultTracker.nextServiceSessionId != snapshotReader.nextServiceSessionId || defaultTracker.logServiceSessionId != snapshotReader.logServiceSessionId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static long createNewSnapshotRecording(Aeron aeron, AeronArchive archive, long oldRecordingId, TargetState[] targetStates) {
        try (ExclusivePublication publication = archive.addRecordedExclusivePublication(PATCH_CHANNEL, 107);){
            long l;
            try {
                int publicationSessionId = publication.sessionId();
                CountersReader countersReader = aeron.countersReader();
                int counterId = ConsensusModuleSnapshotPendingServiceMessagesPatch.awaitRecordingCounter(publicationSessionId, archive.archiveId(), countersReader);
                long newRecordingId = RecordingPos.getRecordingId((CountersReader)countersReader, (int)counterId);
                ConsensusModuleSnapshotPendingServiceMessagesPatch.replayLocalSnapshotRecording(aeron, archive, oldRecordingId, new SnapshotWriter(publication, targetStates));
                ConsensusModuleSnapshotPendingServiceMessagesPatch.awaitRecordingComplete(countersReader, counterId, publication.position(), newRecordingId);
                l = newRecordingId;
            }
            catch (Throwable throwable) {
                archive.stopRecording((Publication)publication);
                throw throwable;
            }
            archive.stopRecording((Publication)publication);
            return l;
        }
    }

    private static int awaitRecordingCounter(int publicationSessionId, long archiveId, CountersReader countersReader) {
        int counterId;
        while (-1 == (counterId = RecordingPos.findCounterIdBySession((CountersReader)countersReader, (int)publicationSessionId, (long)archiveId))) {
            Thread.yield();
        }
        return counterId;
    }

    private static void awaitRecordingComplete(CountersReader counters, int counterId, long position, long recordingId) {
        while (counters.getCounterValue(counterId) < position) {
            Thread.yield();
            if (RecordingPos.isActive((CountersReader)counters, (int)counterId, (long)recordingId)) continue;
            throw new ClusterException("recording has stopped unexpectedly: " + recordingId);
        }
    }

    private static long awaitRecordingStopPosition(AeronArchive archive, long recordingId) {
        long stopPosition;
        while (-1L == (stopPosition = archive.getStopPosition(recordingId))) {
            Thread.yield();
        }
        return stopPosition;
    }

    public static void main(String[] args) {
        if (1 != args.length) {
            System.out.println("Usage: <cluster-dir>");
            System.exit(-1);
        }
        new ConsensusModuleSnapshotPendingServiceMessagesPatch().execute(new File(args[0]));
    }

    private static final class SnapshotReader
    implements ConsensusModuleSnapshotListener {
        long nextServiceSessionId = Long.MIN_VALUE;
        long logServiceSessionId = Long.MIN_VALUE;
        final PendingMessageTrackerState[] pendingMessageTrackers = new PendingMessageTrackerState[127];
        int serviceCount;

        private SnapshotReader() {
        }

        public void onLoadBeginSnapshot(int appVersion, TimeUnit timeUnit, DirectBuffer buffer, int offset, int length) {
        }

        public void onLoadConsensusModuleState(long nextSessionId, long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, DirectBuffer buffer, int offset, int length) {
            this.nextServiceSessionId = nextServiceSessionId;
            this.logServiceSessionId = logServiceSessionId;
        }

        public void onLoadPendingMessage(long clusterSessionId, DirectBuffer buffer, int offset, int length) {
            int serviceId = PendingServiceMessageTracker.serviceId((long)clusterSessionId);
            PendingMessageTrackerState trackerState = this.tracker(serviceId);
            ++trackerState.pendingServiceMessageCount;
            trackerState.minClusterSessionId = Math.min(trackerState.minClusterSessionId, clusterSessionId);
            trackerState.maxClusterSessionId = Math.max(trackerState.maxClusterSessionId, clusterSessionId);
        }

        public void onLoadClusterSession(long clusterSessionId, long correlationId, long openedLogPosition, long timeOfLastActivity, CloseReason closeReason, int responseStreamId, String responseChannel, DirectBuffer buffer, int offset, int length) {
        }

        public void onLoadTimer(long correlationId, long deadline, DirectBuffer buffer, int offset, int length) {
        }

        public void onLoadPendingMessageTracker(long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, int serviceId, DirectBuffer buffer, int offset, int length) {
            PendingMessageTrackerState trackerState = this.tracker(serviceId);
            trackerState.nextServiceSessionId = nextServiceSessionId;
            trackerState.logServiceSessionId = logServiceSessionId;
        }

        public void onLoadEndSnapshot(DirectBuffer buffer, int offset, int length) {
        }

        private PendingMessageTrackerState tracker(int serviceId) {
            PendingMessageTrackerState trackerState = this.pendingMessageTrackers[serviceId];
            if (null == trackerState) {
                this.pendingMessageTrackers[serviceId] = trackerState = new PendingMessageTrackerState();
                ++this.serviceCount;
            }
            return trackerState;
        }
    }

    private static final class PendingMessageTrackerState {
        long nextServiceSessionId = Long.MIN_VALUE;
        long logServiceSessionId = Long.MIN_VALUE;
        long minClusterSessionId = Long.MAX_VALUE;
        long maxClusterSessionId = Long.MIN_VALUE;
        int pendingServiceMessageCount;

        private PendingMessageTrackerState() {
        }
    }

    private static final class TargetState {
        final long nextServiceSessionId;
        final long logServiceSessionId;
        long clusterSessionId;

        private TargetState(long nextServiceSessionId, long logServiceSessionId) {
            this.nextServiceSessionId = nextServiceSessionId;
            this.logServiceSessionId = logServiceSessionId;
            this.clusterSessionId = logServiceSessionId + 1L;
        }

        static TargetState[] compute(int serviceCount, PendingMessageTrackerState[] states) {
            TargetState[] targetStates = new TargetState[serviceCount];
            for (int i = 0; i < serviceCount; ++i) {
                PendingMessageTrackerState state = states[i];
                long targetNextServiceSessionId = Math.max(Math.max(state.nextServiceSessionId, state.maxClusterSessionId + 1L), state.logServiceSessionId + 1L + (long)state.pendingServiceMessageCount);
                long targetLogServiceSessionId = targetNextServiceSessionId - 1L - (long)state.pendingServiceMessageCount;
                targetStates[i] = new TargetState(targetNextServiceSessionId, targetLogServiceSessionId);
            }
            return targetStates;
        }
    }

    private static final class SnapshotWriter
    implements ConsensusModuleSnapshotListener {
        private final ExpandableArrayBuffer tempBuffer = new ExpandableArrayBuffer(1024);
        private final ConsensusModuleEncoder consensusModuleEncoder = new ConsensusModuleEncoder();
        private final SessionMessageHeaderEncoder sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder();
        private final PendingMessageTrackerEncoder pendingMessageTrackerEncoder = new PendingMessageTrackerEncoder();
        private final ExclusivePublication snapshotPublication;
        private final TargetState[] targetStates;

        SnapshotWriter(ExclusivePublication snapshotPublication, TargetState[] targetStates) {
            this.snapshotPublication = snapshotPublication;
            this.targetStates = targetStates;
        }

        public void onLoadBeginSnapshot(int appVersion, TimeUnit timeUnit, DirectBuffer buffer, int offset, int length) {
            this.writeToSnapshot(buffer, offset, length);
        }

        public void onLoadConsensusModuleState(long nextSessionId, long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, DirectBuffer buffer, int offset, int length) {
            TargetState defaultTrackerState = this.targetStates[0];
            this.tempBuffer.putBytes(0, buffer, offset, length);
            this.consensusModuleEncoder.wrap((MutableDirectBuffer)this.tempBuffer, 8).logServiceSessionId(defaultTrackerState.logServiceSessionId).nextServiceSessionId(defaultTrackerState.nextServiceSessionId);
            this.writeToSnapshot((DirectBuffer)this.tempBuffer, 0, length);
        }

        public void onLoadPendingMessage(long clusterSessionId, DirectBuffer buffer, int offset, int length) {
            TargetState targetState = this.targetStates[PendingServiceMessageTracker.serviceId((long)clusterSessionId)];
            this.tempBuffer.putBytes(0, buffer, offset, length);
            this.sessionMessageHeaderEncoder.wrap((MutableDirectBuffer)this.tempBuffer, 8).clusterSessionId(targetState.clusterSessionId++);
            this.writeToSnapshot((DirectBuffer)this.tempBuffer, 0, length);
        }

        public void onLoadClusterSession(long clusterSessionId, long correlationId, long openedLogPosition, long timeOfLastActivity, CloseReason closeReason, int responseStreamId, String responseChannel, DirectBuffer buffer, int offset, int length) {
            this.writeToSnapshot(buffer, offset, length);
        }

        public void onLoadTimer(long correlationId, long deadline, DirectBuffer buffer, int offset, int length) {
            this.writeToSnapshot(buffer, offset, length);
        }

        public void onLoadPendingMessageTracker(long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, int serviceId, DirectBuffer buffer, int offset, int length) {
            TargetState targetState = this.targetStates[serviceId];
            this.tempBuffer.putBytes(0, buffer, offset, length);
            this.pendingMessageTrackerEncoder.wrap((MutableDirectBuffer)this.tempBuffer, 8).logServiceSessionId(targetState.logServiceSessionId).nextServiceSessionId(targetState.nextServiceSessionId);
            this.writeToSnapshot((DirectBuffer)this.tempBuffer, 0, length);
        }

        public void onLoadEndSnapshot(DirectBuffer buffer, int offset, int length) {
            this.writeToSnapshot(buffer, offset, length);
        }

        private void writeToSnapshot(DirectBuffer buffer, int offset, int length) {
            long position;
            while ((position = this.snapshotPublication.offer(buffer, offset, length)) < 0L) {
                if (position == -4L || position == -1L || position == -5L) {
                    throw new ClusterException("cannot offer into a snapshot: " + Publication.errorString((long)position));
                }
                Thread.yield();
            }
        }
    }
}

