/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples.cluster.tutorial;

import io.aeron.CommonContext;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveThreadingMode;
import io.aeron.cluster.ClusterControl;
import io.aeron.cluster.ClusteredMediaDriver;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.EgressListener;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.service.ClientSession;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusteredService;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.concurrent.TimeUnit;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.YieldingIdleStrategy;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.console.ContinueBarrier;

/**
 * Sample that launches a single-node cluster (media driver, archive, consensus module and a
 * clustered service container) inside one process, optionally connects a client to it, and
 * exchanges a message with the clustered service.
 *
 * <p>The instance owns every component it launches; {@link #close()} shuts them all down.
 */
public final class SingleNodeCluster implements AutoCloseable {
    /** Id written at offset 0 of a message that the service echoes back to the sender. */
    private static final int MESSAGE_ID = 1;
    /** Id written at offset 0 of a message that asks the service to schedule a timer. */
    private static final int TIMER_ID = 2;
    /** Length in bytes of the sample messages: a single {@code int} id. */
    private static final int MESSAGE_LENGTH = Integer.BYTES;
    /** Correlation id (before service-id tagging) used for the timer scheduled by {@link Service}. */
    private static final int TIMER_CORRELATION_ID = 1;
    /** Delay added to {@code cluster.time()} to compute the timer deadline. */
    private static final long TIMER_DELAY = 1000L;

    private final ClusteredMediaDriver clusteredMediaDriver;
    private final ClusteredServiceContainer container;
    /** Media driver used by the client; {@code null} until {@link #connectClientToCluster()} runs. */
    private MediaDriver clientMediaDriver;
    /** Cluster client; {@code null} until {@link #connectClientToCluster()} runs. */
    private AeronCluster client;
    private final IdleStrategy idleStrategy = YieldingIdleStrategy.INSTANCE;
    private final ExpandableArrayBuffer msgBuffer = new ExpandableArrayBuffer();
    /** Egress listener that simply prints each callback it receives. */
    private final EgressListener egressMessageListener = new EgressListener() {
        @Override
        public void onMessage(long clusterSessionId, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
            System.out.println("egress onMessage " + clusterSessionId);
        }

        @Override
        public void onNewLeader(long clusterSessionId, long leadershipTermId, int leaderMemberId, String ingressEndpoints) {
            System.out.println("SingleNodeCluster.onNewLeader");
        }
    };

    /**
     * Launches the clustered media driver and the service container.
     *
     * @param externalService service to host in the container, or {@code null} to host a new
     *                        {@link Service}.
     * @param isCleanStart    passed to the archive's {@code deleteArchiveOnStart} and the consensus
     *                        module's {@code deleteDirOnStart}; {@code false} keeps previous state.
     */
    public SingleNodeCluster(ClusteredService externalService, boolean isCleanStart) {
        ClusteredService service = null != externalService ? externalService : new Service();

        MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
            .threadingMode(ThreadingMode.SHARED)
            .errorHandler(Throwable::printStackTrace)
            .dirDeleteOnShutdown(true)
            .dirDeleteOnStart(true);

        Archive.Context archiveContext = new Archive.Context()
            .recordingEventsEnabled(false)
            .threadingMode(ArchiveThreadingMode.SHARED)
            .deleteArchiveOnStart(isCleanStart);

        ConsensusModule.Context consensusModuleContext = new ConsensusModule.Context()
            .errorHandler(Throwable::printStackTrace)
            .deleteDirOnStart(isCleanStart);

        ClusteredServiceContainer.Context serviceContainerContext = new ClusteredServiceContainer.Context()
            .clusteredService(service)
            .errorHandler(Throwable::printStackTrace);

        ClusteredMediaDriver launchedDriver =
            ClusteredMediaDriver.launch(mediaDriverContext, archiveContext, consensusModuleContext);
        ClusteredServiceContainer launchedContainer;
        try {
            launchedContainer = ClusteredServiceContainer.launch(serviceContainerContext);
        } catch (RuntimeException | Error ex) {
            // The driver is already running; without this it would leak because the caller
            // never receives an instance on which to call close().
            CloseHelper.quietClose(launchedDriver);
            throw ex;
        }

        this.clusteredMediaDriver = launchedDriver;
        this.container = launchedContainer;
    }

    /**
     * Closes the client, the client media driver, the consensus module, the service container
     * and finally the clustered media driver, in that order.
     */
    @Override
    public void close() {
        ErrorHandler errorHandler = this.clusteredMediaDriver.mediaDriver().context().errorHandler();
        CloseHelper.close(errorHandler, this.client);
        CloseHelper.close(errorHandler, this.clientMediaDriver);
        CloseHelper.close(errorHandler, this.clusteredMediaDriver.consensusModule());
        CloseHelper.close(errorHandler, this.container);
        // Deliberately closed without the error handler: the handler was obtained from this
        // driver's own context, so it is presumably not usable while the driver shuts down.
        CloseHelper.close(this.clusteredMediaDriver);
    }

