/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.benchmarks;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.MetricRegistry;
import io.scalecube.benchmarks.BenchmarkMetrics;
import io.scalecube.benchmarks.BenchmarkSettings;
import io.scalecube.benchmarks.BenchmarkTask;
import io.scalecube.benchmarks.BenchmarkTaskImpl;
import io.scalecube.benchmarks.metrics.BenchmarkMeter;
import io.scalecube.benchmarks.metrics.BenchmarkTimer;
import io.scalecube.benchmarks.metrics.CodahaleBenchmarkMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Base class holding the shared state of a benchmark run: settings, schedulers, the metric
 * registry and its reporters, plus the entry points that drive a benchmark scenario.
 *
 * <p>Life cycle: {@link #start()} creates the registry, reporters and schedulers, calls
 * {@link #beforeAll()} and arms a warm-up timer; {@link #shutdown()} emits a final report,
 * stops the reporters, disposes the schedulers and calls {@link #afterAll()}. The
 * {@code runFor*}/{@code runWith*} methods wrap a scenario between those two calls.
 *
 * @param <S> the concrete subclass type, handed back to scenario functions as "self"
 */
public class BenchmarkState<S extends BenchmarkState<S>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkState.class);

    /** Settings this state was created with; read by every scenario runner. */
    protected final BenchmarkSettings settings;

    /** Scheduler backed by a fixed pool of {@code settings.numberThreads()} threads. */
    private Scheduler scheduler;
    /** One single-threaded scheduler per configured thread; used by the ramp-up scenario. */
    private List<Scheduler> schedulers;
    private MetricRegistry registry;
    private BenchmarkMetrics metrics;
    /** Stays {@code null} when the console reporter is disabled in the settings. */
    private ConsoleReporter consoleReporter;
    private CsvReporter csvReporter;
    private final AtomicBoolean started = new AtomicBoolean();
    /** Flipped to {@code true} once the warm-up delay has elapsed. */
    private final AtomicBoolean warmUpOccurred = new AtomicBoolean(false);
    private Disposable warmUpSubscriber;

    /**
     * Creates a not-yet-started state.
     *
     * @param settings benchmark settings used by this state
     */
    public BenchmarkState(BenchmarkSettings settings) {
        this.settings = settings;
    }

    /**
     * Hook invoked by {@link #start()} after the registry, reporters and schedulers exist.
     * The default implementation does nothing.
     *
     * @throws Exception any failure; {@link #start()} rethrows it as {@link IllegalStateException}
     */
    protected void beforeAll() throws Exception {
    }

    /**
     * Hook invoked at the end of {@link #shutdown()}. The default implementation does nothing.
     *
     * @throws Exception any failure; {@link #shutdown()} rethrows it as
     *     {@link IllegalStateException}
     */
    protected void afterAll() throws Exception {
    }

    /**
     * Initializes metrics, reporters and schedulers, runs {@link #beforeAll()}, registers a JVM
     * shutdown hook that emits a final report, and arms the warm-up timer that starts periodic
     * reporting.
     *
     * @throws IllegalStateException if this state is already started or {@link #beforeAll()} fails
     */
    public final void start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("BenchmarkState is already started");
        }
        LOGGER.info("Benchmarks settings: {}", settings);

        registry = new MetricRegistry();
        // The metrics implementation is given a view of the warm-up flag
        // (presumably to ignore measurements taken before warm-up ends -- confirm in
        // CodahaleBenchmarkMetrics).
        metrics = new CodahaleBenchmarkMetrics(registry, warmUpOccurred::get);

        if (settings.consoleReporterEnabled()) {
            consoleReporter =
                ConsoleReporter.forRegistry(registry)
                    .outputTo(System.out)
                    .convertDurationsTo(settings.durationUnit())
                    .convertRatesTo(settings.rateUnit())
                    .build();
        }
        csvReporter =
            CsvReporter.forRegistry(registry)
                .convertDurationsTo(settings.durationUnit())
                .convertRatesTo(settings.rateUnit())
                .build(settings.csvReporterDirectory());

        scheduler = Schedulers.fromExecutor(Executors.newFixedThreadPool(settings.numberThreads()));
        schedulers =
            IntStream.rangeClosed(1, settings.numberThreads())
                .mapToObj(i -> Schedulers.fromExecutorService(Executors.newSingleThreadScheduledExecutor()))
                .collect(Collectors.toList());

        try {
            beforeAll();
        } catch (Exception ex) {
            throw new IllegalStateException("BenchmarkState beforeAll() failed: " + ex, ex);
        }

        // Final report on JVM exit, but only if shutdown() has not already produced one.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (started.get()) {
                csvReporter.report();
                if (consoleReporter != null) {
                    consoleReporter.report();
                }
            }
        }));

        // Periodic reporting begins only after the warm-up period has elapsed.
        warmUpSubscriber =
            Mono.delay(settings.warmUpDuration())
                .doOnSuccess(ignored -> {
                    warmUpOccurred.compareAndSet(false, true);
                    long intervalMillis = settings.reporterInterval().toMillis();
                    if (settings.consoleReporterEnabled()) {
                        consoleReporter.start(intervalMillis, TimeUnit.MILLISECONDS);
                    }
                    csvReporter.start(intervalMillis, TimeUnit.MILLISECONDS);
                })
                .subscribe();
    }

    /**
     * Cancels the warm-up timer, emits a final report and stops the reporters, disposes all
     * schedulers and finally runs {@link #afterAll()}.
     *
     * @throws IllegalStateException if this state is not started or {@link #afterAll()} fails
     */
    public final void shutdown() {
        if (!started.compareAndSet(true, false)) {
            throw new IllegalStateException("BenchmarkState is not started");
        }
        if (warmUpSubscriber != null) {
            warmUpSubscriber.dispose();
        }
        if (consoleReporter != null) {
            consoleReporter.report();
            consoleReporter.stop();
        }
        if (csvReporter != null) {
            csvReporter.report();
            csvReporter.stop();
        }
        if (scheduler != null) {
            scheduler.dispose();
        }
        if (schedulers != null) {
            schedulers.forEach(Scheduler::dispose);
        }
        try {
            afterAll();
        } catch (Exception ex) {
            throw new IllegalStateException("BenchmarkState afterAll() failed: " + ex, ex);
        }
    }

    /**
     * Returns the fixed-pool scheduler created by {@link #start()}.
     *
     * @return the scheduler, or {@code null} if {@link #start()} has not been called
     */
    public Scheduler scheduler() {
        return scheduler;
    }

    /**
     * Returns the single-threaded schedulers created by {@link #start()}.
     *
     * @return the scheduler list, or {@code null} if {@link #start()} has not been called
     */
    public List<Scheduler> schedulers() {
        return schedulers;
    }

    /**
     * Returns the metric registry created by {@link #start()}.
     *
     * @return the registry, or {@code null} if {@link #start()} has not been called
     */
    public MetricRegistry registry() {
        return registry;
    }

    /**
     * Returns a timer registered under {@code <taskName>-<name>}.
     *
     * @param name timer name suffix
     * @return the timer provided by the metrics implementation
     */
    public BenchmarkTimer timer(String name) {
        return metrics.timer(settings.taskName() + "-" + name);
    }

    /**
     * Returns a meter registered under {@code <taskName>-<name>}.
     *
     * @param name meter name suffix
     * @return the meter provided by the metrics implementation
     */
    public BenchmarkMeter meter(String name) {
        return metrics.meter(settings.taskName() + "-" + name);
    }

    /**
     * Runs a synchronous unit of work for iteration indices {@code 0..numOfIterations-1} as a
     * parallel flux on {@link #scheduler()}, waiting until the pipeline's terminate callback
     * fires or {@code executionTaskDuration} elapses. The state is started before and shut down
     * after the run.
     *
     * <p>If the waiting thread is interrupted, the exception is propagated (wrapped by
     * {@link Exceptions#propagate(Throwable)}) and the thread's interrupt status is restored
     * once shutdown has completed.
     *
     * @param func factory that receives this state and returns the unit of work, which maps an
     *     iteration index to a result
     */
    public final void runForSync(Function<S, Function<Long, Object>> func) {
        @SuppressWarnings("unchecked") // by contract S is the concrete type of this instance
        S self = (S) this;
        boolean interrupted = false;
        try {
            self.start();
            Function<Long, Object> unitOfWork = func.apply(self);
            CountDownLatch latch = new CountDownLatch(1);
            Flux.fromStream(LongStream.range(0L, settings.numOfIterations()).boxed())
                .parallel()
                .runOn(scheduler())
                .map(unitOfWork)
                .doOnTerminate(latch::countDown)
                .subscribe();
            latch.await(settings.executionTaskDuration().toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
            throw Exceptions.propagate(e);
        } finally {
            try {
                self.shutdown();
            } finally {
                // Restore the flag only after cleanup so shutdown() runs exactly as it would
                // without an interrupt, while callers can still observe the interruption.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Runs an asynchronous unit of work. The iteration range is divided into
     * {@code numberThreads} contiguous slices of {@code numOfIterations / numberThreads}
     * indices each (integer division); every slice is executed on {@link #scheduler()} with at
     * most {@code concurrency} in-flight publishers and is cut off after
     * {@code executionTaskDuration}. Blocks until all slices finish, then shuts the state down.
     *
     * @param func factory that receives this state and returns the unit of work, which maps an
     *     iteration index to a publisher
     */
    public final void runForAsync(Function<S, Function<Long, Publisher<?>>> func) {
        @SuppressWarnings("unchecked") // by contract S is the concrete type of this instance
        S self = (S) this;
        try {
            self.start();
            Function<Long, Publisher<?>> unitOfWork = func.apply(self);
            int threads = settings.numberThreads();
            long countPerThread = settings.numOfIterations() / threads;

            Function<Integer, Mono<Void>> scenarioPerThread =
                i -> Mono.fromRunnable(() -> {
                    long start = i * countPerThread;
                    Flux.fromStream(LongStream.range(start, start + countPerThread).boxed())
                        .flatMap(unitOfWork::apply, settings.concurrency(), Integer.MAX_VALUE)
                        .take(settings.executionTaskDuration())
                        .blockLast();
                });

            Flux.range(0, threads)
                .flatMap(
                    i -> scenarioPerThread.apply(i).subscribeOn(scheduler()),
                    Integer.MAX_VALUE,
                    Integer.MAX_VALUE)
                .blockLast();
        } finally {
            self.shutdown();
        }
    }

    /**
     * Runs a ramp-up scenario. Every {@code rampUpInterval}, for the length of
     * {@code rampUpDuration}, a scheduler is picked from {@link #schedulers()} and
     * {@code max(1, injectorsPerRampUpInterval)} set-up publishers are subscribed on it. Each
     * set-up result is turned into a {@link BenchmarkTaskImpl} that is scheduled on the same
     * scheduler; the method blocks until all task completion monos finish, then shuts the
     * state down.
     *
     * @param setUp produces the set-up publisher for a ramp-up iteration and this state
     * @param func factory that receives this state and returns, per set-up result, the unit of
     *     work executed by the task
     * @param cleanUp produces the clean-up mono for this state and a set-up result
     * @param <T> type of the set-up result
     */
    public final <T> void runWithRampUp(BiFunction<Long, S, Publisher<T>> setUp, Function<S, Function<T, BiFunction<Long, BenchmarkTask, Publisher<?>>>> func, BiFunction<S, T, Mono<Void>> cleanUp) {
        @SuppressWarnings("unchecked") // by contract S is the concrete type of this instance
        S self = (S) this;
        try {
            self.start();
            Function<T, BiFunction<Long, BenchmarkTask, Publisher<?>>> unitOfWork = func.apply(self);

            Flux.interval(Duration.ZERO, settings.rampUpInterval())
                .take(settings.rampUpDuration())
                .flatMap(
                    rampUpIteration -> {
                        // All tasks created in this interval are bound to one scheduler.
                        Scheduler rampUpScheduler = selectScheduler(rampUpIteration);
                        return Flux.range(0, Math.max(1, settings.injectorsPerRampUpInterval()))
                            .flatMap(
                                injector ->
                                    createSetUpFactory(setUp, self, rampUpIteration)
                                        .subscribeOn(rampUpScheduler)
                                        .map(setUpResult -> {
                                            BiFunction<Long, BenchmarkTask, Publisher<?>> taskWork =
                                                unitOfWork.apply(setUpResult);
                                            Supplier<Mono<Void>> taskCleanUp =
                                                () -> cleanUp.apply(self, setUpResult);
                                            return new BenchmarkTaskImpl(
                                                settings, rampUpScheduler, taskWork, taskCleanUp);
                                        })
                                        .doOnNext(rampUpScheduler::schedule)
                                        .flatMap(BenchmarkTaskImpl::completionMono));
                    },
                    Integer.MAX_VALUE,
                    Integer.MAX_VALUE)
                .blockLast();
        } finally {
            self.shutdown();
        }
    }

    /**
     * Picks a scheduler for a ramp-up iteration by taking the iteration number modulo the
     * number of schedulers.
     *
     * @param rampUpIteration ramp-up iteration number
     * @return the scheduler at the computed index of {@link #schedulers()}
     */
    private Scheduler selectScheduler(Long rampUpIteration) {
        // Masking with Long.MAX_VALUE clears the sign bit so the index is never negative.
        int schedulerIndex = (int) ((rampUpIteration & Long.MAX_VALUE) % schedulers().size());
        return schedulers().get(schedulerIndex);
    }

    /**
     * Wraps the set-up publisher so that a set-up failure is logged and converted into a plain
     * completion instead of an error signal; no task is created for a failed set-up.
     *
     * @param setUp produces the set-up publisher
     * @param self this state, typed as the concrete subclass
     * @param rampUpIteration ramp-up iteration number passed to {@code setUp}
     * @param <T> type of the set-up result
     * @return flux relaying the set-up results and completing on both success and failure
     */
    private <T> Flux<T> createSetUpFactory(BiFunction<Long, S, Publisher<T>> setUp, S self, Long rampUpIteration) {
        return Flux.create((FluxSink<T> sink) -> {
            Flux<T> deferSetUp = Flux.defer(() -> setUp.apply(rampUpIteration, self));
            deferSetUp.subscribe(
                sink::next,
                ex -> {
                    LOGGER.error("Exception occured on setUp at rampUpIteration: {}, cause: {}, task won't start", rampUpIteration, ex, ex);
                    sink.complete();
                },
                sink::complete);
        });
    }
}

