/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.mule.runtime.api.artifact.ArtifactType;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.profiling.ProfilingService;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.StreamEmitterProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.enricher.ProactorProcessingStrategyEnricher;
import org.mule.runtime.core.internal.processor.strategy.enricher.ProcessingTypeBasedReactiveProcessorEnricher;
import org.mule.runtime.core.internal.profiling.InternalProfilingService;
import org.mule.runtime.core.internal.util.rx.RetrySchedulerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory for {@link ProactorStreamEmitterProcessingStrategy} instances.
 *
 * <p>The strategy built here uses the same CPU-light scheduler supplier for flow dispatching and for
 * CPU-light work, and obtains separate schedulers (an IO scheduler and a CPU-intensive scheduler) from the
 * {@link SchedulerService} for the processing types it registers enrichers for.
 */
public class ProactorStreamEmitterProcessingStrategyFactory
        extends AbstractStreamProcessingStrategyFactory {

    /**
     * Builds a {@link ProactorStreamEmitterProcessingStrategy} configured from this factory's settings.
     *
     * @param profilingService        profiling service; it is cast to {@link InternalProfilingService}, so any
     *                                other implementation fails with a {@link ClassCastException}
     * @param featureFlaggingService  not used by this implementation
     * @param schedulerService        service the schedulers are requested from
     * @param schedulerBaseConfig     supplies the base configuration each scheduler config is derived from
     * @param schedulersNamePrefix    prefix of the scheduler names; the processing type name is appended after a dot
     * @param artifactId              identifier of the owning artifact, forwarded to the strategy
     * @param artifactType            type of the owning artifact, forwarded to the strategy
     * @param shutdownTimeoutSupplier supplier of the shutdown timeout, forwarded to the strategy
     * @return a new, not yet started, processing strategy
     */
    @Override
    public ProcessingStrategy create(ProfilingService profilingService, FeatureFlaggingService featureFlaggingService, SchedulerService schedulerService, Supplier<SchedulerConfig> schedulerBaseConfig, String schedulersNamePrefix, String artifactId, ArtifactType artifactType, LongSupplier shutdownTimeoutSupplier) {
        Supplier<Scheduler> cpuLightSchedulerSupplier =
                this.getCpuLightSchedulerSupplier(schedulerService, schedulerBaseConfig, schedulersNamePrefix);

        // Both suppliers are lazy: the scheduler (and its name) is only resolved when the strategy starts.
        Supplier<Scheduler> blockingSchedulerSupplier = () -> schedulerService.ioScheduler(
                schedulerBaseConfig.get().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name()));
        Supplier<Scheduler> cpuIntensiveSchedulerSupplier = () -> schedulerService.cpuIntensiveScheduler(
                schedulerBaseConfig.get().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.CPU_INTENSIVE.name()));

        return new ProactorStreamEmitterProcessingStrategy(
                (InternalProfilingService) profilingService,
                this.getBufferSize(),
                this.getSubscriberCount(),
                // The CPU-light supplier doubles as the flow-dispatch supplier.
                cpuLightSchedulerSupplier,
                cpuLightSchedulerSupplier,
                blockingSchedulerSupplier,
                cpuIntensiveSchedulerSupplier,
                this.resolveParallelism(),
                this.getMaxConcurrency(),
                this.isMaxConcurrencyEagerCheck(),
                artifactId,
                artifactType,
                shutdownTimeoutSupplier);
    }

    /**
     * @return the concrete strategy class produced by {@code create}
     */
    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return ProactorStreamEmitterProcessingStrategy.class;
    }

    /**
     * Stream-emitter strategy that additionally owns a blocking scheduler and a CPU-intensive scheduler and
     * registers enrichers backed by them for the {@code BLOCKING}, {@code IO_RW} and {@code CPU_INTENSIVE}
     * processing types.
     */
    static class ProactorStreamEmitterProcessingStrategy
            extends StreamEmitterProcessingStrategyFactory.StreamEmitterProcessingStrategy {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProactorStreamEmitterProcessingStrategy.class);
        private final Supplier<Scheduler> blockingSchedulerSupplier;
        private final Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
        // Both are null until start() succeeds and again after the schedulers have been stopped.
        private Scheduler blockingScheduler;
        private Scheduler cpuIntensiveScheduler;

        /**
         * Stores the two extra scheduler suppliers and forwards every other argument to the parent constructor.
         *
         * @param blockingSchedulerSupplier     supplier invoked on {@link #start()} to obtain the blocking scheduler
         * @param cpuIntensiveSchedulerSupplier supplier invoked on {@link #start()} to obtain the CPU-intensive scheduler
         */
        public ProactorStreamEmitterProcessingStrategy(InternalProfilingService profilingService, int bufferSize, int subscriberCount, Supplier<Scheduler> flowDispatchSchedulerSupplier, Supplier<Scheduler> cpuLightSchedulerSupplier, Supplier<Scheduler> blockingSchedulerSupplier, Supplier<Scheduler> cpuIntensiveSchedulerSupplier, int parallelism, int maxConcurrency, boolean maxConcurrencyEagerCheck, String artifactId, ArtifactType artifactType, LongSupplier shutdownTimeoutSupplier) {
            super(profilingService, bufferSize, subscriberCount, flowDispatchSchedulerSupplier, cpuLightSchedulerSupplier, parallelism, maxConcurrency, maxConcurrencyEagerCheck, artifactId, artifactType, shutdownTimeoutSupplier);
            this.blockingSchedulerSupplier = blockingSchedulerSupplier;
            this.cpuIntensiveSchedulerSupplier = cpuIntensiveSchedulerSupplier;
        }

        /**
         * Obtains the blocking and CPU-intensive schedulers and then starts the parent strategy.
         *
         * <p>If obtaining either scheduler or the parent start fails, the schedulers held by this class are
         * stopped and cleared before the failure propagates, so a failed start does not leave them running.
         *
         * @throws MuleException if the parent strategy fails to start
         */
        @Override
        public void start() throws MuleException {
            boolean started = false;
            try {
                this.blockingScheduler = this.blockingSchedulerSupplier.get();
                this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
                super.start();
                started = true;
            } finally {
                if (!started) {
                    try {
                        this.releaseOwnSchedulers();
                    } catch (RuntimeException cleanupFailure) {
                        // Only log: throwing from here would hide the exception that made the start fail.
                        LOGGER.warn("Failed to release schedulers after an unsuccessful start", cleanupFailure);
                    }
                }
            }
        }

        /**
         * @return the smaller of the configured max concurrency and the factory's {@code CORES} constant
         */
        @Override
        protected int getSinksCount() {
            return Math.min(this.maxConcurrency, AbstractStreamProcessingStrategyFactory.CORES);
        }

        /**
         * Wraps the parent's CPU-light scheduler in a {@link RetrySchedulerWrapper}.
         */
        @Override
        protected Scheduler createCpuLightScheduler(Supplier<Scheduler> cpuLightSchedulerSupplier) {
            // NOTE(review): 2L is presumably the retry interval expected by RetrySchedulerWrapper -
            // confirm the meaning and unit against that class.
            return new RetrySchedulerWrapper(super.createCpuLightScheduler(cpuLightSchedulerSupplier), 2L);
        }

        /**
         * Stops this class's schedulers when, and only when, the parent reports that it stopped its own.
         *
         * @return the value returned by the parent implementation
         */
        @Override
        protected boolean stopSchedulersIfNeeded() {
            if (!super.stopSchedulersIfNeeded()) {
                return false;
            }
            this.releaseOwnSchedulers();
            return true;
        }

        /**
         * Stops and clears both schedulers owned by this class. The CPU-intensive scheduler is stopped even if
         * stopping the blocking one throws, and both fields are cleared in every case.
         */
        private void releaseOwnSchedulers() {
            try {
                this.stopScheduler(this.blockingScheduler);
            } finally {
                this.blockingScheduler = null;
                try {
                    this.stopScheduler(this.cpuIntensiveScheduler);
                } finally {
                    this.cpuIntensiveScheduler = null;
                }
            }
        }

        /**
         * Stops the given scheduler; a {@code null} scheduler is ignored.
         */
        private void stopScheduler(Scheduler scheduler) {
            if (scheduler != null) {
                scheduler.stop();
            }
        }

        /**
         * Extends the parent's enricher with registrations for the processing types served by this class's
         * schedulers: {@code BLOCKING} and {@code IO_RW} share one enricher backed by the blocking scheduler,
         * {@code CPU_INTENSIVE} gets one backed by the CPU-intensive scheduler.
         *
         * <p>The scheduler fields are read when this method runs, so the enrichers capture whatever the fields
         * hold at that moment ({@code null} if the strategy has not been started).
         */
        @Override
        protected ProcessingTypeBasedReactiveProcessorEnricher getProcessingStrategyEnricher() {
            ProactorProcessingStrategyEnricher blockingEnricher = this.getEnricher(this.blockingScheduler);
            ProactorProcessingStrategyEnricher cpuIntensiveEnricher = this.getEnricher(this.cpuIntensiveScheduler);
            return super.getProcessingStrategyEnricher()
                    .register(ReactiveProcessor.ProcessingType.BLOCKING, blockingEnricher)
                    .register(ReactiveProcessor.ProcessingType.IO_RW, blockingEnricher)
                    .register(ReactiveProcessor.ProcessingType.CPU_INTENSIVE, cpuIntensiveEnricher);
        }

        /**
         * Creates an enricher whose scheduler supplier always returns the given scheduler.
         *
         * @param scheduler scheduler the enricher hands out; used for both the blocking and the CPU-intensive case
         */
        private ProactorProcessingStrategyEnricher getEnricher(Scheduler scheduler) {
            return new ProactorProcessingStrategyEnricher(
                    () -> scheduler,
                    this.getSchedulerDecorator().compose(this::getRetryScheduler),
                    this.getProfilingService(),
                    this.artifactId,
                    this.artifactType.getArtifactTypeAsString(),
                    this.maxConcurrency,
                    this.getParallelism(),
                    this.subscribers,
                    this.getBufferQueueSize());
        }

        /**
         * @return the CPU-light scheduler, which this strategy also uses for flow dispatching
         */
        @Override
        protected Scheduler getFlowDispatcherScheduler() {
            return this.getCpuLightScheduler();
        }
    }
}

