/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.function.Consumer;
import java.util.function.Supplier;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.ProactorStreamProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ProactorStreamWorkQueueProcessingStrategyFactory
extends ReactorStreamProcessingStrategyFactory {
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        return new ProactorStreamWorkQueueProcessingStrategy(this.getRingBufferSchedulerSupplier(muleContext, schedulersNamePrefix), this.getBufferSize(), this.getSubscriberCount(), this.getWaitStrategy(), this.getCpuLightSchedulerSupplier(muleContext, schedulersNamePrefix), () -> muleContext.getSchedulerService().ioScheduler(muleContext.getSchedulerBaseConfig().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name())), () -> muleContext.getSchedulerService().cpuIntensiveScheduler(muleContext.getSchedulerBaseConfig().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.CPU_INTENSIVE.name())), this.resolveParallelism(), this.getMaxConcurrency(), this.isMaxConcurrencyEagerCheck());
    }

    @Override
    protected int resolveParallelism() {
        if (this.getMaxConcurrency() == Integer.MAX_VALUE) {
            return Math.max(CORES / this.getSubscriberCount(), 1);
        }
        return Math.min(CORES, this.maxFactor(Float.max((float)this.getMaxConcurrency() / (float)this.getSubscriberCount(), 1.0f)));
    }

    private int maxFactor(float test) {
        if (test % 0.0f == 0.0f) {
            for (int i = CORES; i > 1; --i) {
                if (test % (float)i != 0.0f) continue;
                return i;
            }
        }
        return 1;
    }

    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return ProactorStreamWorkQueueProcessingStrategy.class;
    }

    static class ProactorStreamWorkQueueProcessingStrategy
    extends ProactorStreamProcessingStrategy {
        public ProactorStreamWorkQueueProcessingStrategy(Supplier<org.mule.runtime.api.scheduler.Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscriberCount, String waitStrategy, Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuLightSchedulerSupplier, Supplier<org.mule.runtime.api.scheduler.Scheduler> blockingSchedulerSupplier, Supplier<org.mule.runtime.api.scheduler.Scheduler> cpuIntensiveSchedulerSupplier, int parallelism, int maxConcurrency, boolean maxConcurrencyEagerCheck) {
            super(ringBufferSchedulerSupplier, bufferSize, subscriberCount, waitStrategy, cpuLightSchedulerSupplier, blockingSchedulerSupplier, cpuIntensiveSchedulerSupplier, parallelism, maxConcurrency, maxConcurrencyEagerCheck);
        }

        @Override
        protected Flux<CoreEvent> scheduleProcessor(ReactiveProcessor processor, org.mule.runtime.api.scheduler.Scheduler processorScheduler, CoreEvent event) {
            Scheduler eventLoopScheduler = Schedulers.fromExecutorService(this.decorateScheduler(this.getCpuLightScheduler()));
            return Flux.just(event).transform(processor).publishOn(eventLoopScheduler).subscribeOn(Schedulers.fromExecutorService(this.decorateScheduler(processorScheduler)));
        }

        @Override
        protected <E> AbstractProcessingStrategy.ReactorSink<E> buildSink(FluxSink<E> fluxSink, Disposable disposable, Consumer<CoreEvent> onEventConsumer, int bufferSize) {
            return new ProactorStreamProcessingStrategy.ProactorSinkWrapper<E>(super.buildSink(fluxSink, disposable, onEventConsumer, bufferSize));
        }
    }
}

