/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.policy;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.config.MuleRuntimeFeature;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.collection.SmallMap;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.policy.OperationPolicyParametersTransformer;
import org.mule.runtime.core.api.policy.Policy;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.policy.AbstractCompositePolicy;
import org.mule.runtime.core.internal.policy.DeferredDisposable;
import org.mule.runtime.core.internal.policy.OperationExecutionFunction;
import org.mule.runtime.core.internal.policy.OperationParametersProcessor;
import org.mule.runtime.core.internal.policy.OperationPolicy;
import org.mule.runtime.core.internal.policy.OperationPolicyContext;
import org.mule.runtime.core.internal.policy.OperationPolicyProcessorFactory;
import org.mule.runtime.core.internal.policy.PolicyTraceLogger;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.internal.util.rx.FluxSinkSupplier;
import org.mule.runtime.core.internal.util.rx.RoundRobinFluxSinkSupplier;
import org.mule.runtime.core.internal.util.rx.RxUtils;
import org.mule.runtime.core.internal.util.rx.TransactionAwareFluxSinkSupplier;
import org.mule.runtime.extension.api.runtime.operation.CompletableComponentExecutor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class CompositeOperationPolicy
extends AbstractCompositePolicy<OperationPolicyParametersTransformer>
implements OperationPolicy,
Disposable,
DeferredDisposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(CompositeOperationPolicy.class);
    private final Component operation;
    private final OperationPolicyProcessorFactory operationPolicyProcessorFactory;
    private final LoadingCache<String, FluxSinkSupplier<CoreEvent>> policySinks;
    private final long shutdownTimeout;
    private final Scheduler completionCallbackScheduler;
    private final PolicyTraceLogger policyTraceLogger = new PolicyTraceLogger();
    private FeatureFlaggingService featureFlaggingService;

    public CompositeOperationPolicy(Component operation, List<Policy> parameterizedPolicies, Optional<OperationPolicyParametersTransformer> operationPolicyParametersTransformer, OperationPolicyProcessorFactory operationPolicyProcessorFactory, long shutdownTimeout, Scheduler completionCallbackScheduler, FeatureFlaggingService featureFlaggingService) {
        super(parameterizedPolicies, operationPolicyParametersTransformer);
        this.operation = operation;
        this.operationPolicyProcessorFactory = operationPolicyProcessorFactory;
        this.shutdownTimeout = shutdownTimeout;
        this.completionCallbackScheduler = completionCallbackScheduler;
        this.featureFlaggingService = featureFlaggingService;
        this.initProcessor();
        OperationWithPoliciesFluxObjectFactory factory = new OperationWithPoliciesFluxObjectFactory(this, featureFlaggingService);
        this.policySinks = Caffeine.newBuilder().removalListener((key, value, cause) -> value.dispose()).build(arg_0 -> CompositeOperationPolicy.lambda$new$1((Supplier)factory, arg_0));
    }

    @Override
    protected Publisher<CoreEvent> applyNextOperation(Publisher<CoreEvent> eventPub) {
        FluxSinkRecorder sinkRecorder = new FluxSinkRecorder();
        return Flux.from(RxUtils.propagateCompletion(Flux.from(eventPub), sinkRecorder.flux(), pub -> Flux.from((Publisher)pub).doOnNext((Consumer)new OperationDispatcher(sinkRecorder, this.getParametersTransformer(), this.operation)).map(e -> Either.empty()), sinkRecorder::complete, sinkRecorder::error, this.shutdownTimeout, this.completionCallbackScheduler, this.operation.getDslSource())).map(result -> {
            result.applyLeft(t -> {
                throw Exceptions.propagate((Throwable)t);
            });
            return (CoreEvent)result.getRight();
        }).doOnNext(response -> OperationPolicyContext.from(response).setNextOperationResponse((InternalEvent)response));
    }

    private static Map<String, Object> resolveOperationParameters(CoreEvent event, Optional<OperationPolicyParametersTransformer> parametersTransformer, OperationPolicyContext ctx) {
        OperationParametersProcessor parametersProcessor = ctx.getOperationParametersProcessor();
        Map<String, Object> operationParameters = parametersProcessor.getOperationParameters();
        return parametersTransformer.map(paramsTransformer -> {
            Map<String, Object> parametersMap = SmallMap.copy(operationParameters);
            parametersMap.putAll(paramsTransformer.fromMessageToParameters(event.getMessage()));
            return parametersMap;
        }).orElse(operationParameters);
    }

    @Override
    protected Publisher<CoreEvent> applyPolicy(Policy policy, ReactiveProcessor nextProcessor, Publisher<CoreEvent> eventPub) {
        return Flux.from(eventPub).transform((Function)this.operationPolicyProcessorFactory.createOperationPolicy(policy, nextProcessor));
    }

    @Override
    public void process(CoreEvent operationEvent, OperationExecutionFunction operationExecutionFunction, OperationParametersProcessor parametersProcessor, ComponentLocation operationLocation, CompletableComponentExecutor.ExecutorCallback callback) {
        FluxSink policySink = (FluxSink)((FluxSinkSupplier)this.policySinks.get((Object)operationLocation.getLocation())).get();
        policySink.next((Object)this.operationEventForPolicy(operationEvent, operationExecutionFunction, parametersProcessor, callback));
    }

    private CoreEvent operationEventForPolicy(CoreEvent operationEvent, OperationExecutionFunction operationExecutionFunction, OperationParametersProcessor parametersProcessor, CompletableComponentExecutor.ExecutorCallback callback) {
        OperationPolicyContext ctx = new OperationPolicyContext(parametersProcessor, operationExecutionFunction, callback);
        if (this.getParametersTransformer().isPresent()) {
            operationEvent = InternalEvent.builder(operationEvent).message(((OperationPolicyParametersTransformer)this.getParametersTransformer().get()).fromParametersToMessage(parametersProcessor.getOperationParameters())).build();
        }
        ((InternalEvent)operationEvent).setOperationPolicyContext(ctx);
        return operationEvent;
    }

    @Override
    public void dispose() {
        this.policySinks.invalidateAll();
        this.completionCallbackScheduler.stop();
    }

    @Override
    public Disposable deferredDispose() {
        LoadingCache<String, FluxSinkSupplier<CoreEvent>> policySinks = this.policySinks;
        Scheduler completionCallbackScheduler = this.completionCallbackScheduler;
        return () -> {
            policySinks.invalidateAll();
            completionCallbackScheduler.stop();
        };
    }

    private static /* synthetic */ FluxSinkSupplier lambda$new$1(Supplier factory, String componentLocation) throws Exception {
        return new TransactionAwareFluxSinkSupplier(factory, new RoundRobinFluxSinkSupplier(Runtime.getRuntime().availableProcessors(), factory));
    }

    private static final class OperationDispatcher
    implements Consumer<CoreEvent> {
        private final FluxSinkRecorder<Either<Throwable, CoreEvent>> sinkRecorder;
        private final Optional<OperationPolicyParametersTransformer> parametersTransformer;
        private final Component operation;

        public OperationDispatcher(FluxSinkRecorder<Either<Throwable, CoreEvent>> sinkRecorder, Optional<OperationPolicyParametersTransformer> parametersTransformer, Component operation) {
            this.sinkRecorder = sinkRecorder;
            this.parametersTransformer = parametersTransformer;
            this.operation = operation;
        }

        @Override
        public void accept(final CoreEvent event) {
            OperationPolicyContext ctx = OperationPolicyContext.from(event);
            OperationExecutionFunction operationExecutionFunction = ctx.getOperationExecutionFunction();
            operationExecutionFunction.execute(CompositeOperationPolicy.resolveOperationParameters(event, this.parametersTransformer, ctx), event, new CompletableComponentExecutor.ExecutorCallback(){

                @Override
                public void complete(Object value) {
                    sinkRecorder.next(Either.right(Throwable.class, (CoreEvent)value));
                }

                @Override
                public void error(Throwable e) {
                    sinkRecorder.next(Either.left(this.mapError(e, event), CoreEvent.class));
                }

                private Throwable mapError(Throwable t, CoreEvent event2) {
                    if (!((t = org.mule.runtime.core.api.rx.Exceptions.unwrap(t)) instanceof MessagingException)) {
                        t = new MessagingException(event2, t, operation);
                    }
                    return t;
                }
            });
        }
    }

    private static final class OperationWithPoliciesFluxObjectFactory
    implements Supplier<FluxSink<CoreEvent>> {
        private final Reference<CompositeOperationPolicy> compositeOperationPolicy;
        private FeatureFlaggingService featureFlaggingService;

        public OperationWithPoliciesFluxObjectFactory(CompositeOperationPolicy compositeOperationPolicy, FeatureFlaggingService featureFlaggingService) {
            this.compositeOperationPolicy = new WeakReference<CompositeOperationPolicy>(compositeOperationPolicy);
            this.featureFlaggingService = featureFlaggingService;
        }

        @Override
        public FluxSink<CoreEvent> get() {
            FluxSinkRecorder sinkRef = new FluxSinkRecorder();
            Flux policyFlux = sinkRef.flux().transform((Function)this.compositeOperationPolicy.get().getExecutionProcessor()).doOnNext(result -> OperationPolicyContext.from(result).getOperationCallerCallback().complete(result)).onErrorContinue(MessagingException.class, (t, e) -> {
                MessagingException me = (MessagingException)t;
                if (this.featureFlaggingService.isEnabled(MuleRuntimeFeature.HONOUR_ERROR_MAPPINGS_WHEN_POLICY_APPLIED_ON_OPERATION) && this.compositeOperationPolicy.get().operation.equals(me.getFailingComponent())) {
                    me.setProcessedEvent(CoreEvent.builder(me.getEvent()).error(null).build());
                }
                OperationPolicyContext.from(me.getEvent()).getOperationCallerCallback().error(me);
            });
            policyFlux.subscribe(null, e -> LOGGER.error("Exception reached subscriber for " + this.toString(), e));
            return sinkRef.getFluxSink();
        }
    }
}

