/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToByteEncoder;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Recorder;
import org.agrona.console.ContinueBarrier;
import org.reactivestreams.Publisher;
import reactor.aeron.Configurations;
import reactor.aeron.LatencyReporter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

public class ReactorNettyClientPing {
    private static final Recorder HISTOGRAM = new Recorder(TimeUnit.SECONDS.toNanos(10L), 3);
    private static final LatencyReporter latencyReporter = new LatencyReporter(HISTOGRAM);
    private static final ByteBuf PAYLOAD = ByteBufAllocator.DEFAULT.buffer(Configurations.MESSAGE_LENGTH);

    public static void main(String[] args) {
        System.out.println("message size: " + Configurations.MESSAGE_LENGTH + ", number of messages: " + Configurations.NUMBER_OF_MESSAGES + ", address: " + Configurations.MDC_ADDRESS + ", port: " + Configurations.MDC_PORT);
        LoopResources loopResources = LoopResources.create((String)"reactor-netty");
        Connection connection = TcpClient.create((ConnectionProvider)ConnectionProvider.newConnection()).runOn(loopResources).host(Configurations.MDC_ADDRESS).port(Configurations.MDC_PORT).option(ChannelOption.TCP_NODELAY, (Object)true).option(ChannelOption.SO_KEEPALIVE, (Object)true).option(ChannelOption.SO_REUSEADDR, (Object)true).doOnConnected(System.out::println).bootstrap(b -> BootstrapHandlers.updateConfiguration((Bootstrap)b, (String)"channel", (connectionObserver, channel) -> ReactorNettyClientPing.setupChannel(channel))).connectNow();
        ContinueBarrier barrier = new ContinueBarrier("Execute again?");
        do {
            System.out.println("Pinging " + Configurations.NUMBER_OF_MESSAGES + " messages");
            ReactorNettyClientPing.roundTripMessages(connection, Configurations.NUMBER_OF_MESSAGES);
            System.out.println("Histogram of RTT latencies in microseconds.");
        } while (barrier.await());
        connection.dispose();
        connection.onDispose((Disposable)loopResources).onDispose().block();
    }

    private static void roundTripMessages(Connection connection, long count) {
        HISTOGRAM.reset();
        Disposable disp = latencyReporter.start();
        connection.outbound().sendObject((Publisher)Flux.range((int)0, (int)Configurations.REQUESTED)).then().subscribe();
        connection.outbound().sendObject((Publisher)connection.inbound().receive().retain().take(count).doOnNext(buffer -> {
            long start = buffer.readLong();
            buffer.readerIndex(Configurations.MESSAGE_LENGTH);
            long diff = System.nanoTime() - start;
            HISTOGRAM.recordValue(diff);
            buffer.release();
        }).map(buffer -> 1)).then((Publisher)Mono.defer(() -> Mono.delay((Duration)Duration.ofMillis(100L)).doOnSubscribe(s -> disp.dispose()).then())).then().block();
    }

    private static void setupChannel(Channel channel) {
        int maxFrameLength = 0x100000;
        int lengthFieldLength = 2;
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new ChannelHandler[]{new LengthFieldPrepender(2)});
        pipeline.addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(0x100000, 0, 2, 0, 2)});
        pipeline.addLast(new ChannelHandler[]{new MessageToByteEncoder<Integer>(){

            protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {
                out.writeLong(System.nanoTime());
                out.writeBytes(PAYLOAD.slice());
            }
        }});
    }

    static {
        Random random = new Random(System.nanoTime());
        byte[] bytes = new byte[Configurations.MESSAGE_LENGTH];
        random.nextBytes(bytes);
        PAYLOAD.writeBytes(bytes);
    }
}

