/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.ClusterTool;
import io.aeron.cluster.ConsensusModuleSnapshotAdapter;
import io.aeron.cluster.ConsensusModuleSnapshotListener;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ConsensusModuleEncoder;
import io.aeron.cluster.codecs.PendingMessageTrackerEncoder;
import io.aeron.cluster.codecs.SessionMessageHeaderEncoder;
import io.aeron.cluster.service.ClusterNodeControlProperties;
import io.aeron.samples.archive.RecordingSignalCapture;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.concurrent.status.CountersReader;

public class ConsensusModuleSnapshotPendingServiceMessagesPatch {
    static final int SNAPSHOT_REPLAY_STREAM_ID = 103;
    static final int SNAPSHOT_RECORDING_STREAM_ID = 107;
    static final String PATCH_CHANNEL = "aeron:ipc?alias=consensus-module-snapshot-patch";
    private final String archiveLocalRequestChannel;
    private final int archiveLocalRequestStreamId;

    public ConsensusModuleSnapshotPendingServiceMessagesPatch() {
        this(AeronArchive.Configuration.localControlChannel(), AeronArchive.Configuration.localControlStreamId());
    }

    ConsensusModuleSnapshotPendingServiceMessagesPatch(String archiveLocalRequestChannel, int archiveRequestStreamId) {
        this.archiveLocalRequestChannel = archiveLocalRequestChannel;
        this.archiveLocalRequestStreamId = archiveRequestStreamId;
    }