    /**
     * Launches a dedicated media driver for the client in the default Aeron directory suffixed
     * with {@code "-client"} and connects an {@link AeronCluster} client through it.
     */
    void connectClientToCluster() {
        String aeronDirectoryName = CommonContext.getAeronDirectoryName() + "-client";

        MediaDriver.Context clientDriverContext = new MediaDriver.Context()
            .threadingMode(ThreadingMode.SHARED)
            .dirDeleteOnStart(true)
            .dirDeleteOnShutdown(true)
            .errorHandler(Throwable::printStackTrace)
            .aeronDirectoryName(aeronDirectoryName);
        this.clientMediaDriver = MediaDriver.launch(clientDriverContext);

        AeronCluster.Context clientContext = new AeronCluster.Context()
            .errorHandler(Throwable::printStackTrace)
            .egressListener(this.egressMessageListener)
            .aeronDirectoryName(aeronDirectoryName);
        this.client = AeronCluster.connect(clientContext);
    }

    /**
     * Writes {@code id} at offset 0 of the message buffer and offers the first
     * {@code messageLength} bytes to the cluster, idling and retrying while the offer returns a
     * negative value.
     *
     * @param id            message id stored as the leading {@code int} of the message.
     * @param messageLength number of bytes of the message buffer to send.
     * @throws IllegalStateException if {@link #connectClientToCluster()} has not been called.
     */
    void sendMessageToCluster(int id, int messageLength) {
        if (null == this.client) {
            throw new IllegalStateException("client is not connected: call connectClientToCluster() first");
        }

        this.msgBuffer.putInt(0, id);
        this.idleStrategy.reset();
        while (this.client.offer(this.msgBuffer, 0, messageLength) < 0L) {
            this.idleStrategy.idle();
        }
    }

    /**
     * Polls the client's egress once.
     *
     * @return the value returned by the client's {@code pollEgress()}, or 0 when no client is
     *         connected.
     */
    int pollEgress() {
        return null == this.client ? 0 : this.client.pollEgress();
    }

    /** Idles until {@link #pollEgress()} returns a positive value. */
    void pollEgressUntilMessage() {
        this.idleStrategy.reset();
        while (this.pollEgress() <= 0) {
            this.idleStrategy.idle();
        }
    }

    /**
     * Flips the cluster control toggle to {@code SNAPSHOT} and idles until the consensus module's
     * snapshot counter has advanced past the value it had before the toggle.
     *
     * @throws IllegalStateException if the control toggle cannot be found or the toggle request
     *                               is refused, since waiting would otherwise never end.
     */
    void takeSnapshot() {
        ConsensusModule.Context consensusModuleContext = this.clusteredMediaDriver.consensusModule().context();
        Counter snapshotCounter = consensusModuleContext.snapshotCounter();
        // Read the baseline before toggling so the increment caused by this request is observed.
        long snapshotCount = snapshotCounter.get();

        AtomicCounter controlToggle = ClusterControl.findControlToggle(
            this.clusteredMediaDriver.mediaDriver().context().countersManager(),
            consensusModuleContext.clusterId());
        if (null == controlToggle) {
            throw new IllegalStateException("cluster control toggle not found");
        }
        if (!ClusterControl.ToggleState.SNAPSHOT.toggle(controlToggle)) {
            throw new IllegalStateException("unable to toggle cluster control to SNAPSHOT");
        }

        this.idleStrategy.reset();
        while (snapshotCounter.get() <= snapshotCount) {
            this.idleStrategy.idle();
        }
    }

    /** Starts a clean cluster, sends one message, waits for egress, then waits for the user. */
    static void sendSingleMessageAndEchoBack() {
        runEchoScenario(true);
    }

    /** Starts a cluster keeping previous state, sends one message, waits for egress, then waits for the user. */
    static void loadPreviousLogAndSendAnotherMessageAndEchoBack() {
        runEchoScenario(false);
    }

    /** Shared body of the two echo scenarios; they differ only in the clean-start flag. */
    private static void runEchoScenario(boolean isCleanStart) {
        try (SingleNodeCluster cluster = new SingleNodeCluster(null, isCleanStart)) {
            cluster.connectClientToCluster();
            cluster.sendMessageToCluster(MESSAGE_ID, MESSAGE_LENGTH);
            cluster.pollEgressUntilMessage();
            ContinueBarrier barrier = new ContinueBarrier("continue");
            barrier.await();
        }
    }

    /**
     * Runs the clean-start scenario followed by the scenario that keeps previous state.
     *
     * @param args ignored.
     */
    public static void main(String[] args) {
        SingleNodeCluster.sendSingleMessageAndEchoBack();
        SingleNodeCluster.loadPreviousLogAndSendAnotherMessageAndEchoBack();
    }

