/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ProactorStreamProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ProactorStreamEmitterProcessingStrategyFactory
extends ReactorStreamProcessingStrategyFactory {
    /**
     * Builds a {@link ProactorStreamEmitterProcessingStrategy} from this factory's settings.
     *
     * @param muleContext context used to obtain the scheduler service and the base scheduler configuration
     * @param schedulersNamePrefix prefix for scheduler names; the blocking and CPU-intensive schedulers are named
     *        {@code <prefix>.<processing type name>}
     * @return the newly created processing strategy
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        // Both suppliers are lazy: the scheduler is only requested from the scheduler service when get() is called.
        Supplier<org.mule.runtime.api.scheduler.Scheduler> blockingSchedulerSupplier =
            () -> muleContext.getSchedulerService().ioScheduler(
                muleContext.getSchedulerBaseConfig().withName(
                    schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name()));
        Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuIntensiveSchedulerSupplier =
            () -> muleContext.getSchedulerService().cpuIntensiveScheduler(
                muleContext.getSchedulerBaseConfig().withName(
                    schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.CPU_INTENSIVE.name()));

        return new ProactorStreamEmitterProcessingStrategy(
            this.getRingBufferSchedulerSupplier(muleContext, schedulersNamePrefix),
            this.getBufferSize(),
            this.getSubscriberCount(),
            this.getWaitStrategy(),
            this.getCpuLightSchedulerSupplier(muleContext, schedulersNamePrefix),
            blockingSchedulerSupplier,
            cpuIntensiveSchedulerSupplier,
            this.resolveParallelism(),
            this.getMaxConcurrency(),
            this.isMaxConcurrencyEagerCheck());
    }

    /**
     * Resolves the parallelism handed to the processing strategy.
     *
     * @return the larger of {@code CORES} and the configured max concurrency
     */
    @Override
    protected int resolveParallelism() {
        final int configuredMaxConcurrency = getMaxConcurrency();
        return Math.max(CORES, configuredMaxConcurrency);
    }

    /**
     * @return the concrete {@link ProcessingStrategy} class instantiated by {@link #create(MuleContext, String)}
     */
    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        final Class<? extends ProcessingStrategy> strategyType = ProactorStreamEmitterProcessingStrategy.class;
        return strategyType;
    }

    /**
     * {@link AbstractProcessingStrategy.ReactorSink} that fans events out over a list of delegate sinks, choosing
     * the delegate for each event in round-robin order.
     *
     * @param <E> type produced by {@link #intoSink(CoreEvent)}
     */
    static class RoundRobinReactorSink<E>
    implements AbstractProcessingStrategy.ReactorSink<E> {
        /** Delegate sinks; the list reference is stored as given, not copied. */
        private final List<AbstractProcessingStrategy.ReactorSink<E>> delegates;
        /** Position of the delegate that will receive the next event. */
        private final AtomicInteger cursor = new AtomicInteger(0);
        /** Moves the cursor one step forward, wrapping at the current number of delegates. */
        private final IntUnaryOperator advance;

        /**
         * @param sinks delegate sinks to rotate over
         */
        public RoundRobinReactorSink(List<AbstractProcessingStrategy.ReactorSink<E>> sinks) {
            this.delegates = sinks;
            this.advance = current -> (current + 1) % this.delegates.size();
        }

        /** Disposes every delegate sink, in list order. */
        @Override
        public void dispose() {
            for (AbstractProcessingStrategy.ReactorSink<E> delegate : this.delegates) {
                delegate.dispose();
            }
        }

        /** Hands the event to the next delegate via its {@code accept}. */
        @Override
        public void accept(CoreEvent event) {
            AbstractProcessingStrategy.ReactorSink<E> target = this.pickDelegate();
            target.accept(event);
        }

        /**
         * Hands the event to the next delegate via its {@code emit}.
         *
         * @return whatever the chosen delegate's {@code emit} returns
         */
        @Override
        public boolean emit(CoreEvent event) {
            AbstractProcessingStrategy.ReactorSink<E> target = this.pickDelegate();
            return target.emit(event);
        }

        /**
         * Returns the event itself, cast to {@code E}.
         */
        @Override
        @SuppressWarnings("unchecked") // E is erased at runtime, so this cast is unchecked.
        public E intoSink(CoreEvent event) {
            return (E) event;
        }

        /** Returns the delegate at the current cursor position and atomically advances the cursor. */
        private AbstractProcessingStrategy.ReactorSink<E> pickDelegate() {
            final int position = this.cursor.getAndUpdate(this.advance);
            return this.delegates.get(position);
        }
    }

    /**
     * Processing strategy whose {@link Sink} is a {@link RoundRobinReactorSink} over several
     * {@link EmitterProcessor}-backed sinks, each subscribed to the same pipeline function.
     */
    static class ProactorStreamEmitterProcessingStrategy
    extends ProactorStreamProcessingStrategy {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProactorStreamEmitterProcessingStrategy.class);

        /**
         * Creates the strategy; every argument is forwarded unchanged to the superclass constructor.
         */
        public ProactorStreamEmitterProcessingStrategy(Supplier<org.mule.runtime.api.scheduler.Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscriberCount, String waitStrategy, Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuLightSchedulerSupplier, Supplier<org.mule.runtime.api.scheduler.Scheduler> blockingSchedulerSupplier, Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuIntensiveSchedulerSupplier, int parallelism, int maxConcurrency, boolean maxConcurrencyEagerCheck) {
            super(ringBufferSchedulerSupplier, bufferSize, subscriberCount, waitStrategy, cpuLightSchedulerSupplier, blockingSchedulerSupplier, cpuIntensiveSchedulerSupplier, parallelism, maxConcurrency, maxConcurrencyEagerCheck);
        }

        /**
         * Creates one sink per slot, where the number of slots is the smaller of {@code maxConcurrency} and
         * {@code CORES}, and returns a sink that rotates over them. The configured buffer size is divided evenly
         * (integer division) between the slots.
         *
         * @param flowConstruct flow whose configuration supplies the shutdown timeout
         * @param function pipeline applied to the events emitted through each sink
         * @return a {@link RoundRobinReactorSink} wrapping the created sinks
         * @throws MuleRuntimeException if a processor has no downstream subscriber after subscribing the pipeline
         */
        @Override
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor function) {
            final long shutdownTimeout = flowConstruct.getMuleContext().getConfiguration().getShutdownTimeout();
            final int sinksCount = Math.min(this.maxConcurrency, AbstractStreamProcessingStrategyFactory.CORES);
            // NOTE(review): a sinksCount of 0 would fail here with a division by zero; presumably
            // maxConcurrency is validated as positive elsewhere - confirm.
            final int sinkBufferSize = this.bufferSize / sinksCount;
            final List<AbstractProcessingStrategy.ReactorSink<CoreEvent>> sinks = new ArrayList<>();
            for (int i = 0; i < sinksCount; ++i) {
                final Latch completionLatch = new Latch();
                final EmitterProcessor<CoreEvent> processor = EmitterProcessor.create(sinkBufferSize);
                // Both terminal signals (error and completion) release the latch; the error itself is not
                // otherwise handled here.
                processor.transform(function).subscribe(null, e -> completionLatch.release(), () -> completionLatch.release());
                if (!processor.hasDownstreams()) {
                    throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("No subscriptions active for processor."));
                }
                // The disposal callback samples System.currentTimeMillis() when it runs, not when the sink is created.
                // NOTE(review): DefaultReactorSink and ProactorSinkWrapper are declared outside this file, so their
                // generic signatures are not visible here; both are left as raw types and constructed exactly as before.
                AbstractProcessingStrategy.DefaultReactorSink sink = new AbstractProcessingStrategy.DefaultReactorSink(processor.sink(FluxSink.OverflowStrategy.BUFFER), () -> this.awaitSubscribersCompletion(flowConstruct, shutdownTimeout, completionLatch, System.currentTimeMillis()), this.createOnEventConsumer(), sinkBufferSize);
                sinks.add(new ProactorStreamProcessingStrategy.ProactorSinkWrapper(this, sink));
            }
            return new RoundRobinReactorSink<>(sinks);
        }

        /**
         * Wraps the pipeline so that events are published on the decorated CPU-light scheduler before reaching it.
         * On subscription, the current thread's context class loader is set to {@code executionClassloader}.
         *
         * @param pipeline pipeline to decorate
         * @return processor that applies {@code publishOn}, the class loader hook, and then {@code pipeline}
         */
        @Override
        public ReactiveProcessor onPipeline(ReactiveProcessor pipeline) {
            // The Reactor scheduler is created once per call and shared by every subscription of the returned processor.
            Scheduler scheduler = Schedulers.fromExecutorService(this.decorateScheduler(this.getCpuLightScheduler()));
            return publisher -> Flux.from(publisher)
                .publishOn(scheduler)
                .doOnSubscribe(subscription -> Thread.currentThread().setContextClassLoader(this.executionClassloader))
                .transform(pipeline);
        }

        /**
         * Applies {@code processor} to a single-element {@link Flux} of {@code event}, subscribing on the decorated
         * {@code processorScheduler}.
         *
         * @param processor processor to run
         * @param processorScheduler scheduler on which the subscription happens
         * @param event event to process
         * @return the resulting flux
         */
        @Override
        protected Flux<CoreEvent> scheduleProcessor(ReactiveProcessor processor, org.mule.runtime.api.scheduler.Scheduler processorScheduler, CoreEvent event) {
            return Flux.just(event)
                .transform(processor)
                .subscribeOn(Schedulers.fromExecutorService(this.decorateScheduler(processorScheduler)));
        }
    }
}

