/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples.archive;

import io.aeron.Aeron;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.CommonContext;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveThreadingMode;
import io.aeron.archive.ArchivingMediaDriver;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import java.io.File;
import java.nio.ByteOrder;
import java.util.Random;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.SystemUtil;
import org.agrona.collections.LongArrayList;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.YieldingIdleStrategy;
import org.agrona.concurrent.status.CountersReader;

public class IndexedReplicatedRecording
implements AutoCloseable {
    static final int MESSAGE_INDEX_OFFSET = 0;
    static final int TIMESTAMP_OFFSET = 8;
    static final int HEADER_LENGTH = 16;
    static final int MESSAGE_BURST_COUNT = 10000;
    private static final int TERM_LENGTH = 65536;
    private static final long CATALOG_CAPACITY = 65536L;
    private static final int SRC_CONTROL_STREAM_ID = 10;
    private static final String SRC_CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8090";
    private static final String SRC_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:0";
    private static final String DST_CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8095";
    private static final String DST_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:0";
    private static final String SRC_REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0";
    private static final String DST_REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0";
    private static final int LIVE_STREAM_ID = 1033;
    private static final String LIVE_CHANNEL = new ChannelUriStringBuilder().media("udp").controlEndpoint("localhost:8100").termLength(Integer.valueOf(65536)).build();
    private static final int INDEX_STREAM_ID = 1097;
    private static final String INDEX_CHANNEL = new ChannelUriStringBuilder().media("ipc").termLength(Integer.valueOf(65536)).build();
    private final ArchivingMediaDriver srcArchivingMediaDriver;
    private final ArchivingMediaDriver dstArchivingMediaDriver;
    private final Aeron srcAeron;
    private final Aeron dstAeron;
    private final AeronArchive srcAeronArchive;
    private final AeronArchive dstAeronArchive;

    IndexedReplicatedRecording() {
        String srcAeronDirectoryName = CommonContext.getAeronDirectoryName() + "-src";
        System.out.println("srcAeronDirectoryName=" + srcAeronDirectoryName);
        String dstAeronDirectoryName = CommonContext.getAeronDirectoryName() + "-dst";
        System.out.println("dstAeronDirectoryName=" + dstAeronDirectoryName);
        File srcArchiveDir = new File(SystemUtil.tmpDirName(), "src-archive");
        System.out.println("srcArchiveDir=" + srcArchiveDir);
        this.srcArchivingMediaDriver = ArchivingMediaDriver.launch((MediaDriver.Context)new MediaDriver.Context().aeronDirectoryName(srcAeronDirectoryName).termBufferSparseFile(true).threadingMode(ThreadingMode.SHARED).errorHandler(Throwable::printStackTrace).spiesSimulateConnection(true).dirDeleteOnShutdown(true).dirDeleteOnStart(true), (Archive.Context)new Archive.Context().catalogCapacity(65536L).controlChannel(SRC_CONTROL_REQUEST_CHANNEL).archiveClientContext(new AeronArchive.Context().controlResponseChannel("aeron:udp?endpoint=localhost:0")).replicationChannel("aeron:udp?endpoint=localhost:0").deleteArchiveOnStart(true).archiveDir(srcArchiveDir).fileSyncLevel(0).threadingMode(ArchiveThreadingMode.SHARED));
        File dstArchiveDir = new File(SystemUtil.tmpDirName(), "dst-archive");
        System.out.println("dstArchiveDir=" + dstArchiveDir);
        this.dstArchivingMediaDriver = ArchivingMediaDriver.launch((MediaDriver.Context)new MediaDriver.Context().aeronDirectoryName(dstAeronDirectoryName).termBufferSparseFile(true).threadingMode(ThreadingMode.SHARED).errorHandler(Throwable::printStackTrace).spiesSimulateConnection(true).dirDeleteOnShutdown(true).dirDeleteOnStart(true), (Archive.Context)new Archive.Context().catalogCapacity(65536L).controlChannel(DST_CONTROL_REQUEST_CHANNEL).archiveClientContext(new AeronArchive.Context().controlResponseChannel("aeron:udp?endpoint=localhost:0")).replicationChannel("aeron:udp?endpoint=localhost:0").deleteArchiveOnStart(true).archiveDir(dstArchiveDir).fileSyncLevel(0).threadingMode(ArchiveThreadingMode.SHARED));
        this.srcAeron = Aeron.connect((Aeron.Context)new Aeron.Context().aeronDirectoryName(srcAeronDirectoryName));
        this.dstAeron = Aeron.connect((Aeron.Context)new Aeron.Context().aeronDirectoryName(dstAeronDirectoryName));
        this.srcAeronArchive = AeronArchive.connect((AeronArchive.Context)new AeronArchive.Context().idleStrategy((IdleStrategy)YieldingIdleStrategy.INSTANCE).controlRequestChannel(SRC_CONTROL_REQUEST_CHANNEL).controlResponseChannel("aeron:udp?endpoint=localhost:0").aeron(this.srcAeron));
        this.dstAeronArchive = AeronArchive.connect((AeronArchive.Context)new AeronArchive.Context().idleStrategy((IdleStrategy)YieldingIdleStrategy.INSTANCE).controlRequestChannel(DST_CONTROL_REQUEST_CHANNEL).controlResponseChannel("aeron:udp?endpoint=localhost:0").aeron(this.dstAeron));
    }

    @Override
    public void close() {
        CloseHelper.closeAll((AutoCloseable[])new AutoCloseable[]{this.srcAeronArchive, this.dstAeronArchive, this.srcAeron, this.dstAeron, this.srcArchivingMediaDriver, this.dstArchivingMediaDriver});
        this.srcArchivingMediaDriver.archive().context().deleteDirectory();
        this.dstArchivingMediaDriver.archive().context().deleteDirectory();
    }

    public static void main(String[] args) throws InterruptedException {
        try (IndexedReplicatedRecording test = new IndexedReplicatedRecording();){
            ExclusivePublication publication = test.srcAeron.addExclusivePublication(LIVE_CHANNEL, 1033);
            String sessionSpecificLiveChannel = LIVE_CHANNEL + "|session-id=" + publication.sessionId();
            Sequencer sequencer = new Sequencer(10000, (Publication)publication);
            Indexer primaryIndexer = new Indexer(test.srcAeron.addSubscription("aeron-spy:" + sessionSpecificLiveChannel, 1033), (Publication)test.srcAeron.addExclusivePublication(INDEX_CHANNEL, 1097), publication.sessionId());
            test.srcAeronArchive.startRecording(INDEX_CHANNEL, 1097, SourceLocation.LOCAL, true);
            Thread primaryIndexerThread = Indexer.start(primaryIndexer);
            long srcRecordingSubscriptionId = test.srcAeronArchive.startRecording(sessionSpecificLiveChannel, 1033, SourceLocation.LOCAL, true);
            CountersReader srcCounters = test.srcAeron.countersReader();
            int srcCounterId = IndexedReplicatedRecording.awaitRecordingCounterId(srcCounters, publication.sessionId(), test.srcAeronArchive.archiveId());
            long srcRecordingId = RecordingPos.getRecordingId((CountersReader)srcCounters, (int)srcCounterId);
            sequencer.sendBurst();
            long channelTagId = test.dstAeron.nextCorrelationId();
            long subscriptionTagId = test.dstAeron.nextCorrelationId();
            String taggedChannel = "aeron:udp?control-mode=manual|rejoin=false|tags=" + channelTagId + "," + subscriptionTagId;
            Indexer secondaryIndexer = new Indexer(test.dstAeron.addSubscription(taggedChannel, 1033), (Publication)test.dstAeron.addExclusivePublication(INDEX_CHANNEL, 1097), publication.sessionId());
            test.dstAeronArchive.startRecording(INDEX_CHANNEL, 1097, SourceLocation.LOCAL, true);
            Thread secondaryIndexerThread = Indexer.start(secondaryIndexer);
            long replicationId = test.dstAeronArchive.taggedReplicate(srcRecordingId, -1L, channelTagId, subscriptionTagId, 10, SRC_CONTROL_REQUEST_CHANNEL, LIVE_CHANNEL);
            sequencer.sendBurst();
            sequencer.sendBurst();
            long position = publication.position();
            IndexedReplicatedRecording.awaitPosition(srcCounters, srcCounterId, position);
            primaryIndexer.awaitPosition(position);
            CountersReader dstCounters = test.dstAeron.countersReader();
            int dstCounterId = IndexedReplicatedRecording.awaitRecordingCounterId(dstCounters, publication.sessionId(), test.srcAeronArchive.archiveId());
            IndexedReplicatedRecording.awaitPosition(dstCounters, dstCounterId, position);
            secondaryIndexer.awaitPosition(position);
            primaryIndexerThread.interrupt();
            primaryIndexerThread.join();
            primaryIndexer.close();
            secondaryIndexerThread.interrupt();
            secondaryIndexerThread.join();
            secondaryIndexer.close();
            test.dstAeronArchive.stopReplication(replicationId);
            test.srcAeronArchive.stopRecording(srcRecordingSubscriptionId);
            IndexedReplicatedRecording.assertEquals("index", sequencer.nextMessageIndex(), primaryIndexer.nextMessageIndex());
            IndexedReplicatedRecording.assertEquals("index", sequencer.nextMessageIndex(), secondaryIndexer.nextMessageIndex());
            IndexedReplicatedRecording.assertEquals("positions", primaryIndexer.messagePositions(), secondaryIndexer.messagePositions());
            IndexedReplicatedRecording.assertEquals("timestamps", primaryIndexer.timestamps(), secondaryIndexer.timestamps());
            IndexedReplicatedRecording.assertEquals("timestamp positions", primaryIndexer.timestampPositions(), secondaryIndexer.timestampPositions());
        }
    }

    static int awaitRecordingCounterId(CountersReader counters, int sessionId, long archiveId) throws InterruptedException {
        int counterId;
        while (-1 == (counterId = RecordingPos.findCounterIdBySession((CountersReader)counters, (int)sessionId, (long)archiveId))) {
            Thread.yield();
            if (!Thread.interrupted()) continue;
            throw new InterruptedException();
        }
        return counterId;
    }

    static void awaitPosition(CountersReader counters, int counterId, long position) throws InterruptedException {
        while (counters.getCounterValue(counterId) < position) {
            if (counters.getCounterState(counterId) != 1) {
                throw new IllegalStateException("count not active: " + counterId);
            }
            Thread.yield();
            if (!Thread.interrupted()) continue;
            throw new InterruptedException();
        }
    }

    static void assertEquals(String type, long srcValue, long dstValue) {
        if (srcValue != dstValue) {
            throw new IllegalStateException(type + " not equal: srcValue=" + srcValue + " dstValue=" + dstValue);
        }
    }

    static void assertEquals(String type, LongArrayList srcList, LongArrayList dstList) {
        int dstSize;
        int srcSize = srcList.size();
        if (srcSize != (dstSize = dstList.size())) {
            throw new IllegalStateException(type + " not equal: srcList.size=" + srcSize + " dstList.size=" + dstSize);
        }
        for (int i = 0; i < srcSize; ++i) {
            long dstVal;
            long srcVal = srcList.getLong(i);
            if (srcVal == (dstVal = srcList.getLong(i))) continue;
            throw new IllegalStateException(type + " [" + i + "] not equal: srcVal=" + srcVal + " dstVal=" + dstVal);
        }
    }

    static class Sequencer
    implements AutoCloseable {
        private static final int MAX_MESSAGE_LENGTH = 1000;
        private long nextMessageIndex;
        private final int burstLength;
        private final Publication publication;
        private final Random random = new Random();
        private final UnsafeBuffer buffer = new UnsafeBuffer(new byte[1016]);

        Sequencer(int burstLength, Publication publication) {
            this.burstLength = burstLength;
            this.publication = publication;
            this.buffer.setMemory(16, 1000, (byte)88);
        }

        @Override
        public void close() {
            CloseHelper.close((AutoCloseable)this.publication);
        }

        long nextMessageIndex() {
            return this.nextMessageIndex;
        }

        void sendBurst() throws InterruptedException {
            for (int i = 0; i < this.burstLength; ++i) {
                this.appendMessage();
            }
        }

        private void appendMessage() throws InterruptedException {
            int variableLength = this.random.nextInt(1000);
            this.buffer.putLong(0, this.nextMessageIndex, ByteOrder.LITTLE_ENDIAN);
            this.buffer.putLong(8, System.currentTimeMillis(), ByteOrder.LITTLE_ENDIAN);
            while (this.publication.offer((DirectBuffer)this.buffer, 0, 16 + variableLength) < 0L) {
                Thread.yield();
                if (!Thread.interrupted()) continue;
                throw new InterruptedException();
            }
            ++this.nextMessageIndex;
        }
    }

    static class Indexer
    implements AutoCloseable,
    Runnable,
    ControlledFragmentHandler {
        private static final int FRAGMENT_LIMIT = 10;
        private static final int INDEX_BUFFER_CAPACITY = 1024;
        private static final int BATCH_SIZE = 126;
        private final int sessionId;
        private int nextMessageIndex = 0;
        private int batchIndex = 0;
        private long lastMessagePosition = -1L;
        private final Subscription subscription;
        private final Publication publication;
        private Image image;
        private final LongArrayList messagePositions = new LongArrayList();
        private final LongArrayList timestamps = new LongArrayList();
        private final LongArrayList timestampPositions = new LongArrayList();
        private final UnsafeBuffer indexBuffer = new UnsafeBuffer(new byte[1024]);

        static Thread start(Indexer indexer) {
            Thread thread = new Thread(indexer);
            thread.setName("indexer");
            thread.setDaemon(true);
            thread.start();
            return thread;
        }

        Indexer(Subscription subscription, Publication publication, int sessionId) {
            this.subscription = subscription;
            this.publication = publication;
            this.sessionId = sessionId;
        }

        @Override
        public void close() {
            CloseHelper.close((AutoCloseable)this.subscription);
        }

        long position() {
            if (null == this.image) {
                return -1L;
            }
            return this.image.position();
        }

        void awaitPosition(long position) {
            while (this.position() < position) {
                Thread.yield();
            }
        }

        long nextMessageIndex() {
            return this.nextMessageIndex;
        }

        LongArrayList messagePositions() {
            return this.messagePositions;
        }

        LongArrayList timestamps() {
            return this.timestamps;
        }

        LongArrayList timestampPositions() {
            return this.timestampPositions;
        }

        @Override
        public void run() {
            Image image;
            while (!this.subscription.isConnected() || !this.publication.isConnected()) {
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException ignore) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            this.image = image = this.subscription.imageBySessionId(this.sessionId);
            if (null == image) {
                throw new IllegalStateException("session not found");
            }
            this.lastMessagePosition = image.joinPosition();
            YieldingIdleStrategy idleStrategy = YieldingIdleStrategy.INSTANCE;
            int fragments;
            while (0 != (fragments = image.controlledPoll((ControlledFragmentHandler)this, 10)) || !Thread.interrupted() && !image.isClosed()) {
                idleStrategy.idle(fragments);
            }
            return;
        }

        public ControlledFragmentHandler.Action onFragment(DirectBuffer buffer, int offset, int length, Header header) {
            long currentPosition = this.lastMessagePosition;
            long index = buffer.getLong(offset + 0, ByteOrder.LITTLE_ENDIAN);
            if (index != (long)this.nextMessageIndex) {
                throw new IllegalStateException("invalid index: expected=" + this.nextMessageIndex + " actual=" + index);
            }
            if (0 == this.batchIndex) {
                long timestamp = buffer.getLong(offset + 8, ByteOrder.LITTLE_ENDIAN);
                this.timestamps.addLong(timestamp);
                this.timestampPositions.addLong(currentPosition);
                this.indexBuffer.putLong(0, (long)this.nextMessageIndex, ByteOrder.LITTLE_ENDIAN);
                this.indexBuffer.putLong(8, timestamp, ByteOrder.LITTLE_ENDIAN);
            }
            int positionOffset = 16 + this.batchIndex * 8;
            this.indexBuffer.putLong(positionOffset, currentPosition, ByteOrder.LITTLE_ENDIAN);
            if (++this.batchIndex >= 126) {
                if (this.publication.offer((DirectBuffer)this.indexBuffer, 0, 1024) <= 0L) {
                    --this.batchIndex;
                    return ControlledFragmentHandler.Action.ABORT;
                }
                this.batchIndex = 0;
            }
            this.messagePositions.addLong(currentPosition);
            this.lastMessagePosition = header.position();
            ++this.nextMessageIndex;
            return ControlledFragmentHandler.Action.CONTINUE;
        }
    }
}

