/*
 * Decompiled with CFR 0.152.
 */
package org.mule.test.module.extension.streaming;

import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mule.functional.api.flow.FlowRunner;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.streaming.object.CursorIteratorProvider;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.streaming.iterator.ConsumerStreamingIterator;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.mule.test.module.extension.streaming.AbstractStreamingExtensionTestCase;

@Feature(value="Streaming")
@Story(value="Object Streaming")
public class ObjectStreamingExtensionTestCase
extends AbstractStreamingExtensionTestCase {
    private static final int DATA_SIZE = 100;
    private static final String MY_STREAM_VAR = "myStreamVar";
    private List<String> data;
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    @Rule
    public SystemProperty withStatistics = new SystemProperty("mule.enable.statistics", "true");

    protected void doSetUp() throws Exception {
        super.doSetUp();
        this.data = new ArrayList<String>(100);
        for (int i = 0; i < 100; ++i) {
            this.data.add(RandomStringUtils.randomAlphabetic((int)100));
        }
    }

    protected String getConfigFile() {
        return "streaming/objects-streaming-extension-config.xml";
    }

    @Test
    @io.qameta.allure.Description(value="Consume an object stream")
    public void getObjectStream() throws Exception {
        this.assertStreamMatchesData("getStreamWithoutStreaming");
    }

    @Test
    @io.qameta.allure.Description(value="Stores an object stream in a variable leaving without modifying the original payload")
    public void getObjectStreamWithTargetValue() throws Exception {
        CoreEvent event = ((FlowRunner)this.flowRunner("getStreamWithTargetValue").withPayload(this.data)).run();
        MatcherAssert.assertThat((Object)((TypedValue)event.getVariables().get(MY_STREAM_VAR)).getValue(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(String.class)));
        MatcherAssert.assertThat((Object)((TypedValue)event.getVariables().get(MY_STREAM_VAR)).getValue(), (Matcher)CoreMatchers.equalTo((Object)this.data.get(0)));
    }

    @Test
    @io.qameta.allure.Description(value="Stores an object stream in a variable leaving without modifying the original payload")
    public void getObjectStreamWithTargetVariable() throws Exception {
        CoreEvent event = ((FlowRunner)this.flowRunner("getStreamWithTarget").keepStreamsOpen().withPayload(this.data)).run();
        MatcherAssert.assertThat((Object)((TypedValue)event.getVariables().get(MY_STREAM_VAR)).getValue(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(CursorIteratorProvider.class)));
        MatcherAssert.assertThat(StreamSupport.stream(Spliterators.spliteratorUnknownSize((Iterator)((CursorIteratorProvider)((TypedValue)event.getVariables().get(MY_STREAM_VAR)).getValue()).openCursor(), 16), false).collect(Collectors.toList()), (Matcher)CoreMatchers.equalTo(this.data));
        MatcherAssert.assertThat((Object)event.getMessage().getPayload().getValue(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(List.class)));
        MatcherAssert.assertThat((Object)event.getMessage().getPayload().getValue(), (Matcher)CoreMatchers.equalTo(this.data));
    }

    @Test
    @io.qameta.allure.Description(value="Operation is configured not to stream")
    public void operationWithDisabledStreaming() throws Exception {
        this.assertStreamMatchesData("getStreamWithoutStreaming");
    }

    @Test
    @io.qameta.allure.Description(value="Operation is configured not to stream and stream gets closed automatically even if not consumed")
    public void nonRepeatableStreamIsManaged() throws Exception {
        Object stream = this.getObjectStream("getStreamWithoutStreaming", false);
        MatcherAssert.assertThat((Object)stream, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(ConsumerStreamingIterator.class)));
        ConsumerStreamingIterator streamingIterator = (ConsumerStreamingIterator)stream;
        new PollingProber(1000L, 100L).check((Probe)new JUnitLambdaProbe(() -> {
            MatcherAssert.assertThat((Object)streamingIterator.hasNext(), (Matcher)CoreMatchers.is((Object)false));
            return true;
        }));
        this.expectedException.expect((Matcher)new BaseMatcher<Throwable>(){

            public boolean matches(Object o) {
                return o.getClass().getName().equals("org.mule.runtime.core.internal.streaming.object.iterator.ClosedConsumerException");
            }

            public void describeTo(Description description) {
                description.appendText("Exception was not a ClosedConsumerException");
            }
        });
        streamingIterator.next();
    }

    @Test
    @io.qameta.allure.Description(value="Consume a stream generated in a transaction")
    public void getStreamInTx() throws Exception {
        this.assertStreamMatchesData("getStreamInTx");
    }

    @Test(expected=Exception.class)
    @io.qameta.allure.Description(value="All cursors closed when the flow fails")
    public void allStreamsClosedInCaseOfException() throws Exception {
        ((FlowRunner)this.flowRunner("crashCar").withPayload(this.data)).run();
    }

    @Test(expected=Exception.class)
    @io.qameta.allure.Description(value="All cursors closed when the flow fails in a transaction")
    public void allStreamsClosedInCaseOfExceptionInTx() throws Exception {
        ((FlowRunner)this.flowRunner("crashCarTx").withPayload(this.data)).run();
    }

    private Object getObjectStream(String flowName, boolean keepStreamsOpen) throws Exception {
        FlowRunner flowRunner = (FlowRunner)this.flowRunner(flowName).withPayload(this.data);
        if (keepStreamsOpen) {
            flowRunner.keepStreamsOpen();
        }
        return flowRunner.run().getMessage().getPayload().getValue();
    }

    private List<String> consumeObjectStream(String flowName, boolean keepStreamsOpen) throws Exception {
        Object stream = this.getObjectStream(flowName, keepStreamsOpen);
        if (stream instanceof Iterator) {
            Iterator it = (Iterator)stream;
            LinkedList<String> list = new LinkedList<String>();
            it.forEachRemaining(list::add);
            return list;
        }
        if (stream instanceof List) {
            return (List)stream;
        }
        throw new IllegalStateException("Stream of unknown type: " + String.valueOf(stream.getClass()));
    }

    private void assertStreamMatchesData(String flowName) throws Exception {
        List<String> actual = this.consumeObjectStream(flowName, true);
        MatcherAssert.assertThat(actual, (Matcher)CoreMatchers.equalTo(this.data));
    }

    public static class AssertPayloadIsNotStreamProvider
    implements Processor {
        public CoreEvent process(CoreEvent event) throws MuleException {
            MatcherAssert.assertThat((Object)event.getMessage().getPayload().getValue().getClass().getName(), (Matcher)CoreMatchers.not((Matcher)CoreMatchers.containsString((String)"CursorIteratorProvider")));
            return event;
        }
    }

    public static class AssertPayloadIsStreamProvider
    implements Processor {
        public CoreEvent process(CoreEvent event) throws MuleException {
            MatcherAssert.assertThat((Object)event.getMessage().getPayload().getValue().getClass().getName(), (Matcher)CoreMatchers.containsString((String)"CursorIteratorProvider"));
            return event;
        }
    }
}

