/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.BlockingProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.TransactionAwareProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.TransactionalDelegateSink;
import org.mule.runtime.core.internal.processor.strategy.WorkQueueProcessingStrategyFactory;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;

public class TransactionAwareWorkQueueProcessingStrategyFactory
extends WorkQueueProcessingStrategyFactory
implements TransactionAwareProcessingStrategyFactory {
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        SchedulerConfig schedulerConfig = muleContext.getSchedulerBaseConfig().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name());
        if (this.getMaxConcurrency() != Integer.MAX_VALUE) {
            schedulerConfig = schedulerConfig.withMaxConcurrentTasks(this.getMaxConcurrency());
        }
        SchedulerConfig finalSchedulerConfig = schedulerConfig;
        return new TransactionAwareWorkQueueProcessingStrategy(() -> muleContext.getSchedulerService().ioScheduler(finalSchedulerConfig));
    }

    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return TransactionAwareWorkQueueProcessingStrategy.class;
    }

    static class TransactionAwareWorkQueueProcessingStrategy
    extends WorkQueueProcessingStrategyFactory.WorkQueueProcessingStrategy {
        protected TransactionAwareWorkQueueProcessingStrategy(Supplier<Scheduler> ioSchedulerSupplier) {
            super(ioSchedulerSupplier);
        }

        @Override
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor pipeline) {
            Sink workQueueSink = super.createSink(flowConstruct, pipeline);
            Sink syncSink = BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.createSink(flowConstruct, pipeline);
            return new TransactionalDelegateSink(syncSink, workQueueSink);
        }

        @Override
        protected Consumer<CoreEvent> createOnEventConsumer() {
            return event -> {};
        }

        @Override
        protected ScheduledExecutorService decorateScheduler(ScheduledExecutorService scheduler) {
            return new ConditionalExecutorServiceDecorator(super.decorateScheduler(scheduler), currentScheduler -> TransactionCoordination.isTransactionActive());
        }

        @Override
        public ReactiveProcessor onPipeline(ReactiveProcessor pipeline) {
            return TransactionCoordination.isTransactionActive() ? BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onPipeline(pipeline) : super.onPipeline(pipeline);
        }

        @Override
        public ReactiveProcessor onProcessor(ReactiveProcessor processor) {
            return TransactionCoordination.isTransactionActive() ? BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onProcessor(processor) : super.onProcessor(processor);
        }
    }
}

