/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.Publication;
import io.aeron.driver.MediaDriver;
import io.aeron.samples.RateReporter;
import io.aeron.samples.SampleConfiguration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;
import org.agrona.BufferUtil;
import org.agrona.CloseHelper;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.console.ContinueBarrier;

/**
 * Sample publisher that streams a fixed number of messages to a channel and stream id
 * taken from {@link SampleConfiguration}, printing the achieved rate once per second.
 * <p>
 * The first 8 bytes of every message carry the message sequence number, which is why the
 * configured message length must be at least 8 bytes.
 */
public class StreamingPublisher {
    /** Smallest legal message length: room for the {@code long} sequence number written at offset 0. */
    private static final int MIN_MESSAGE_LENGTH = 8;

    private static final long NUMBER_OF_MESSAGES = SampleConfiguration.NUMBER_OF_MESSAGES;
    private static final long LINGER_TIMEOUT_MS = SampleConfiguration.LINGER_TIMEOUT_MS;
    private static final int STREAM_ID = SampleConfiguration.STREAM_ID;
    private static final int MESSAGE_LENGTH = SampleConfiguration.MESSAGE_LENGTH;
    private static final boolean EMBEDDED_MEDIA_DRIVER = SampleConfiguration.EMBEDDED_MEDIA_DRIVER;
    private static final boolean RANDOM_MESSAGE_LENGTH = SampleConfiguration.RANDOM_MESSAGE_LENGTH;
    private static final String CHANNEL = SampleConfiguration.CHANNEL;
    private static final UnsafeBuffer OFFER_BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_LENGTH, 64));
    private static final IntSupplier LENGTH_GENERATOR = StreamingPublisher.composeLengthGenerator(RANDOM_MESSAGE_LENGTH, MESSAGE_LENGTH);

    /** Written by the publishing thread, read by the rate reporter thread to mute output between runs. */
    private static volatile boolean printingActive = true;

    /**
     * Streams batches of messages until the user declines to run again.
     *
     * @param args ignored.
     * @throws IllegalArgumentException if the configured message length is below 8 bytes.
     * @throws IllegalStateException    if the publication is closed or has reached its maximum position.
     * @throws Exception                if connecting, publishing or lingering fails or is interrupted.
     */
    public static void main(String[] args) throws Exception {
        if (MESSAGE_LENGTH < MIN_MESSAGE_LENGTH) {
            throw new IllegalArgumentException("Message length must be at least " + MIN_MESSAGE_LENGTH + " bytes");
        }

        MediaDriver driver = EMBEDDED_MEDIA_DRIVER ? MediaDriver.launchEmbedded() : null;
        RateReporter reporter = new RateReporter(TimeUnit.SECONDS.toNanos(1L), StreamingPublisher::printRate);
        ExecutorService executor = Executors.newFixedThreadPool(1);

        try {
            Aeron.Context context = new Aeron.Context();
            if (driver != null) {
                context.aeronDirectoryName(driver.aeronDirectoryName());
            }
            executor.execute(reporter);

            try (Aeron aeron = Aeron.connect(context);
                 Publication publication = aeron.addPublication(CHANNEL, STREAM_ID)) {
                ContinueBarrier barrier = new ContinueBarrier("Execute again?");
                IdleStrategy idleStrategy = SampleConfiguration.newIdleStrategy();
                do {
                    printingActive = true;
                    System.out.format("%nStreaming %,d messages of%s size %d bytes to %s on stream id %d%n", NUMBER_OF_MESSAGES, RANDOM_MESSAGE_LENGTH ? " random" : "", MESSAGE_LENGTH, CHANNEL, STREAM_ID);

                    long backPressureCount = streamMessages(publication, idleStrategy, reporter);

                    System.out.println("Done streaming. Back pressure ratio " + (double)backPressureCount / (double)NUMBER_OF_MESSAGES);
                    if (LINGER_TIMEOUT_MS > 0L) {
                        System.out.println("Lingering for " + LINGER_TIMEOUT_MS + " milliseconds...");
                        Thread.sleep(LINGER_TIMEOUT_MS);
                    }
                    printingActive = false;
                } while (barrier.await());
            }
        } finally {
            // Always tear down, even on failure: the executor thread is non-daemon and would
            // otherwise keep the JVM alive, and the embedded driver would never be closed.
            reporter.halt();
            executor.shutdown();
            CloseHelper.close(driver);
        }
    }

    /**
     * Offers {@code NUMBER_OF_MESSAGES} messages, idling and retrying while an offer is rejected.
     *
     * @param publication  destination of the messages.
     * @param idleStrategy strategy applied between rejected offers.
     * @param reporter     notified of the length of every accepted message.
     * @return the number of rejected offers that had to be retried.
     * @throws IllegalStateException if the offer result is {@code Publication.CLOSED} or
     *                               {@code Publication.MAX_POSITION_EXCEEDED}, which retrying cannot fix.
     */
    private static long streamMessages(Publication publication, IdleStrategy idleStrategy, RateReporter reporter) {
        long backPressureCount = 0L;
        for (long i = 0L; i < NUMBER_OF_MESSAGES; ++i) {
            int length = LENGTH_GENERATOR.getAsInt();
            OFFER_BUFFER.putLong(0, i);
            idleStrategy.reset();

            long result;
            while ((result = publication.offer(OFFER_BUFFER, 0, length, null)) < 0L) {
                // These two results are terminal; retrying would spin forever.
                if (result == Publication.CLOSED || result == Publication.MAX_POSITION_EXCEEDED) {
                    throw new IllegalStateException("Publication can no longer accept messages, offer result=" + result);
                }
                ++backPressureCount;
                idleStrategy.idle();
            }
            reporter.onMessage(length);
        }
        return backPressureCount;
    }

    /**
     * Rate reporter callback; prints the current rates and totals unless output is muted.
     *
     * @param messagesPerSec messages per second over the last interval.
     * @param bytesPerSec    bytes per second over the last interval.
     * @param totalFragments running total printed as the message count.
     * @param totalBytes     running total of bytes, printed in MB (bytes / 2^20).
     */
    private static void printRate(double messagesPerSec, double bytesPerSec, long totalFragments, long totalBytes) {
        if (printingActive) {
            System.out.format("%.02g msgs/sec, %.02g bytes/sec, totals %d messages %d MB%n", messagesPerSec, bytesPerSec, totalFragments, totalBytes / 0x100000L);
        }
    }

    /**
     * Builds the supplier of message lengths.
     *
     * @param random {@code true} to draw each length at random from {@code [8, max)}.
     * @param max    the configured message length; returned as-is when lengths are not random.
     * @return a supplier of lengths; it always yields {@code max} when {@code random} is
     *         {@code false} or when {@code max} leaves no room above the 8-byte minimum.
     */
    private static IntSupplier composeLengthGenerator(boolean random, int max) {
        // nextInt(origin, bound) throws unless bound > origin, so a max equal to the minimum
        // (which main() accepts) must use the fixed-length supplier.
        if (random && max > MIN_MESSAGE_LENGTH) {
            return () -> ThreadLocalRandom.current().nextInt(MIN_MESSAGE_LENGTH, max);
        }
        return () -> max;
    }
}

