/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.policy;

import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.policy.Policy;
import org.mule.runtime.core.api.policy.PolicyStateHandler;
import org.mule.runtime.core.api.policy.PolicyStateId;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.policy.PolicyEventConverter;
import org.mule.runtime.core.internal.policy.PolicyStateIdFactory;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link Processor} that sends a source event through the chain of a {@link Policy} and
 * exposes a wrapper around the next processor to that chain.
 *
 * <p>For every incoming event the processor:
 * <ol>
 *   <li>creates a {@link PolicyStateId} for the event;</li>
 *   <li>builds a "next operation" processor bound to that state id and to the incoming event;</li>
 *   <li>runs the policy chain with that processor stored in the Reactor subscriber context;</li>
 *   <li>converts the event produced by the chain back, using the incoming event.</li>
 * </ol>
 *
 * <p>NOTE(review): the subscriber-context entry is presumably read by the policy's
 * "execute next" step; the reader is not visible in this file, so confirm.
 */
public class SourcePolicyProcessor implements Processor {

    /** Subscriber-context key under which the next-operation processor is stored. */
    private static final String NEXT_OPERATION_CONTEXT_KEY = "policy.nextOperation";

    private final Policy policy;
    private final PolicyStateHandler policyStateHandler;
    private final PolicyEventConverter policyEventConverter = new PolicyEventConverter();
    private final Processor nextProcessor;
    private final PolicyStateIdFactory stateIdFactory;

    /**
     * Creates a processor for the given policy.
     *
     * @param policy policy whose chain is executed and whose id seeds the state-id factory
     * @param policyStateHandler handler used to store and retrieve the latest policy event
     * @param nextProcessor processor invoked by the next-operation wrapper
     */
    public SourcePolicyProcessor(Policy policy, PolicyStateHandler policyStateHandler, Processor nextProcessor) {
        this.policy = policy;
        this.policyStateHandler = policyStateHandler;
        this.nextProcessor = nextProcessor;
        this.stateIdFactory = new PolicyStateIdFactory(policy.getPolicyId());
    }

    /**
     * Blocking entry point; delegates to the reactive {@link #apply(Publisher)} implementation.
     *
     * @param sourceEvent event to process
     * @return the event produced by the reactive pipeline
     * @throws MuleException propagated from {@link MessageProcessors#processToApply}
     */
    @Override
    public CoreEvent process(CoreEvent sourceEvent) throws MuleException {
        return MessageProcessors.processToApply(sourceEvent, this);
    }

    /**
     * Reactive entry point: takes a single event from {@code publisher} and runs it through
     * the policy chain.
     *
     * @param publisher source of the event; its element must be a {@link PrivilegedEvent}
     * @return publisher of the event resulting from the policy execution
     */
    @Override
    public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
        return Mono.from(publisher)
            .cast(PrivilegedEvent.class)
            .flatMap(this::runPolicyChain)
            .cast(CoreEvent.class);
    }

    /**
     * Assembles the pipeline that executes the policy chain for one source event.
     *
     * @param sourceEvent the event received by {@link #apply(Publisher)}
     * @return the chain result converted back with {@code sourceEvent}
     */
    private Mono<PrivilegedEvent> runPolicyChain(PrivilegedEvent sourceEvent) {
        PolicyStateId stateId = stateIdFactory.create(sourceEvent);
        Processor nextOperation = buildSourceExecutionWithPolicyFunction(stateId, sourceEvent);
        return Mono.just(sourceEvent)
            // The emitted element is the source event itself, so the lambda argument is unused.
            .map(ignored -> policyEventConverter.createEvent(sourceEvent, noVariablesEvent(sourceEvent)))
            .cast(CoreEvent.class)
            .transform(policy.getPolicyChain())
            .cast(PrivilegedEvent.class)
            .map(chainResult -> policyEventConverter.createEvent(chainResult, sourceEvent))
            .subscriberContext(ctx -> ctx.put(NEXT_OPERATION_CONTEXT_KEY, nextOperation));
    }

    /**
     * Builds the processor that wraps {@code nextProcessor} for one policy execution.
     *
     * <p>The returned processor stores each event it receives as the latest policy state,
     * converts it using {@code sourceEvent}, hands it to {@code nextProcessor}, and converts
     * the outcome (result or {@link MessagingException} event) using the stored state.
     *
     * @param policyStateId id under which the policy event is stored and looked up
     * @param sourceEvent the event that entered this policy processor
     * @return the wrapping processor
     */
    private Processor buildSourceExecutionWithPolicyFunction(final PolicyStateId policyStateId, final PrivilegedEvent sourceEvent) {
        return new Processor() {

            @Override
            public CoreEvent process(CoreEvent event) throws MuleException {
                return MessageProcessors.processToApply(event, this);
            }

            @Override
            public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
                return Flux.from(publisher)
                    .doOnNext(this::storeState)
                    .map(this::toNextProcessorEvent)
                    .flatMap(this::callNextProcessor);
            }

            /** Records the event as the latest state for this execution's state id. */
            private void storeState(CoreEvent event) {
                policyStateHandler.updateState(policyStateId, (PrivilegedEvent) event);
            }

            /** Returns the most recently stored state; the state is expected to be present. */
            private PrivilegedEvent latestState() {
                return (PrivilegedEvent) policyStateHandler.getLatestState(policyStateId).get();
            }

            /**
             * Converts the policy event using the source event and the chain's
             * propagate-message-transformations flag.
             */
            private CoreEvent toNextProcessorEvent(CoreEvent event) {
                return policyEventConverter.createEvent(
                    (PrivilegedEvent) event,
                    sourceEvent,
                    policy.getPolicyChain().isPropagateMessageTransformations());
            }

            /** Converts an event coming out of the next processor using the stored state. */
            private CoreEvent withLatestState(CoreEvent event) {
                return policyEventConverter.createEvent((PrivilegedEvent) event, latestState());
            }

            /**
             * Runs the next processor for one event. Both the successful result and the event
             * carried by a {@link MessagingException} are converted with the stored state.
             */
            private Mono<CoreEvent> callNextProcessor(CoreEvent event) {
                return Mono.just(event)
                    .transform(nextProcessor)
                    .map(this::withLatestState)
                    .onErrorMap(
                        MessagingException.class,
                        failure -> new MessagingException(withLatestState(failure.getEvent()), failure));
            }
        };
    }

    /**
     * Builds a new event on the context of {@code event} whose message is
     * {@code Message.of(null)}; nothing else is copied from {@code event}.
     *
     * @param event event supplying the context
     * @return the newly built event
     */
    private PrivilegedEvent noVariablesEvent(CoreEvent event) {
        return PrivilegedEvent.builder(event.getContext())
            .message(Message.of(null))
            .build();
    }
}

