/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.profiling.ProfilingDataProducer;
import org.mule.runtime.api.profiling.type.ProfilingEventType;
import org.mule.runtime.api.profiling.type.RuntimeProfilingEventTypes;
import org.mule.runtime.api.profiling.type.context.ComponentProcessingStrategyProfilingEventContext;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.BlockingProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.DefaultCachedThreadReactorSinkProvider;
import org.mule.runtime.core.internal.processor.strategy.PreservingThreadContextExecutorServiceWrapper;
import org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyAdapter;
import org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyDecorator;
import org.mule.runtime.core.internal.processor.strategy.ReactorSinkProviderBasedSink;
import org.mule.runtime.core.internal.processor.strategy.TransactionalDelegateSink;
import org.mule.runtime.core.internal.processor.strategy.reactor.builder.ReactorPublisherBuilder;
import org.mule.runtime.core.internal.processor.strategy.util.ProfilingUtils;
import org.mule.runtime.core.internal.profiling.InternalProfilingService;
import org.mule.runtime.core.internal.profiling.context.DefaultComponentProcessingStrategyProfilingEventContext;
import org.mule.runtime.core.internal.profiling.tracing.DefaultComponentMetadata;
import org.mule.runtime.core.internal.profiling.tracing.DefaultExecutionContext;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;
import org.mule.runtime.core.internal.util.rx.ReactorTransactionUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class TransactionAwareStreamEmitterProcessingStrategyDecorator
extends ProcessingStrategyDecorator {
    private static final Consumer<CoreEvent> NULL_EVENT_CONSUMER = event -> {};
    @Inject
    private InternalProfilingService profilingService;
    @Inject
    private MuleContext muleContext;
    @Inject
    FeatureFlaggingService featureFlaggingService;

    public TransactionAwareStreamEmitterProcessingStrategyDecorator(ProcessingStrategy delegate) {
        super(delegate);
        if (delegate instanceof ProcessingStrategyAdapter) {
            ProcessingStrategyAdapter adapter = (ProcessingStrategyAdapter)delegate;
            adapter.setOnEventConsumer(NULL_EVENT_CONSUMER);
            UnaryOperator<ScheduledExecutorService> delegateDecorator = adapter.getSchedulerDecorator();
            UnaryOperator loggingDecorator = scheduler -> new PreservingThreadContextExecutorServiceWrapper((ScheduledExecutorService)delegateDecorator.apply((ScheduledExecutorService)scheduler));
            adapter.setSchedulerDecorator(scheduler -> new ConditionalExecutorServiceDecorator((ScheduledExecutorService)loggingDecorator.apply(scheduler), currentScheduler -> TransactionCoordination.isTransactionActive()));
        }
    }

    @Override
    public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor pipeline) {
        Sink delegateSink = this.delegate.createSink(flowConstruct, pipeline);
        ReactorSinkProviderBasedSink syncSink = new ReactorSinkProviderBasedSink(new DefaultCachedThreadReactorSinkProvider(flowConstruct, p -> Flux.from((Publisher)p).subscriberContext(ReactorTransactionUtils.popTxFromSubscriberContext()).transform((Function)pipeline).subscriberContext(ReactorTransactionUtils.pushTxToSubscriberContext("source")), NULL_EVENT_CONSUMER, this.featureFlaggingService));
        return new TransactionalDelegateSink(syncSink, delegateSink);
    }

    @Override
    public ReactiveProcessor onPipeline(ReactiveProcessor pipeline) {
        ComponentLocation location = ProfilingUtils.getLocation(pipeline);
        String artifactId = ProfilingUtils.getArtifactId(this.muleContext);
        String artifactType = ProfilingUtils.getArtifactType(this.muleContext);
        Function<CoreEvent, ComponentProcessingStrategyProfilingEventContext> transformer = coreEvent -> new DefaultComponentProcessingStrategyProfilingEventContext((CoreEvent)coreEvent, location, Thread.currentThread().getName(), artifactId, artifactType, System.currentTimeMillis());
        return pub -> Flux.deferContextual(ctx -> {
            if (ReactorTransactionUtils.isTxActiveByContext(ctx)) {
                return ReactorPublisherBuilder.buildFlux((Publisher<CoreEvent>)pub).setTracingContext(this.profilingService, coreEvent -> new DefaultExecutionContext(new DefaultComponentMetadata(coreEvent.getCorrelationId(), artifactId, artifactType, location))).profileProcessingStrategyEvent(this.profilingService, this.getDataProducer(RuntimeProfilingEventTypes.PS_SCHEDULING_FLOW_EXECUTION), transformer).profileProcessingStrategyEvent(this.profilingService, this.getDataProducer(RuntimeProfilingEventTypes.STARTING_FLOW_EXECUTION), transformer).transform(BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onPipeline(pipeline)).profileProcessingStrategyEvent(this.profilingService, this.getDataProducer(RuntimeProfilingEventTypes.FLOW_EXECUTED), transformer).build();
            }
            return Flux.from((Publisher)pub).transform((Function)this.delegate.onPipeline(pipeline));
        });
    }

    private ProfilingDataProducer<ComponentProcessingStrategyProfilingEventContext, CoreEvent> getDataProducer(ProfilingEventType<ComponentProcessingStrategyProfilingEventContext> eventType) {
        return this.profilingService.getProfilingDataProducer(eventType);
    }

    @Override
    public ReactiveProcessor onProcessor(ReactiveProcessor processor) {
        ComponentLocation location = ProfilingUtils.getLocation(processor);
        String artifactId = this.muleContext.getConfiguration().getId();
        String artifactType = this.muleContext.getArtifactType().getAsString();
        Function<CoreEvent, ComponentProcessingStrategyProfilingEventContext> transformer = coreEvent -> new DefaultComponentProcessingStrategyProfilingEventContext((CoreEvent)coreEvent, location, Thread.currentThread().getName(), artifactId, artifactType, System.currentTimeMillis());
        return pub -> Flux.deferContextual(ctx -> {
            if (ReactorTransactionUtils.isTxActiveByContext(ctx)) {
                return ReactorPublisherBuilder.buildFlux((Publisher<CoreEvent>)pub).setTracingContext(this.profilingService, coreEvent -> new DefaultExecutionContext(new DefaultComponentMetadata(coreEvent.getCorrelationId(), artifactId, artifactType, location))).profileProcessingStrategyEvent(this.profilingService, this.getDataProducer(RuntimeProfilingEventTypes.PS_SCHEDULING_OPERATION_EXECUTION), transformer).profileProcessingStrategyEvent(this.profilingService, this.getDataProducer(RuntimeProfilingEventTypes.PS_STARTING_OPERATION_EXECUTION), transformer).transform(BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onProcessor(processor)).profileProcessingStrategyEvent(this.profilingService, this.getDataProducer(RuntimeProfilingEventTypes.PS_OPERATION_EXECUTED), transformer).profileProcessingStrategyEvent(this.profilingService, this.getDataProducer(RuntimeProfilingEventTypes.PS_FLOW_MESSAGE_PASSING), transformer).build();
            }
            return Flux.from((Publisher)pub).transform((Function)this.delegate.onProcessor(processor));
        });
    }
}

