/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.policy;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.component.execution.CompletableCallback;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.event.InternalEvent;
import org.mule.runtime.core.internal.policy.CommonSourcePolicy;
import org.mule.runtime.core.internal.policy.DeferredDisposable;
import org.mule.runtime.core.internal.policy.MessageSourceResponseParametersProcessor;
import org.mule.runtime.core.internal.policy.SourcePolicy;
import org.mule.runtime.core.internal.policy.SourcePolicyContext;
import org.mule.runtime.core.internal.policy.SourcePolicyFailureResult;
import org.mule.runtime.core.internal.policy.SourcePolicySuccessResult;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * {@link SourcePolicy} implementation that sends source events straight through the flow execution
 * processor and reports the outcome through a shared {@link CommonSourcePolicy}.
 *
 * <p>NOTE(review): judging by the name, this is presumably the policy used when no actual policy
 * applies to a source; confirm against the callers that instantiate it.
 */
public class NoSourcePolicy
implements SourcePolicy,
Disposable,
DeferredDisposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(NoSourcePolicy.class);
    private final CommonSourcePolicy commonPolicy;

    /**
     * Creates the policy around the given flow execution processor.
     *
     * @param flowExecutionProcessor processor applied to the flux of source events
     */
    public NoSourcePolicy(ReactiveProcessor flowExecutionProcessor) {
        this.commonPolicy = new CommonSourcePolicy(new SourceFluxObjectFactory(this, flowExecutionProcessor));
    }

    /**
     * Delegates the processing of {@code sourceEvent} to the underlying {@link CommonSourcePolicy}.
     *
     * @param sourceEvent event generated by the source
     * @param respParamProcessor processor handed over to the common policy together with the event
     * @param callback callback handed over to the common policy; it is typed to receive either a
     *        failure or a success result
     */
    @Override
    public void process(CoreEvent sourceEvent, MessageSourceResponseParametersProcessor respParamProcessor, CompletableCallback<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>> callback) {
        this.commonPolicy.process(this, sourceEvent, respParamProcessor, callback);
    }

    /** Disposes the underlying {@link CommonSourcePolicy}. */
    @Override
    public void dispose() {
        this.commonPolicy.dispose();
    }

    /**
     * Returns a {@link Disposable} that disposes the underlying {@link CommonSourcePolicy} when invoked.
     *
     * <p>The method reference is bound to the {@code commonPolicy} instance, so the returned object
     * does not reference this {@code NoSourcePolicy}.
     *
     * @return a disposable bound to the common policy
     */
    @Override
    public Disposable deferredDispose() {
        return this.commonPolicy::dispose;
    }

    /**
     * Supplies the {@link FluxSink}s into which source events are pushed. Every call to {@link #get()}
     * builds and subscribes a new pipeline.
     *
     * <p>The owning policy is held through a {@link WeakReference}, so every access must cope with
     * the reference having been cleared. NOTE(review): presumably this is done so that a subscribed
     * pipeline does not keep the policy reachable; confirm.
     */
    private static final class SourceFluxObjectFactory
    implements Supplier<FluxSink<CoreEvent>> {
        private final Reference<NoSourcePolicy> noSourcePolicy;
        private final ReactiveProcessor flowExecutionProcessor;

        /**
         * @param noSourcePolicy owning policy, stored weakly
         * @param flowExecutionProcessor processor applied to the flux of source events
         */
        public SourceFluxObjectFactory(NoSourcePolicy noSourcePolicy, ReactiveProcessor flowExecutionProcessor) {
            this.noSourcePolicy = new WeakReference<NoSourcePolicy>(noSourcePolicy);
            this.flowExecutionProcessor = flowExecutionProcessor;
        }

        /**
         * Builds a pipeline that applies the flow execution processor to the incoming events, wraps
         * each result in a success {@link Either}, and reports it to the owning policy. A
         * {@link MessagingException} raised along the way is reported as a failure result instead.
         * The pipeline is subscribed before the sink is returned.
         *
         * <p>When the owning policy has already been garbage collected, results are dropped rather
         * than dereferencing the cleared reference.
         *
         * @return the sink feeding the newly subscribed pipeline
         */
        @Override
        public FluxSink<CoreEvent> get() {
            // NOTE(review): the raw types and casts below are kept exactly as the decompiler emitted
            // them; the generic signatures of the project types are not visible from this file.
            FluxSinkRecorder sinkRef = new FluxSinkRecorder();
            Flux policyFlux = sinkRef.flux().transform((Function)this.flowExecutionProcessor).map(flowExecutionResult -> {
                SourcePolicyContext ctx = SourcePolicyContext.from(flowExecutionResult);
                MessageSourceResponseParametersProcessor parametersProcessor = ctx.getResponseParametersProcessor();
                return Either.right(SourcePolicyFailureResult.class, new SourcePolicySuccessResult((CoreEvent)flowExecutionResult, () -> parametersProcessor.getSuccessfulExecutionResponseParametersFunction().apply((CoreEvent)flowExecutionResult), parametersProcessor));
            }).doOnNext(result -> result.apply(spfr -> {
                // The map step above only produces right values, so this left branch is not
                // expected to run in this pipeline; it is kept for parity with the original.
                CommonSourcePolicy policy = this.commonPolicyIfAlive();
                if (policy == null) {
                    return;
                }
                CoreEvent event = spfr.getMessagingException().getEvent();
                SourcePolicyContext ctx = SourcePolicyContext.from(event);
                policy.finishFlowProcessing(event, (Either<SourcePolicyFailureResult, SourcePolicySuccessResult>)result, spfr.getMessagingException(), ctx);
            }, spsr -> {
                CommonSourcePolicy policy = this.commonPolicyIfAlive();
                if (policy != null) {
                    policy.finishFlowProcessing(spsr.getResult(), (Either<SourcePolicyFailureResult, SourcePolicySuccessResult>)result);
                }
            })).onErrorContinue(MessagingException.class, (t, e) -> {
                MessagingException me = (MessagingException)t;
                InternalEvent event = (InternalEvent)me.getEvent();
                CommonSourcePolicy policy = this.commonPolicyIfAlive();
                if (policy != null) {
                    policy.finishFlowProcessing(event, Either.left(new SourcePolicyFailureResult(me, () -> SourcePolicyContext.from(event).getResponseParametersProcessor().getFailedExecutionResponseParametersFunction().apply(me.getEvent()))), me, SourcePolicyContext.from(event));
                }
            });
            // No value consumer is needed: results are reported from the steps above. Any error
            // that is not handled by onErrorContinue ends up in this error callback.
            policyFlux.subscribe(null, e -> LOGGER.error("Exception reached subscriber for {}", (Object)this, e));
            return sinkRef.getFluxSink();
        }

        /**
         * Resolves the weakly referenced policy into its {@link CommonSourcePolicy}.
         *
         * @return the common policy of the owning {@link NoSourcePolicy}, or {@code null} when the
         *         owning policy has already been garbage collected
         */
        private CommonSourcePolicy commonPolicyIfAlive() {
            NoSourcePolicy strongReference = this.noSourcePolicy.get();
            if (strongReference == null) {
                LOGGER.debug("Owning NoSourcePolicy is no longer reachable; dropping flow result for {}", this);
                return null;
            }
            return strongReference.commonPolicy;
        }
    }
}

