/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.samples.RateReporter;
import io.aeron.samples.SampleConfiguration;
import io.aeron.samples.SamplesUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.BufferUtil;
import org.agrona.DirectBuffer;
import org.agrona.SystemUtil;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.console.ContinueBarrier;

public class EmbeddedThroughput {
    private static final long NUMBER_OF_MESSAGES = SampleConfiguration.NUMBER_OF_MESSAGES;
    private static final long LINGER_TIMEOUT_MS = SampleConfiguration.LINGER_TIMEOUT_MS;
    private static final int STREAM_ID = SampleConfiguration.STREAM_ID;
    private static final int MESSAGE_LENGTH = SampleConfiguration.MESSAGE_LENGTH;
    private static final int FRAGMENT_COUNT_LIMIT = SampleConfiguration.FRAGMENT_COUNT_LIMIT;
    private static final String CHANNEL = SampleConfiguration.CHANNEL;
    private static final UnsafeBuffer OFFER_BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned((int)MESSAGE_LENGTH, (int)64));
    private static volatile boolean printingActive = true;

    public static void main(String[] args) throws InterruptedException {
        SystemUtil.loadPropertiesFiles((String[])args);
        RateReporter reporter = new RateReporter(TimeUnit.SECONDS.toNanos(1L), EmbeddedThroughput::printRate);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicBoolean running = new AtomicBoolean(true);
        try (MediaDriver mediaDriver = MediaDriver.launch();
             Aeron aeron = Aeron.connect((Aeron.Context)new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()));
             Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID);
             ConcurrentPublication publication = aeron.addPublication(CHANNEL, STREAM_ID);){
            executor.execute(reporter);
            executor.execute(() -> SamplesUtil.subscriberLoop(SamplesUtil.rateReporterHandler(reporter), FRAGMENT_COUNT_LIMIT, running).accept(subscription));
            ContinueBarrier barrier = new ContinueBarrier("Execute again?");
            IdleStrategy idleStrategy = SampleConfiguration.newIdleStrategy();
            do {
                System.out.format("%nStreaming %,d messages of payload length %d bytes to %s on stream id %d%n", NUMBER_OF_MESSAGES, MESSAGE_LENGTH, CHANNEL, STREAM_ID);
                printingActive = true;
                long backPressureCount = 0L;
                for (long i = 0L; i < NUMBER_OF_MESSAGES; ++i) {
                    OFFER_BUFFER.putLong(0, i);
                    idleStrategy.reset();
                    while (publication.offer((DirectBuffer)OFFER_BUFFER, 0, MESSAGE_LENGTH, null) < 0L) {
                        ++backPressureCount;
                        idleStrategy.idle();
                    }
                }
                System.out.println("Done streaming. backPressureRatio=" + (double)backPressureCount / (double)NUMBER_OF_MESSAGES);
                if (LINGER_TIMEOUT_MS > 0L) {
                    System.out.println("Lingering for " + LINGER_TIMEOUT_MS + " milliseconds...");
                    Thread.sleep(LINGER_TIMEOUT_MS);
                }
                printingActive = false;
            } while (barrier.await());
            running.set(false);
            reporter.halt();
            executor.shutdown();
        }
    }

    private static void printRate(double messagesPerSec, double bytesPerSec, long totalFragments, long totalBytes) {
        if (printingActive) {
            System.out.format("%.04g msgs/sec, %.04g bytes/sec, totals %d messages %d MB payloads%n", messagesPerSec, bytesPerSec, totalFragments, totalBytes / 0x100000L);
        }
    }
}

