/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.BlockingProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ProactorStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ReactorProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.TransactionAwareProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.TransactionAwareWorkQueueProcessingStrategyFactory;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;

/**
 * Factory for a proactor-stream {@link ProcessingStrategy} variant whose scheduler decoration
 * depends on whether a transaction is active.
 *
 * <p>When the configured max concurrency is exactly {@code 1}, both {@link #create} and
 * {@link #getProcessingStrategyType()} delegate to a shared {@link ReactorProcessingStrategyFactory};
 * otherwise a {@link TransactionAwareProactorStreamProcessingStrategy} is used.
 */
class TransactionAwareProactorStreamProcessingStrategyFactory
    extends ReactorStreamProcessingStrategyFactory
    implements TransactionAwareProcessingStrategyFactory {

    /** Delegate factory used whenever max concurrency equals 1. */
    private static final ReactorProcessingStrategyFactory NOT_CONCURRENT_TX_AWARE_PS_FACTORY = new ReactorProcessingStrategyFactory();

    TransactionAwareProactorStreamProcessingStrategyFactory() {
    }

    /**
     * Creates the processing strategy for the given context.
     *
     * @param muleContext context providing the scheduler service and the base scheduler configuration
     * @param schedulersNamePrefix prefix applied to the name of every scheduler requested by the strategy
     * @return the delegate factory's strategy when max concurrency is 1, otherwise a new
     *         {@link TransactionAwareProactorStreamProcessingStrategy}
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        if (getMaxConcurrency() == 1) {
            return NOT_CONCURRENT_TX_AWARE_PS_FACTORY.create(muleContext, schedulersNamePrefix);
        }

        // All schedulers are handed over as suppliers, so none is requested from the
        // scheduler service until the strategy actually asks for it.
        Supplier<Scheduler> ringBufferScheduler = () -> muleContext.getSchedulerService()
            .customScheduler(muleContext.getSchedulerBaseConfig()
                .withName(schedulersNamePrefix + RING_BUFFER_SCHEDULER_NAME_SUFFIX)
                .withMaxConcurrentTasks(getSubscriberCount() + 1));

        Supplier<Scheduler> cpuLightScheduler = () -> muleContext.getSchedulerService()
            .cpuLightScheduler(muleContext.getSchedulerBaseConfig()
                .withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.CPU_LITE.name()));

        Supplier<Scheduler> blockingScheduler = () -> muleContext.getSchedulerService()
            .ioScheduler(muleContext.getSchedulerBaseConfig()
                .withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name()));

        Supplier<Scheduler> cpuIntensiveScheduler = () -> muleContext.getSchedulerService()
            .cpuIntensiveScheduler(muleContext.getSchedulerBaseConfig()
                .withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.CPU_INTENSIVE.name()));

        return new TransactionAwareProactorStreamProcessingStrategy(
            ringBufferScheduler,
            getBufferSize(),
            getSubscriberCount(),
            getWaitStrategy(),
            cpuLightScheduler,
            blockingScheduler,
            cpuIntensiveScheduler,
            getMaxConcurrency());
    }

    /**
     * Reports the strategy type that {@link #create} would produce for the current max concurrency.
     *
     * @return the delegate factory's strategy type when max concurrency is 1, otherwise
     *         {@code TransactionAwareProactorStreamProcessingStrategy.class}
     */
    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        if (getMaxConcurrency() != 1) {
            return TransactionAwareProactorStreamProcessingStrategy.class;
        }
        return NOT_CONCURRENT_TX_AWARE_PS_FACTORY.getProcessingStrategyType();
    }

    /**
     * Proactor-stream strategy that pairs its own sink with a blocking one and wraps its
     * schedulers in a decorator keyed on {@link TransactionCoordination#isTransactionActive()}.
     */
    static class TransactionAwareProactorStreamProcessingStrategy
        extends ProactorStreamProcessingStrategyFactory.ProactorStreamProcessingStrategy {

        /**
         * Passes every argument through, unchanged and in the same order, to the parent strategy.
         */
        TransactionAwareProactorStreamProcessingStrategy(Supplier<Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscriberCount, String waitStrategy, Supplier<Scheduler> cpuLightSchedulerSupplier, Supplier<Scheduler> blockingSchedulerSupplier, Supplier<Scheduler> cpuIntensiveSchedulerSupplier, int maxConcurrency) {
            super(ringBufferSchedulerSupplier, bufferSize, subscriberCount, waitStrategy, cpuLightSchedulerSupplier, blockingSchedulerSupplier, cpuIntensiveSchedulerSupplier, maxConcurrency);
        }

        /**
         * Builds the parent's sink and a sink from the blocking processing strategy for the same
         * flow and pipeline, and returns a {@code DelegateSink} holding both.
         *
         * <p>NOTE(review): presumably the delegate sink picks the blocking sink while a transaction
         * is active; that logic lives in {@code DelegateSink} and is not visible here - confirm.
         */
        @Override
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor pipeline) {
            // The parent (proactor) sink is created first, then the blocking one.
            Sink asyncSink = super.createSink(flowConstruct, pipeline);
            Sink blockingSink = BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.createSink(flowConstruct, pipeline);
            return new TransactionAwareWorkQueueProcessingStrategyFactory.TransactionAwareWorkQueueProcessingStrategy.DelegateSink(blockingSink, asyncSink);
        }

        /**
         * Returns a consumer that does nothing with the event it receives.
         */
        @Override
        protected Consumer<Event> createOnEventConsumer() {
            return ignoredEvent -> {};
        }

        /**
         * Wraps the scheduler in a {@link ConditionalExecutorServiceDecorator} whose predicate ignores
         * its argument and reports whether a transaction is currently active.
         *
         * <p>NOTE(review): how the decorator acts on the predicate result is defined in
         * {@code ConditionalExecutorServiceDecorator} - verify there.
         */
        @Override
        protected ExecutorService decorateScheduler(Scheduler scheduler) {
            return new ConditionalExecutorServiceDecorator(scheduler, currentScheduler -> TransactionCoordination.isTransactionActive());
        }
    }
}

