/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.BlockingProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyAdapter;
import org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyDecorator;
import org.mule.runtime.core.internal.processor.strategy.StreamPerThreadSink;
import org.mule.runtime.core.internal.processor.strategy.TransactionalDelegateSink;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class TransactionAwareStreamEmitterProcessingStrategyDecorator
extends ProcessingStrategyDecorator {
    private static final String TX_SCOPES_KEY = "mule.tx.activeTransactionsInReactorChain";
    private static final Consumer<CoreEvent> NULL_EVENT_CONSUMER = event -> {};

    public TransactionAwareStreamEmitterProcessingStrategyDecorator(ProcessingStrategy delegate) {
        super(delegate);
        if (delegate instanceof ProcessingStrategyAdapter) {
            ProcessingStrategyAdapter adapter = (ProcessingStrategyAdapter)delegate;
            adapter.setOnEventConsumer(NULL_EVENT_CONSUMER);
            Function<ScheduledExecutorService, ScheduledExecutorService> delegateDecorator = adapter.getSchedulerDecorator();
            adapter.setSchedulerDecorator(scheduler -> new ConditionalExecutorServiceDecorator((ScheduledExecutorService)delegateDecorator.apply((ScheduledExecutorService)scheduler), currentScheduler -> TransactionCoordination.isTransactionActive()));
        }
    }

    @Override
    public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor pipeline) {
        Sink delegateSink = this.delegate.createSink(flowConstruct, pipeline);
        StreamPerThreadSink syncSink = new StreamPerThreadSink(p -> Flux.from((Publisher)p).subscriberContext(TransactionAwareStreamEmitterProcessingStrategyDecorator.popTxFromSubscriberContext()).transform((Function)pipeline).subscriberContext(TransactionAwareStreamEmitterProcessingStrategyDecorator.pushTxToSubscriberContext("source")), NULL_EVENT_CONSUMER, flowConstruct);
        return new TransactionalDelegateSink(syncSink, delegateSink);
    }

    @Override
    public ReactiveProcessor onPipeline(ReactiveProcessor pipeline) {
        return pub -> Mono.subscriberContext().flatMapMany(ctx -> {
            if (this.isTxActive((Context)ctx).booleanValue()) {
                return Flux.from((Publisher)pub).transform((Function)BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onPipeline(pipeline));
            }
            return Flux.from((Publisher)pub).transform((Function)this.delegate.onPipeline(pipeline));
        });
    }

    @Override
    public ReactiveProcessor onProcessor(ReactiveProcessor processor) {
        return pub -> Mono.subscriberContext().flatMapMany(ctx -> {
            if (this.isTxActive((Context)ctx).booleanValue()) {
                return Flux.from((Publisher)pub).transform((Function)BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onProcessor(processor));
            }
            return Flux.from((Publisher)pub).transform((Function)this.delegate.onProcessor(processor));
        });
    }

    private Boolean isTxActive(Context ctx) {
        return ctx.getOrEmpty((Object)TX_SCOPES_KEY).map(txScopes -> txScopes.size() > 0).orElse(false);
    }

    public static Function<Context, Context> popTxFromSubscriberContext() {
        return context -> {
            ArrayDeque currentTxChains = new ArrayDeque((Collection)context.getOrDefault((Object)TX_SCOPES_KEY, Collections.emptyList()));
            currentTxChains.pop();
            return context.put((Object)TX_SCOPES_KEY, currentTxChains);
        };
    }

    public static Function<Context, Context> pushTxToSubscriberContext(String location) {
        return context -> {
            ArrayDeque<String> currentTxChains = new ArrayDeque<String>((Collection)context.getOrDefault((Object)TX_SCOPES_KEY, Collections.emptyList()));
            currentTxChains.push(location);
            return context.put((Object)TX_SCOPES_KEY, currentTxChains);
        };
    }
}

