/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.context.thread.notification.ThreadLoggingExecutorServiceDecorator;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.ProactorStreamProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.WorkQueueStreamProcessingStrategyFactory;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ProactorStreamWorkQueueProcessingStrategyFactory
extends ReactorStreamProcessingStrategyFactory {
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        return new ProactorStreamWorkQueueProcessingStrategy(this.getRingBufferSchedulerSupplier(muleContext, schedulersNamePrefix), this.getBufferSize(), this.getSubscriberCount(), this.getWaitStrategy(), this.getCpuLightSchedulerSupplier(muleContext, schedulersNamePrefix), () -> muleContext.getSchedulerService().ioScheduler(muleContext.getSchedulerBaseConfig().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name())), () -> muleContext.getSchedulerService().cpuIntensiveScheduler(muleContext.getSchedulerBaseConfig().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.CPU_INTENSIVE.name())), this.resolveParallelism(), this.getMaxConcurrency(), this.isMaxConcurrencyEagerCheck(), muleContext.getConfiguration().isThreadLoggingEnabled());
    }

    @Override
    protected int resolveParallelism() {
        if (this.getMaxConcurrency() == Integer.MAX_VALUE) {
            return Math.max(CORES / this.getSubscriberCount(), 1);
        }
        return Math.min(CORES, this.maxFactor(Float.max((float)this.getMaxConcurrency() / (float)this.getSubscriberCount(), 1.0f)));
    }

    private int maxFactor(float test) {
        if (test % 0.0f == 0.0f) {
            for (int i = CORES; i > 1; --i) {
                if (test % (float)i != 0.0f) continue;
                return i;
            }
        }
        return 1;
    }

    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return ProactorStreamWorkQueueProcessingStrategy.class;
    }

    static class ProactorStreamWorkQueueProcessingStrategy
    extends ProactorStreamProcessingStrategy {
        private final WorkQueueStreamProcessingStrategyFactory.WorkQueueStreamProcessingStrategy workQueueStreamProcessingStrategy;
        private final boolean isThreadLoggingEnabled;

        public ProactorStreamWorkQueueProcessingStrategy(Supplier<org.mule.runtime.api.scheduler.Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscriberCount, String waitStrategy, Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuLightSchedulerSupplier, Supplier<org.mule.runtime.api.scheduler.Scheduler> blockingSchedulerSupplier, Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuIntensiveSchedulerSupplier, int parallelism, int maxConcurrency, boolean maxConcurrencyEagerCheck, boolean isThreadLoggingEnabled) {
            super(subscriberCount, cpuLightSchedulerSupplier, blockingSchedulerSupplier, cpuIntensiveSchedulerSupplier, parallelism, maxConcurrency, maxConcurrencyEagerCheck);
            this.workQueueStreamProcessingStrategy = new WorkQueueStreamProcessingStrategyFactory.WorkQueueStreamProcessingStrategy(ringBufferSchedulerSupplier, bufferSize, subscriberCount, waitStrategy, blockingSchedulerSupplier, maxConcurrency, maxConcurrencyEagerCheck, isThreadLoggingEnabled){

                @Override
                protected <E> AbstractProcessingStrategy.ReactorSink<E> buildSink(FluxSink<E> fluxSink, Disposable disposable, Consumer<CoreEvent> onEventConsumer, int bufferSize) {
                    return new ProactorStreamProcessingStrategy.ProactorSinkWrapper<E>(this, super.buildSink(fluxSink, disposable, onEventConsumer, bufferSize));
                }
            };
            this.isThreadLoggingEnabled = isThreadLoggingEnabled;
        }

        public ProactorStreamWorkQueueProcessingStrategy(Supplier<org.mule.runtime.api.scheduler.Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscriberCount, String waitStrategy, Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuLightSchedulerSupplier, Supplier<org.mule.runtime.api.scheduler.Scheduler> blockingSchedulerSupplier, Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuIntensiveSchedulerSupplier, int parallelism, int maxConcurrency, boolean maxConcurrencyEagerCheck) {
            this(ringBufferSchedulerSupplier, bufferSize, subscriberCount, waitStrategy, cpuLightSchedulerSupplier, blockingSchedulerSupplier, cpuIntensiveSchedulerSupplier, parallelism, maxConcurrency, maxConcurrencyEagerCheck, false);
        }

        @Override
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor function) {
            return this.workQueueStreamProcessingStrategy.createSink(flowConstruct, function);
        }

        @Override
        public ReactiveProcessor onPipeline(ReactiveProcessor pipeline) {
            Scheduler scheduler = Schedulers.fromExecutorService((ExecutorService)this.decorateScheduler(this.getCpuLightScheduler()));
            if (this.maxConcurrency > this.subscribers) {
                return publisher -> Flux.from((Publisher)publisher).parallel(this.getParallelism()).runOn(scheduler).composeGroup((Function)super.onPipeline(pipeline));
            }
            return super.onPipeline(pipeline);
        }

        @Override
        protected Flux<CoreEvent> scheduleProcessor(ReactiveProcessor processor, org.mule.runtime.api.scheduler.Scheduler processorScheduler, CoreEvent event) {
            Scheduler eventLoopScheduler = Schedulers.fromExecutorService((ExecutorService)this.decorateScheduler(this.getCpuLightScheduler()));
            return this.scheduleWithLogging(processor, eventLoopScheduler, processorScheduler, event);
        }

        private Flux<CoreEvent> scheduleWithLogging(ReactiveProcessor processor, Scheduler eventLoopScheduler, org.mule.runtime.api.scheduler.Scheduler processorScheduler, CoreEvent event) {
            if (this.isThreadLoggingEnabled) {
                return Flux.just((Object)event).flatMap(e -> Mono.subscriberContext().flatMap(ctx -> Mono.just((Object)e).transform((Function)processor).publishOn(eventLoopScheduler).subscribeOn(Schedulers.fromExecutorService((ExecutorService)new ThreadLoggingExecutorServiceDecorator(ctx.getOrEmpty((Object)"mule.nb.ThreadNotificationLogger"), this.decorateScheduler(processorScheduler), e.getContext().getId())))));
            }
            return Flux.just((Object)event).transform((Function)processor).publishOn(eventLoopScheduler).subscribeOn(Schedulers.fromExecutorService((ExecutorService)this.decorateScheduler(processorScheduler)));
        }

        @Override
        public void start() throws MuleException {
            super.start();
            this.workQueueStreamProcessingStrategy.start();
        }

        @Override
        public void stop() throws MuleException {
            this.workQueueStreamProcessingStrategy.stop();
            super.stop();
        }
    }
}

