/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.test;

import io.netty.util.ReferenceCounted;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import java.time.Duration;
import java.util.function.BiFunction;
import org.HdrHistogram.Recorder;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PingClient {
    private final Payload payload;
    private final Mono<RSocket> client;

    public PingClient(Mono<RSocket> client) {
        this.client = client;
        this.payload = ByteBufPayload.create((String)"hello");
    }

    public Recorder startTracker(Duration interval) {
        Recorder histogram = new Recorder(3600000000000L, 3);
        Flux.interval((Duration)interval).doOnNext(aLong -> {
            System.out.println("---- PING/ PONG HISTO ----");
            histogram.getIntervalHistogram().outputPercentileDistribution(System.out, 5, Double.valueOf(1000.0), false);
            System.out.println("---- PING/ PONG HISTO ----");
        }).subscribe();
        return histogram;
    }

    public Flux<Payload> requestResponsePingPong(int count, Recorder histogram) {
        return this.pingPong(RSocket::requestResponse, count, histogram);
    }

    public Flux<Payload> requestStreamPingPong(int count, Recorder histogram) {
        return this.pingPong(RSocket::requestStream, count, histogram);
    }

    Flux<Payload> pingPong(BiFunction<RSocket, ? super Payload, ? extends Publisher<Payload>> interaction, int count, Recorder histogram) {
        return Flux.usingWhen(this.client, rsocket -> Flux.range((int)1, (int)count).flatMap(i -> {
            long start = System.nanoTime();
            return Flux.from((Publisher)((Publisher)interaction.apply((RSocket)rsocket, (Payload)this.payload.retain()))).doOnNext(ReferenceCounted::release).doFinally(signalType -> {
                long diff = System.nanoTime() - start;
                histogram.recordValue(diff);
            });
        }, 64), rsocket -> {
            rsocket.dispose();
            return rsocket.onClose();
        }).doOnError(Throwable::printStackTrace);
    }
}