    /**
     * Default clustered service: counts session messages, echoes them back to the sender (or
     * schedules a timer when the message id is {@code TIMER_ID}), and stores the message count
     * in snapshots.
     */
    static class Service implements ClusteredService {
        protected Cluster cluster;
        protected IdleStrategy idleStrategy;
        /** Number of session messages handled; restored from a snapshot in {@link #onStart}. */
        private int messageCount = 0;
        /** Scratch buffer used to write the snapshot. */
        private final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer();

        Service() {
        }

        /**
         * Stores the cluster and its idle strategy and, when a snapshot image is supplied, polls
         * it until a fragment has been read and takes the message count from its leading
         * {@code int}.
         */
        @Override
        public void onStart(Cluster cluster, Image snapshotImage) {
            this.cluster = cluster;
            this.idleStrategy = cluster.idleStrategy();
            if (null == snapshotImage) {
                return;
            }

            System.out.println("onStart load snapshot");
            FragmentHandler snapshotHandler =
                (snapshotBuffer, offset, length, header) -> this.messageCount = snapshotBuffer.getInt(offset);
            this.idleStrategy.reset();
            while (snapshotImage.poll(snapshotHandler, 1) <= 0) {
                this.idleStrategy.idle();
            }
            System.out.println("snapshot messageCount=" + this.messageCount);
        }

        @Override
        public void onSessionOpen(ClientSession session, long timestamp) {
            System.out.println("onSessionOpen " + session.id());
        }

        @Override
        public void onSessionClose(ClientSession session, long timestamp, CloseReason closeReason) {
            System.out.println("onSessionClose " + session.id() + " " + closeReason);
        }

        /**
         * Counts the message, then either schedules a timer (leading {@code int} equals
         * {@code TIMER_ID}) or echoes the message back to the sending session, idling and
         * retrying until the operation is accepted.
         */
        @Override
        public void onSessionMessage(ClientSession session, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
            ++this.messageCount;
            System.out.println(this.cluster.role() + " onSessionMessage " + session.id() + " count=" + this.messageCount);

            int id = buffer.getInt(offset);
            this.idleStrategy.reset();
            if (TIMER_ID == id) {
                while (!this.cluster.scheduleTimer(this.serviceCorrelationId(TIMER_CORRELATION_ID), this.cluster.time() + TIMER_DELAY)) {
                    this.idleStrategy.idle();
                }
            } else {
                while (session.offer(buffer, offset, length) < 0L) {
                    this.idleStrategy.idle();
                }
            }
        }

        /**
         * Sends a message whose leading {@code int} is {@code MESSAGE_ID} to every client
         * session, idling and retrying each offer while it returns a negative value.
         */
        @Override
        public void onTimerEvent(long correlationId, long timestamp) {
            System.out.println("onTimerEvent " + correlationId);
            ExpandableArrayBuffer timerMessage = new ExpandableArrayBuffer();
            timerMessage.putInt(0, MESSAGE_ID);
            this.cluster.forEachClientSession(clientSession -> {
                this.idleStrategy.reset();
                while (clientSession.offer(timerMessage, 0, MESSAGE_LENGTH) < 0L) {
                    this.idleStrategy.idle();
                }
            });
        }

        /**
         * Writes the current message count as a single {@code int} to the snapshot publication,
         * idling and retrying while the offer returns a negative value.
         */
        @Override
        public void onTakeSnapshot(ExclusivePublication snapshotPublication) {
            System.out.println("onTakeSnapshot messageCount=" + this.messageCount);
            this.buffer.putInt(0, this.messageCount);
            this.idleStrategy.reset();
            while (snapshotPublication.offer(this.buffer, 0, Integer.BYTES) < 0L) {
                this.idleStrategy.idle();
            }
        }

        @Override
        public void onRoleChange(Cluster.Role newRole) {
            System.out.println("onRoleChange " + newRole);
        }

        @Override
        public void onTerminate(Cluster cluster) {
        }

        @Override
        public void onNewLeadershipTermEvent(long leadershipTermId, long logPosition, long timestamp, long termBaseLogPosition, int leaderMemberId, int logSessionId, TimeUnit timeUnit, int appVersion) {
            System.out.println("onNewLeadershipTermEvent");
        }

        /**
         * Combines this service's id with a correlation id: the service id is shifted into the
         * top byte and the correlation id occupies the low 32 bits.
         *
         * @param correlationId service-local correlation id.
         * @return the combined correlation id.
         */
        protected long serviceCorrelationId(int correlationId) {
            // Mask to 32 bits: a plain int-to-long widening sign-extends negative ids and would
            // set every upper bit, wiping out the service id. Non-negative ids are unaffected.
            return (long)this.cluster.context().serviceId() << 56 | (correlationId & 0xFFFF_FFFFL);
        }
    }
}

