/*
 * Decompiled with CFR 0.152.
 */
package org.mule.test.construct;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Stories;
import io.qameta.allure.Story;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.logging.log4j.core.util.Throwables;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.mule.functional.api.exception.ExpectedError;
import org.mule.functional.api.flow.FlowRunner;
import org.mule.functional.junit4.matchers.ThrowableMessageMatcher;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.api.notification.MessageProcessorNotificationListener;
import org.mule.runtime.api.notification.NotificationListener;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.error.Errors;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.HttpService;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.TestHttpClient;
import org.mule.tck.junit4.matcher.ErrorTypeMatcher;
import org.mule.tck.junit4.rule.DynamicPort;
import org.mule.tck.probe.PollingProber;
import org.mule.test.AbstractIntegrationTestCase;

@Feature(value="Core Components")
@Story(value="Flow Reference")
public class FlowRefTestCase
extends AbstractIntegrationTestCase {
    private static final String CONTEXT_DEPTH_MESSAGE = "Too many nested child contexts.";
    @Rule
    public ExpectedError expectedException = ExpectedError.none();
    @Rule
    public DynamicPort port = new DynamicPort("port");
    private List<Future<HttpResponse>> sendAsyncs = new ArrayList<Future<HttpResponse>>();
    @Rule
    public TestHttpClient httpClient = new TestHttpClient.Builder((HttpService)this.getService(HttpService.class)).build();
    private Scheduler asyncFlowRunnerScheduler;
    @Inject
    private Flow referencedFlowWithMaxConcurrency;
    private static CountDownLatch latch;
    private static AtomicInteger callbackInFlight;
    private static AtomicInteger awaiting;

    protected String getConfigFile() {
        return "org/mule/test/construct/flow-ref.xml";
    }

    @Before
    public void before() {
        this.sendAsyncs = new ArrayList<Future<HttpResponse>>();
        latch = new CountDownLatch(1);
        awaiting.set(0);
        this.asyncFlowRunnerScheduler = muleContext.getSchedulerService().ioScheduler(muleContext.getSchedulerBaseConfig().withShutdownTimeout(0L, TimeUnit.SECONDS));
    }

    @After
    public void after() throws Exception {
        this.asyncFlowRunnerScheduler.shutdownNow();
        latch.countDown();
        for (Future<HttpResponse> sentAsync : this.sendAsyncs) {
            sentAsync.get(5000L, TimeUnit.SECONDS);
        }
    }

    @Test
    public void twoFlowRefsToSubFlow() throws Exception {
        CoreEvent muleEvent = ((FlowRunner)this.flowRunner("flow1").withPayload((Object)"0")).run();
        Assert.assertThat((Object)this.getPayloadAsString(muleEvent.getMessage()), (Matcher)Matchers.is((Object)"012xyzabc312xyzabc3"));
    }

    @Test
    public void dynamicFlowRef() throws Exception {
        Assert.assertThat((Object)((FlowRunner)((FlowRunner)this.flowRunner("flow2").withPayload((Object)"0")).withVariable("letter", (Object)"A")).run().getMessage().getPayload().getValue(), (Matcher)Matchers.is((Object)"0A"));
        Assert.assertThat((Object)((FlowRunner)((FlowRunner)this.flowRunner("flow2").withPayload((Object)"0")).withVariable("letter", (Object)"B")).run().getMessage().getPayload().getValue(), (Matcher)Matchers.is((Object)"0B"));
    }

    @Test
    public void dynamicFlowRefTextPlain() throws Exception {
        Assert.assertThat((Object)((FlowRunner)((FlowRunner)this.flowRunner("flow3").withPayload((Object)"0")).withVariable("letter", (Object)" A ", DataType.TEXT_STRING)).run().getMessage().getPayload().getValue(), (Matcher)Matchers.is((Object)"0A"));
        Assert.assertThat((Object)((FlowRunner)((FlowRunner)this.flowRunner("flow3").withPayload((Object)"0")).withVariable("letter", (Object)" B ", DataType.TEXT_STRING)).run().getMessage().getPayload().getValue(), (Matcher)Matchers.is((Object)"0B"));
    }

    @Test
    public void dynamicFlowRefWithChoice() throws Exception {
        Assert.assertThat((Object)((FlowRunner)((FlowRunner)this.flowRunner("flow2").withPayload((Object)"0")).withVariable("letter", (Object)"C")).run().getMessage().getPayload().getValue(), (Matcher)Matchers.is((Object)"0A"));
    }

    @Test
    public void flowRefTargetToFlow() throws Exception {
        Assert.assertThat((Object)((TypedValue)this.flowRunner("targetToFlow").run().getVariables().get("flowRefResult")).getValue(), (Matcher)Matchers.is((Object)"result"));
    }

    @Test
    public void flowRefTargetToSubFlow() throws Exception {
        Assert.assertThat((Object)((TypedValue)this.flowRunner("targetToSubFlow").run().getVariables().get("flowRefResult")).getValue(), (Matcher)Matchers.is((Object)"result"));
    }

    @Test
    public void dynamicFlowRefWithScatterGather() throws Exception {
        Map messageList = (Map)((FlowRunner)((FlowRunner)this.flowRunner("flow2").withPayload((Object)"0")).withVariable("letter", (Object)"SG")).run().getMessage().getPayload().getValue();
        List payloads = messageList.values().stream().map(msg -> msg.getPayload().getValue()).collect(Collectors.toList());
        Assert.assertEquals((Object)"0A", payloads.get(0));
        Assert.assertEquals((Object)"0B", payloads.get(1));
    }

    @Test
    public void flowRefNotFound() throws Exception {
        this.expectedException.expectMessage(CoreMatchers.containsString((String)"No flow/sub-flow with name 'sub-flow-Z' found"));
        this.expectedException.expectErrorType(Errors.CORE_NAMESPACE_NAME, "ROUTING");
        Assert.assertThat((Object)((FlowRunner)((FlowRunner)this.flowRunner("flow2").withPayload((Object)"0")).withVariable("letter", (Object)"Z")).run().getMessage().getPayload().getValue(), (Matcher)Matchers.is((Object)"0C"));
    }

    @Test
    @Issue(value="MULE-14285")
    public void flowRefFlowErrorNotifications() throws Exception {
        List<MessageProcessorNotification> notificationList = Collections.synchronizedList(new ArrayList());
        this.setupMessageProcessorNotificationListener(notificationList);
        this.flowRunner("flowRefFlowErrorNotifications").runExpectingException(ErrorTypeMatcher.errorType((String)"APP", (String)"EXPECTED"));
        this.assertNotifications(notificationList, "flowRefFlowErrorNotifications/processors/0");
    }

    @Test
    @Issue(value="MULE-14285")
    public void flowRefSubFlowErrorNotifications() throws Exception {
        List<MessageProcessorNotification> notificationList = Collections.synchronizedList(new ArrayList());
        this.setupMessageProcessorNotificationListener(notificationList);
        this.flowRunner("flowRefSubFlowErrorNotifications").runExpectingException(ErrorTypeMatcher.errorType((String)"APP", (String)"EXPECTED"));
        this.assertNotifications(notificationList, "flowRefSubFlowErrorNotifications/processors/0");
    }

    private void setupMessageProcessorNotificationListener(List<MessageProcessorNotification> notificationList) {
        muleContext.getNotificationManager().addInterfaceToType(MessageProcessorNotificationListener.class, MessageProcessorNotification.class);
        muleContext.getNotificationManager().addListener((NotificationListener)((MessageProcessorNotificationListener)notification -> notificationList.add((MessageProcessorNotification)notification)));
    }

    private void assertNotifications(List<MessageProcessorNotification> notificationList, String name) {
        PollingProber.probe(() -> {
            Assert.assertThat((String)notificationList.toString(), (Object)notificationList, (Matcher)Matchers.hasSize((int)4));
            MessageProcessorNotification preNotification = (MessageProcessorNotification)notificationList.get(0);
            Assert.assertThat((Object)preNotification.getAction().getActionId(), (Matcher)CoreMatchers.equalTo((Object)1601));
            Assert.assertThat((Object)preNotification.getComponent().getLocation().getLocation(), (Matcher)CoreMatchers.equalTo((Object)name));
            Assert.assertThat((Object)preNotification.getException(), (Matcher)Matchers.is((Matcher)CoreMatchers.nullValue()));
            MessageProcessorNotification postNotification = (MessageProcessorNotification)notificationList.get(3);
            Assert.assertThat((Object)postNotification.getAction().getActionId(), (Matcher)CoreMatchers.equalTo((Object)1602));
            Assert.assertThat((Object)postNotification.getComponent().getLocation().getLocation(), (Matcher)CoreMatchers.equalTo((Object)name));
            Assert.assertThat((Object)postNotification.getException().getCause(), (Matcher)Matchers.instanceOf(DefaultMuleException.class));
            Assert.assertThat((Object)postNotification.getEvent().getError().isPresent(), (Matcher)Matchers.is((Object)true));
            Assert.assertThat((Object)((Error)postNotification.getEvent().getError().get()).getCause(), (Matcher)Matchers.instanceOf(DefaultMuleException.class));
            return true;
        });
    }

    @Test
    public void recursive() throws Exception {
        this.flowRunner("recursiveCaller").runExpectingException(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.containsString((String)CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void recursiveDynamic() throws Exception {
        this.flowRunner("recursiveDynamicCaller").runExpectingException(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.containsString((String)CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void recursiveSubFlow() throws Exception {
        this.flowRunner("recursiveSubFlowCaller").runExpectingException(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.containsString((String)CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void crossedRecursiveSubFlow() throws Exception {
        this.flowRunner("crossedRecursiveSubflow").runExpectingException(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.containsString((String)CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void tripleCrossedRecursiveSubFlow() throws Exception {
        this.flowRunner("tripleCrossedRecursiveSubflow").runExpectingException(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.containsString((String)CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void recursiveSubFlowDynamic() throws Exception {
        this.flowRunner("recursiveSubFlowDynamicCaller").runExpectingException(ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.containsString((String)CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    @Story(value="Backpressure")
    @Ignore(value="W-12947068: How to handle backpressure on flow-ref's is not defined yet, but this test will provide a starting point in the future...")
    public void backpressureFlowRef() throws Exception {
        HttpRequest request = HttpRequest.builder().uri(String.format("http://localhost:%s/backpressureFlowRef?ref=backpressureFlowRefInner", this.port.getNumber())).method(HttpConstants.Method.GET).build();
        int nThreads = Runtime.getRuntime().availableProcessors() * 4 + 1;
        for (int i = 0; i < nThreads; ++i) {
            this.sendAsyncs.add(this.httpClient.sendAsync(request, HttpRequestOptions.builder().responseTimeout(10000).build()));
        }
        PollingProber.probe((long)5000L, (long)50L, () -> awaiting.get() >= Runtime.getRuntime().availableProcessors() * 2);
        PollingProber.probe((long)5000L, (long)50L, () -> {
            Assert.assertThat((Object)this.httpClient.send(request, HttpRequestOptions.builder().responseTimeout(1000).build()).getStatusCode(), (Matcher)Matchers.is((Object)HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode()));
            return true;
        });
    }

    @Test
    @Story(value="Backpressure")
    @Ignore(value="W-12947068: How to handle backpressure on flow-ref's is not defined yet, but this test will provide a starting point in the future...")
    public void backpressureFlowRefSub() throws Exception {
        HttpRequest request = HttpRequest.builder().uri(String.format("http://localhost:%s/backpressureFlowRef?ref=backpressureFlowRefInnerSub", this.port.getNumber())).method(HttpConstants.Method.GET).build();
        int nThreads = Runtime.getRuntime().availableProcessors() * 4 + 1;
        for (int i = 0; i < nThreads; ++i) {
            this.sendAsyncs.add(this.httpClient.sendAsync(request, HttpRequestOptions.builder().responseTimeout(10000).build()));
        }
        PollingProber.probe((long)5000L, (long)50L, () -> awaiting.get() >= Runtime.getRuntime().availableProcessors() * 2);
        PollingProber.probe((long)5000L, (long)50L, () -> {
            Assert.assertThat((Object)this.httpClient.send(request, HttpRequestOptions.builder().responseTimeout(1000).build()).getStatusCode(), (Matcher)Matchers.is((Object)HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode()));
            return true;
        });
    }

    @Test
    @Story(value="Backpressure")
    public void backpressureFlowRefMaxConcurrency() throws Exception {
        HttpRequest request = HttpRequest.builder().uri(String.format("http://localhost:%s/backpressureFlowRefMaxConcurrency?ref=backpressureFlowRefInner", this.port.getNumber())).method(HttpConstants.Method.GET).build();
        int nThreads = 2;
        for (int i = 0; i < nThreads; ++i) {
            this.sendAsyncs.add(this.httpClient.sendAsync(request, HttpRequestOptions.builder().responseTimeout(10000).build()));
        }
        PollingProber.probe((long)5000L, (long)50L, () -> awaiting.get() >= 1);
        Assert.assertThat((Object)this.httpClient.send(request).getStatusCode(), (Matcher)Matchers.is((Object)HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode()));
    }

    @Test
    @Story(value="Backpressure")
    public void backpressureFlowRefMaxConcurrencySub() throws Exception {
        HttpRequest request = HttpRequest.builder().uri(String.format("http://localhost:%s/backpressureFlowRefMaxConcurrency?ref=backpressureFlowRefInnerSub", this.port.getNumber())).method(HttpConstants.Method.GET).build();
        int nThreads = 2;
        for (int i = 0; i < nThreads; ++i) {
            this.sendAsyncs.add(this.httpClient.sendAsync(request, HttpRequestOptions.builder().responseTimeout(10000).build()));
        }
        PollingProber.probe((long)5000L, (long)50L, () -> awaiting.get() >= 1);
        Assert.assertThat((Object)this.httpClient.send(request).getStatusCode(), (Matcher)Matchers.is((Object)HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode()));
    }

    @Test
    @Story(value="Backpressure")
    @Issue(value="MULE-19328")
    public void backpressureMustNotBeTriggeredAfterFlowRestart() throws Exception {
        this.flowRunner("outerFlowWithMaxConcurrency").dispatchAsync(this.asyncFlowRunnerScheduler);
        PollingProber.probe((long)5000L, (long)50L, () -> awaiting.get() == 1);
        this.referencedFlowWithMaxConcurrency.stop();
        this.referencedFlowWithMaxConcurrency.start();
        latch.countDown();
        this.flowRunner("outerFlowWithMaxConcurrency").run();
    }

    @Test
    public void flowWithStoppedTargetFlowFailsToProcess() throws Exception {
        this.flowRunner("stoppedTargetFlow1").runExpectingException(ErrorTypeMatcher.errorType((String)"MULE", (String)"UNKNOWN"));
    }

    private void testRecursiveFlowrefsAreDetectedFor(String callingFlowName, String offendingFlowName) {
        try {
            this.flowRunner(callingFlowName);
            Assert.fail((String)("Expected and error regarding a flowref cycle from " + callingFlowName + ", and with the offending flow being " + offendingFlowName));
        }
        catch (Exception e) {
            Throwable rootCause = Throwables.getRootCause((Throwable)e);
            Assert.assertThat((Object)rootCause.getMessage(), (Matcher)CoreMatchers.endsWith((String)String.format("Found a possible infinite recursion involving flow named %s", offendingFlowName)));
        }
    }

    @Test
    @Issue(value="MULE-18178")
    @Stories(value={@Story(value="Backpressure"), @Story(value="Max concurrency")})
    @Description(value="The maxConcurrency of a target flow called via flow-ref is enforced")
    public void backpressureFlowRefMaxConcurrencyStatic() throws Exception {
        this.flowRunner("backpressureFlowRefOuterMaxConcurrencyStatic").dispatchAsync(this.asyncFlowRunnerScheduler);
        PollingProber.probe((long)5000L, (long)50L, () -> awaiting.get() == 1);
        this.flowRunner("backpressureFlowRefOuterMaxConcurrencyStatic").dispatchAsync(this.asyncFlowRunnerScheduler);
        Thread.sleep(5000L);
        PollingProber.probe((long)5000L, (long)50L, () -> awaiting.get() == 1);
        latch.countDown();
        PollingProber.probe((long)5000L, (long)50L, () -> awaiting.get() == 2);
    }

    @Test
    @Issue(value="MULE-18304")
    @Description(value="Verify that operations inner fluxes are not terminated when within a dynamically invoked sub-flow.")
    public void dynamicFlowRefWithSdkOperation() throws Exception {
        this.flowRunner("dynamicFlowRefWithSdkOperation").run();
        this.flowRunner("dynamicFlowRefWithSdkOperation").run();
    }

    @Test
    @Issue(value="MULE-19319")
    @Description(value="For each with a flow ref and max concurrency finish processing")
    public void forEachWithFlowRefAndMaxConcurrency() throws Exception {
        Integer[] payload = new Integer[]{1, 2, 3};
        Assert.assertThat((Object)((FlowRunner)this.flowRunner("foreachWithFlowRefAndMaxConcurrency").withPayload((Object)payload)).run().getMessage().getPayload().getValue(), (Matcher)Matchers.is((Object)payload));
    }

    public static int getCallbackInFlight() {
        return callbackInFlight.get();
    }

    static {
        callbackInFlight = new AtomicInteger();
        awaiting = new AtomicInteger();
    }

    public static class LatchAwaitCpuIntensiveProcessor
    extends AbstractComponent
    implements Processor {
        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_INTENSIVE;
        }

        public CoreEvent process(CoreEvent event) throws MuleException {
            callbackInFlight.incrementAndGet();
            awaiting.incrementAndGet();
            try {
                latch.await();
            }
            catch (InterruptedException e) {
                throw new MuleRuntimeException((Throwable)e);
            }
            callbackInFlight.decrementAndGet();
            return event;
        }
    }
}