    public boolean execute(File clusterDir) {
        if (!clusterDir.exists() || !clusterDir.isDirectory()) {
            throw new IllegalArgumentException("invalid cluster directory: " + clusterDir.getAbsolutePath());
        }
        RecordingLog.Entry entry = ClusterTool.findLatestValidSnapshot(clusterDir);
        if (null == entry) {
            throw new ClusterException("no valid snapshot found");
        }
        long recordingId = entry.recordingId;
        ClusterNodeControlProperties properties = ClusterTool.loadControlProperties(clusterDir);
        RecordingSignalCapture recordingSignalCapture = new RecordingSignalCapture();
        try (Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(properties.aeronDirectoryName));
             AeronArchive archive = AeronArchive.connect(new AeronArchive.Context().controlRequestChannel(this.archiveLocalRequestChannel).controlRequestStreamId(this.archiveLocalRequestStreamId).controlResponseChannel("aeron:ipc").recordingSignalConsumer(recordingSignalCapture).aeron(aeron));){
            SnapshotReader snapshotReader = new SnapshotReader();
            ConsensusModuleSnapshotPendingServiceMessagesPatch.replayLocalSnapshotRecording(aeron, archive, recordingId, snapshotReader);
            long targetNextServiceSessionId = Math.max(Math.max(snapshotReader.nextServiceSessionId, snapshotReader.maxClusterSessionId + 1L), snapshotReader.logServiceSessionId + 1L + (long)snapshotReader.pendingServiceMessageCount);
            long targetLogServiceSessionId = targetNextServiceSessionId - 1L - (long)snapshotReader.pendingServiceMessageCount;
            if (targetNextServiceSessionId != snapshotReader.nextServiceSessionId || targetLogServiceSessionId != snapshotReader.logServiceSessionId || 0 != snapshotReader.pendingServiceMessageCount && (targetLogServiceSessionId + 1L != snapshotReader.minClusterSessionId || targetNextServiceSessionId - 1L != snapshotReader.maxClusterSessionId)) {
                long newStopPosition;
                long tempRecordingId = ConsensusModuleSnapshotPendingServiceMessagesPatch.createNewSnapshotRecording(aeron, archive, recordingId, targetLogServiceSessionId, targetNextServiceSessionId);
                long stopPosition = ConsensusModuleSnapshotPendingServiceMessagesPatch.awaitRecordingStopPosition(archive, recordingId);
                if (stopPosition != (newStopPosition = ConsensusModuleSnapshotPendingServiceMessagesPatch.awaitRecordingStopPosition(archive, tempRecordingId))) {
                    throw new ClusterException("new snapshot recording incomplete: expectedStopPosition=" + stopPosition + ", actualStopPosition=" + newStopPosition);
                }
                recordingSignalCapture.reset();
                archive.truncateRecording(recordingId, 0L);
                recordingSignalCapture.awaitSignalForRecordingId(archive, recordingId, RecordingSignal.DELETE);
                long replicationId = archive.replicate(tempRecordingId, recordingId, archive.context().controlRequestStreamId(), "aeron:ipc", null);
                recordingSignalCapture.reset();
                recordingSignalCapture.awaitSignalForCorrelationId(archive, replicationId, RecordingSignal.SYNC);
                long replicatedStopPosition = recordingSignalCapture.position();
                if (stopPosition != replicatedStopPosition) {
                    throw new ClusterException("incomplete replication of the new recording: expectedStopPosition=" + stopPosition + ", replicatedStopPosition=" + replicatedStopPosition);
                }
                recordingSignalCapture.reset();
                archive.purgeRecording(tempRecordingId);
                recordingSignalCapture.awaitSignalForRecordingId(archive, tempRecordingId, RecordingSignal.DELETE);
                boolean bl = true;
                return bl;
            }
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void replayLocalSnapshotRecording(Aeron aeron, AeronArchive archive, long recordingId, ConsensusModuleSnapshotListener listener) {
        String channel = "aeron:ipc";
        int streamId = 103;
        int sessionId = (int)archive.startReplay(recordingId, 0L, -1L, "aeron:ipc", 103);
        try {
            String replayChannel = ChannelUri.addSessionId("aeron:ipc", sessionId);
            try (Subscription subscription = aeron.addSubscription(replayChannel, 103);){
                Image image;
                while (null == (image = subscription.imageBySessionId(sessionId))) {
                    archive.checkForErrorResponse();
                    Thread.yield();
                }
                ConsensusModuleSnapshotAdapter adapter = new ConsensusModuleSnapshotAdapter(image, listener);
                while (true) {
                    int fragments;
                    if (0 != (fragments = adapter.poll())) {
                        continue;
                    }
                    if (adapter.isDone()) {
                        break;
                    }
                    if (image.isClosed()) {
                        throw new ClusterException("snapshot ended unexpectedly: " + image);
                    }
                    archive.checkForErrorResponse();
                    Thread.yield();
                }
            }
        }
        finally {
            archive.stopAllReplays(recordingId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static long createNewSnapshotRecording(Aeron aeron, AeronArchive archive, long oldRecordingId, long targetLogServiceSessionId, long targetNextServiceSessionId) {
        try (ExclusivePublication publication = archive.addRecordedExclusivePublication(PATCH_CHANNEL, 107);){
            try {
                int publicationSessionId = publication.sessionId();
                CountersReader countersReader = aeron.countersReader();
                int counterId = ConsensusModuleSnapshotPendingServiceMessagesPatch.awaitRecordingCounter(publicationSessionId, archive.archiveId(), countersReader);
                long newRecordingId = RecordingPos.getRecordingId(countersReader, counterId);
                ConsensusModuleSnapshotPendingServiceMessagesPatch.replayLocalSnapshotRecording(aeron, archive, oldRecordingId, new SnapshotWriter(publication, targetLogServiceSessionId, targetNextServiceSessionId));
                ConsensusModuleSnapshotPendingServiceMessagesPatch.awaitRecordingComplete(countersReader, counterId, publication.position(), newRecordingId);
                long l = newRecordingId;
                archive.stopRecording(publication);
                return l;
            }
            catch (Throwable throwable) {
                archive.stopRecording(publication);
                throw throwable;
            }
        }
    }

    private static int awaitRecordingCounter(int publicationSessionId, long archiveId, CountersReader countersReader) {
        int counterId;
        while (-1 == (counterId = RecordingPos.findCounterIdBySession(countersReader, publicationSessionId, archiveId))) {
            Thread.yield();
        }
        return counterId;
    }

    private static void awaitRecordingComplete(CountersReader counters, int counterId, long position, long recordingId) {
        while (counters.getCounterValue(counterId) < position) {
            Thread.yield();
            if (RecordingPos.isActive(counters, counterId, recordingId)) continue;
            throw new ClusterException("recording has stopped unexpectedly: " + recordingId);
        }
    }

    private static long awaitRecordingStopPosition(AeronArchive archive, long recordingId) {
        long stopPosition;
        while (-1L == (stopPosition = archive.getStopPosition(recordingId))) {
            Thread.yield();
        }
        return stopPosition;
    }

    public static void main(String[] args) {
        if (1 != args.length) {
            System.out.println("Usage: <cluster-dir>");
            System.exit(-1);
        }
        new ConsensusModuleSnapshotPendingServiceMessagesPatch().execute(new File(args[0]));
    }

    private static final class SnapshotReader
    implements ConsensusModuleSnapshotListener {
        private long nextServiceSessionId = Long.MIN_VALUE;
        private long logServiceSessionId = Long.MIN_VALUE;
        private long minClusterSessionId = Long.MAX_VALUE;
        private long maxClusterSessionId = Long.MIN_VALUE;
        private int pendingServiceMessageCount = 0;

        private SnapshotReader() {
        }

        @Override
        public void onLoadBeginSnapshot(int appVersion, TimeUnit timeUnit, DirectBuffer buffer, int offset, int length) {
        }

        @Override
        public void onLoadConsensusModuleState(long nextSessionId, long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, DirectBuffer buffer, int offset, int length) {
            this.nextServiceSessionId = nextServiceSessionId;
            this.logServiceSessionId = logServiceSessionId;
        }

        @Override
        public void onLoadPendingMessage(long clusterSessionId, DirectBuffer buffer, int offset, int length) {
            ++this.pendingServiceMessageCount;
            this.minClusterSessionId = Math.min(this.minClusterSessionId, clusterSessionId);
            this.maxClusterSessionId = Math.max(this.maxClusterSessionId, clusterSessionId);
        }

        @Override
        public void onLoadClusterSession(long clusterSessionId, long correlationId, long openedLogPosition, long timeOfLastActivity, CloseReason closeReason, int responseStreamId, String responseChannel, DirectBuffer buffer, int offset, int length) {
        }

        @Override
        public void onLoadTimer(long correlationId, long deadline, DirectBuffer buffer, int offset, int length) {
        }

        @Override
        public void onLoadPendingMessageTracker(long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, int serviceId, DirectBuffer buffer, int offset, int length) {
        }

        @Override
        public void onLoadEndSnapshot(DirectBuffer buffer, int offset, int length) {
        }
    }

    private static final class SnapshotWriter
    implements ConsensusModuleSnapshotListener {
        private final ExpandableArrayBuffer tempBuffer = new ExpandableArrayBuffer(1024);
        private final ConsensusModuleEncoder consensusModuleEncoder = new ConsensusModuleEncoder();
        private final SessionMessageHeaderEncoder sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder();
        private final PendingMessageTrackerEncoder pendingMessageTrackerEncoder = new PendingMessageTrackerEncoder();
        private final ExclusivePublication snapshotPublication;
        private final long targetNextServiceSessionId;
        private final long targetLogServiceSessionId;
        private long nextClusterSessionId;

        SnapshotWriter(ExclusivePublication snapshotPublication, long targetLogServiceSessionId, long targetNextServiceSessionId) {
            this.snapshotPublication = snapshotPublication;
            this.targetLogServiceSessionId = targetLogServiceSessionId;
            this.targetNextServiceSessionId = targetNextServiceSessionId;
            this.nextClusterSessionId = targetLogServiceSessionId + 1L;
        }

        @Override
        public void onLoadBeginSnapshot(int appVersion, TimeUnit timeUnit, DirectBuffer buffer, int offset, int length) {
            this.writeToSnapshot(buffer, offset, length);
        }

        @Override
        public void onLoadConsensusModuleState(long nextSessionId, long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, DirectBuffer buffer, int offset, int length) {
            this.tempBuffer.putBytes(0, buffer, offset, length);
            this.consensusModuleEncoder.wrap(this.tempBuffer, 8).logServiceSessionId(this.targetLogServiceSessionId).nextServiceSessionId(this.targetNextServiceSessionId);
            this.writeToSnapshot(this.tempBuffer, 0, length);
        }

        @Override
        public void onLoadPendingMessage(long clusterSessionId, DirectBuffer buffer, int offset, int length) {
            this.tempBuffer.putBytes(0, buffer, offset, length);
            this.sessionMessageHeaderEncoder.wrap(this.tempBuffer, 8).clusterSessionId(this.nextClusterSessionId++);
            this.writeToSnapshot(this.tempBuffer, 0, length);
        }

        @Override
        public void onLoadClusterSession(long clusterSessionId, long correlationId, long openedLogPosition, long timeOfLastActivity, CloseReason closeReason, int responseStreamId, String responseChannel, DirectBuffer buffer, int offset, int length) {
            this.writeToSnapshot(buffer, offset, length);
        }

        @Override
        public void onLoadTimer(long correlationId, long deadline, DirectBuffer buffer, int offset, int length) {
            this.writeToSnapshot(buffer, offset, length);
        }

        @Override
        public void onLoadPendingMessageTracker(long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity, int serviceId, DirectBuffer buffer, int offset, int length) {
            if (0 == serviceId) {
                this.tempBuffer.putBytes(0, buffer, offset, length);
                this.pendingMessageTrackerEncoder.wrap(this.tempBuffer, 8).logServiceSessionId(this.targetLogServiceSessionId).nextServiceSessionId(this.targetNextServiceSessionId);
                this.writeToSnapshot(this.tempBuffer, 0, length);
            } else {
                this.writeToSnapshot(buffer, offset, length);
            }
        }

        @Override
        public void onLoadEndSnapshot(DirectBuffer buffer, int offset, int length) {
            this.writeToSnapshot(buffer, offset, length);
        }

        private void writeToSnapshot(DirectBuffer buffer, int offset, int length) {
            long position;
            while ((position = this.snapshotPublication.offer(buffer, offset, length)) < 0L) {
                if (position == -4L || position == -1L || position == -5L) {
                    throw new ClusterException("cannot offer into a snapshot: " + Publication.errorString(position));
                }
                Thread.yield();
            }
        }
    }
}

