/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.policy;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.functional.Either;
import org.mule.runtime.core.api.policy.Policy;
import org.mule.runtime.core.api.policy.SourcePolicyParametersTransformer;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.policy.AbstractCompositePolicy;
import org.mule.runtime.core.internal.policy.CommonSourcePolicy;
import org.mule.runtime.core.internal.policy.MessageSourceResponseParametersProcessor;
import org.mule.runtime.core.internal.policy.OnExecuteNextErrorConsumer;
import org.mule.runtime.core.internal.policy.PolicyEventMapper;
import org.mule.runtime.core.internal.policy.PolicyNotificationHelper;
import org.mule.runtime.core.internal.policy.SourcePolicy;
import org.mule.runtime.core.internal.policy.SourcePolicyFailureResult;
import org.mule.runtime.core.internal.policy.SourcePolicyProcessorFactory;
import org.mule.runtime.core.internal.policy.SourcePolicySuccessResult;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class CompositeSourcePolicy
extends AbstractCompositePolicy<SourcePolicyParametersTransformer>
implements SourcePolicy,
Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(CompositeSourcePolicy.class);
    public static final String POLICY_SOURCE_ORIGINAL_FAILURE_RESPONSE_PARAMETERS = "policy.source.originalFailureResponseParameters";
    public static final String POLICY_SOURCE_ORIGINAL_RESPONSE_PARAMETERS = "policy.source.originalResponseParameters";
    private final CommonSourcePolicy commonPolicy;
    private final SourcePolicyProcessorFactory sourcePolicyProcessorFactory;
    private final ReactiveProcessor flowExecutionProcessor;
    private final PolicyEventMapper policyEventMapper;

    public CompositeSourcePolicy(List<Policy> parameterizedPolicies, ReactiveProcessor flowExecutionProcessor, Optional<SourcePolicyParametersTransformer> sourcePolicyParametersTransformer, SourcePolicyProcessorFactory sourcePolicyProcessorFactory) {
        super(parameterizedPolicies, sourcePolicyParametersTransformer);
        this.flowExecutionProcessor = flowExecutionProcessor;
        this.sourcePolicyProcessorFactory = sourcePolicyProcessorFactory;
        this.commonPolicy = new CommonSourcePolicy(new SourceWithPoliciesFluxObjectFactory(), sourcePolicyParametersTransformer);
        this.policyEventMapper = new PolicyEventMapper();
    }

    @Override
    protected Publisher<CoreEvent> applyNextOperation(Publisher<CoreEvent> eventPub, Policy lastPolicy) {
        return Flux.from(eventPub).transform((Function)this.flowExecutionProcessor).map(flowExecutionResponse -> this.policyEventMapper.onFlowFinish((CoreEvent)flowExecutionResponse, this.getParametersTransformer())).onErrorContinue(MessagingException.class, (error, v) -> {
            PolicyNotificationHelper notificationHelper = new PolicyNotificationHelper(lastPolicy.getPolicyChain().getMuleContext().getNotificationManager(), lastPolicy.getPolicyId(), lastPolicy.getPolicyChain());
            new OnExecuteNextErrorConsumer(event -> this.policyEventMapper.onFlowError((CoreEvent)event, lastPolicy.getPolicyId(), this.getParametersTransformer()), notificationHelper, lastPolicy.getPolicyChain().getLocation()).accept((Throwable)error);
        });
    }

    @Override
    protected Publisher<CoreEvent> applyPolicy(Policy policy, ReactiveProcessor nextProcessor, Publisher<CoreEvent> eventPub) {
        ReactiveProcessor createSourcePolicy = this.sourcePolicyProcessorFactory.createSourcePolicy(policy, nextProcessor);
        return Flux.from(eventPub).doOnNext(s -> this.logEvent(this.getCoreEventId((CoreEvent)s), this.getPolicyName(policy), () -> this.getCoreEventAttributesAsString((CoreEvent)s), "Starting Policy ")).transform((Function)createSourcePolicy).doOnNext(responseEvent -> this.logEvent(this.getCoreEventId((CoreEvent)responseEvent), this.getPolicyName(policy), () -> this.getCoreEventAttributesAsString((CoreEvent)responseEvent), "At the end of the Policy "));
    }

    @Override
    public Publisher<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>> process(CoreEvent sourceEvent, MessageSourceResponseParametersProcessor respParamProcessor) {
        return this.commonPolicy.process(sourceEvent, respParamProcessor);
    }

    private Map<String, Object> concatMaps(Map<String, Object> originalResponseParameters, Map<String, Object> policyResponseParameters) {
        if (originalResponseParameters == null) {
            return policyResponseParameters;
        }
        HashMap<String, Object> concatMap = new HashMap<String, Object>(originalResponseParameters);
        policyResponseParameters.forEach((k, v) -> concatMap.merge((String)k, v, (v1, v2) -> v2));
        return concatMap;
    }

    private void logEvent(String eventId, String policyName, Supplier<String> message, String startingMessage) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Event Id: " + eventId + ".\n" + startingMessage + policyName + "\n" + message.get());
        }
    }

    private String getCoreEventId(CoreEvent event) {
        return event.getContext().getId();
    }

    private String getCoreEventAttributesAsString(CoreEvent event) {
        if (event.getMessage() == null || event.getMessage().getAttributes() == null || event.getMessage().getAttributes().getValue() == null) {
            return "";
        }
        return event.getMessage().getAttributes().getValue().toString();
    }

    private String getPolicyName(Policy policy) {
        return policy.getPolicyId();
    }

    private void logSourcePolicySuccessfullResult(SourcePolicySuccessResult result) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Event id: " + result.getResult().getContext().getId() + "\nFinished processing. \n" + this.getCoreEventAttributesAsString(result.getResult()));
        }
    }

    private void logSourcePolicyFailureResult(SourcePolicyFailureResult result) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Event id: " + result.getMessagingException().getEvent().getContext().getId() + "\nFinished processing with failure. \nError message: " + result.getMessagingException().getMessage());
        }
    }

    @Override
    public void dispose() {
        this.commonPolicy.dispose();
    }

    private final class SourceWithPoliciesFluxObjectFactory
    implements Supplier<FluxSink<CoreEvent>> {
        private SourceWithPoliciesFluxObjectFactory() {
        }

        @Override
        public FluxSink<CoreEvent> get() {
            FluxSinkRecorder sinkRef = new FluxSinkRecorder();
            Flux policyFlux = Flux.create(sinkRef).transform((Function)CompositeSourcePolicy.this.getExecutionProcessor()).map(policiesResultEvent -> Either.right(SourcePolicyFailureResult.class, new SourcePolicySuccessResult((CoreEvent)policiesResultEvent, this.resolveSuccessResponseParameters((CoreEvent)policiesResultEvent), CompositeSourcePolicy.this.commonPolicy.getResponseParamsProcessor((CoreEvent)policiesResultEvent)))).doOnNext(result -> {
                CompositeSourcePolicy.this.logSourcePolicySuccessfullResult((SourcePolicySuccessResult)result.getRight());
                CompositeSourcePolicy.this.commonPolicy.finishFlowProcessing(((SourcePolicySuccessResult)result.getRight()).getResult(), (Either<SourcePolicyFailureResult, SourcePolicySuccessResult>)result);
            }).doOnError(e -> !(e instanceof MessagingException), e -> LOGGER.error(e.getMessage(), e)).onErrorContinue(MessagingException.class, (t, e) -> {
                MessagingException me = (MessagingException)t;
                Either<SourcePolicyFailureResult, SourcePolicySuccessResult> result = Either.left(new SourcePolicyFailureResult(me, this.resolveErrorResponseParameters(me)), SourcePolicySuccessResult.class);
                CompositeSourcePolicy.this.logSourcePolicyFailureResult(result.getLeft());
                CompositeSourcePolicy.this.commonPolicy.finishFlowProcessing(me.getEvent(), result, me);
            });
            policyFlux.subscribe();
            return sinkRef.getFluxSink();
        }

        private Supplier<Map<String, Object>> resolveSuccessResponseParameters(CoreEvent policiesResultEvent) {
            Map originalResponseParameters = (Map)((InternalEvent)policiesResultEvent).getInternalParameter(CompositeSourcePolicy.POLICY_SOURCE_ORIGINAL_RESPONSE_PARAMETERS);
            return () -> CompositeSourcePolicy.this.getParametersTransformer().map(parametersTransformer -> CompositeSourcePolicy.this.concatMaps(originalResponseParameters, parametersTransformer.fromMessageToSuccessResponseParameters(policiesResultEvent.getMessage()))).orElse(originalResponseParameters);
        }

        private Supplier<Map<String, Object>> resolveErrorResponseParameters(MessagingException e) {
            Map originalFailureResponseParameters = (Map)((InternalEvent)e.getEvent()).getInternalParameter(CompositeSourcePolicy.POLICY_SOURCE_ORIGINAL_FAILURE_RESPONSE_PARAMETERS);
            return () -> CompositeSourcePolicy.this.getParametersTransformer().map(parametersTransformer -> CompositeSourcePolicy.this.concatMaps(originalFailureResponseParameters, parametersTransformer.fromMessageToErrorResponseParameters(e.getEvent().getMessage()))).orElse(originalFailureResponseParameters);
        }
    }
}

