/*
 * Decompiled with CFR 0.152.
 */
package org.mule.test.module.extension.streaming;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.commons.lang3.RandomStringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mule.functional.api.flow.FlowRunner;
import org.mule.metadata.api.model.MetadataType;
import org.mule.metadata.api.model.UnionType;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.meta.model.config.ConfigurationModel;
import org.mule.runtime.api.meta.model.parameter.ParameterModel;
import org.mule.runtime.api.meta.model.parameter.ParameterizedModel;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.streaming.StreamingManager;
import org.mule.runtime.core.api.streaming.bytes.CursorStreamProviderFactory;
import org.mule.runtime.core.api.streaming.bytes.InMemoryCursorStreamConfig;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.dsl.api.component.config.DefaultComponentLocation;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.JUnitProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.mule.test.module.extension.internal.util.ExtensionsTestUtils;
import org.mule.test.module.extension.streaming.AbstractStreamingExtensionTestCase;

@Feature(value="Streaming")
@Story(value="Bytes Streaming")
public abstract class AbstractBytesStreamingExtensionTestCase
extends AbstractStreamingExtensionTestCase {
    private static final String BARGAIN_SPELL = "dormammu i've come to bargain";
    private static final String TOO_BIG = "Too big!";
    private static final int TIMEOUT = 2000;
    private static final int DELAY = 200;
    private static List<String> CASTED_SPELLS = new LinkedList<String>();
    @Inject
    @Named(value="bytesCaster")
    private Flow bytesCaster;
    @Inject
    @Named(value="sourceWithExceededBuffer")
    private Flow sourceWithExceededBuffer;
    @Inject
    @Named(value="bytesCasterInTx")
    private Flow bytesCasterInTx;
    @Inject
    @Named(value="bytesCasterInSdkScopeInside")
    private Flow bytesCasterInSdkScopeInside;
    @Inject
    @Named(value="bytesCasterInSdkScopeAfter")
    private Flow bytesCasterInSdkScopeAfter;
    @Inject
    @Named(value="bytesCasterWithoutStreaming")
    private Flow bytesCasterWithoutStreaming;
    @Inject
    @Named(value="toNonRepeatableStream")
    private Flow toNonRepeatableStream;
    @Inject
    private StreamingManager streamingManager;
    @Rule
    public SystemProperty configName;
    private String data = RandomStringUtils.randomAlphabetic((int)2048);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static String addSpell(String spell) {
        List<String> list = CASTED_SPELLS;
        synchronized (list) {
            CASTED_SPELLS.add(spell);
        }
        return spell;
    }

    public AbstractBytesStreamingExtensionTestCase(String configName) {
        this.configName = new SystemProperty("configName", configName);
    }

    protected String getConfigFile() {
        return "streaming/bytes-streaming-extension-config.xml";
    }

    protected void doSetUp() throws Exception {
        this.setDisposeContextPerClass(true);
    }

    @Override
    protected void doTearDownAfterMuleContextDispose() throws Exception {
        super.doTearDownAfterMuleContextDispose();
        CASTED_SPELLS.clear();
    }

    protected boolean isDisposeContextPerClass() {
        return true;
    }

    @Test
    @Description(value="Fully consume a cursor stream")
    public void consumeGeneratedCursorAndCloseIt() throws Exception {
        Object value = ((FlowRunner)this.flowRunner("consumeGeneratedStream").withPayload((Object)this.data)).run().getMessage().getPayload().getValue();
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Object)this.data));
    }

    @Test
    @Description(value="Operation with disabled streaming")
    public void operationWithDisabledStreaming() throws Exception {
        Object value = ((FlowRunner)this.flowRunner("toSimpleStream").withPayload((Object)this.data)).keepStreamsOpen().run().getMessage().getPayload().getValue();
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(InputStream.class)));
        Assert.assertThat((Object)IOUtils.toString((InputStream)((InputStream)value)), (Matcher)CoreMatchers.is((Object)this.data));
    }

    @Test(expected=Exception.class)
    @Description(value="If the flow fails, all cursors should be closed")
    public void allStreamsClosedInCaseOfException() throws Exception {
        ((FlowRunner)this.flowRunner("crashCar").withPayload((Object)this.data)).run();
    }

    @Test(expected=Exception.class)
    @Description(value="If the flow fails, all non repeatable streams should be closed")
    public void nonRepeatableStreamsClosedInCaseOfException() throws Exception {
        ((FlowRunner)this.flowRunner("nonRepeatableCrashCar").withPayload((Object)this.data)).run();
    }

    @Test
    @Description(value="If the flow fails, all non repeatable streams should be closed")
    public void allStreamsClosedInCaseOfHandledException() throws Exception {
        ((FlowRunner)this.flowRunner("handledCrashCar").withPayload((Object)this.data)).run();
    }

    @Test
    @Description(value="If the flow fails, all non repeatable streams should be closed")
    public void nonRepeatableStreamsClosedInCaseOfHandledException() throws Exception {
        ((FlowRunner)this.flowRunner("nonRepeatableHandledCrashCar").withPayload((Object)this.data)).run();
    }

    @Test(expected=Exception.class)
    @Description(value="If a cursor is open in a transaction, it should be closed if the flow fails")
    public void allStreamsClosedInCaseOfExceptionInTx() throws Exception {
        ((FlowRunner)this.flowRunner("crashCarTx").withPayload((Object)this.data)).run();
    }

    @Test
    @Description(value="Read a stream from a random position")
    public void seek() throws Exception {
        this.doSeek("seekStream");
    }

    @Test
    @Description(value="Rewing a stream and consume it twice")
    public void rewind() throws Exception {
        CoreEvent result = ((FlowRunner)this.flowRunner("rewind").withPayload((Object)this.data)).run();
        Message firstRead = (Message)((TypedValue)result.getVariables().get("firstRead")).getValue();
        Message secondRead = (Message)((TypedValue)result.getVariables().get("secondRead")).getValue();
        Assert.assertThat((Object)firstRead.getPayload().getValue(), (Matcher)CoreMatchers.equalTo((Object)this.data));
        Assert.assertThat((Object)secondRead.getPayload().getValue(), (Matcher)CoreMatchers.equalTo((Object)this.data));
    }

    @Test
    @Description(value="Read from a random position inside a transaction")
    public void seekInTx() throws Exception {
        this.doSeek("seekStreamTx");
    }

    @Test
    @Description(value="When the max buffer size is exceeded, the correct type of error is mapped")
    public void throwsBufferSizeExceededError() throws Exception {
        this.data = RandomStringUtils.randomAlphabetic((int)DataUnit.KB.toBytes(60));
        Object value = ((FlowRunner)this.flowRunner("bufferExceeded").withPayload((Object)this.data)).run().getMessage().getPayload().getValue();
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Object)TOO_BIG));
    }

    private void doSeek(String flowName) throws Exception {
        int position = 10;
        CoreEvent result = ((FlowRunner)((FlowRunner)this.flowRunner(flowName).withPayload((Object)this.data)).withVariable("position", (Object)10)).run();
        Object value = result.getMessage().getPayload().getValue();
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Object)this.data.substring(10)));
    }

    @Test
    @Description(value="A source generates a cursor stream")
    public void sourceStreaming() throws Exception {
        this.startSourceAndListenSpell(this.bytesCaster, this.bargainPredicate());
    }

    @Test
    @Description(value="When the max buffer size is exceeded on a stream generated in a source, the correct type of error is mapped")
    public void sourceThrowsBufferSizeExceededError() throws Exception {
        this.startSourceAndListenSpell(this.sourceWithExceededBuffer, s -> TOO_BIG.equals(s));
    }

    @Test
    @Description(value="A source generates a cursor to be consumed in a transaction")
    public void sourceStreamingInTx() throws Exception {
        this.startSourceAndListenSpell(this.bytesCasterInTx, this.bargainPredicate());
    }

    @Test
    @Issue(value="MULE-19297")
    @Description(value="A source generates a cursor to be consumed inside a scope done with the SDK")
    public void sourceStreamingInSdkScopeInside() throws Exception {
        this.startSourceAndListenSpell(this.bytesCasterInSdkScopeInside, this.bargainPredicate());
    }

    @Test
    @Issue(value="MULE-19297")
    @Description(value="A source generates a cursor to be consumed inside a scope done with the SDK")
    public void sourceStreamingInSdkScopeAfter() throws Exception {
        this.startSourceAndListenSpell(this.bytesCasterInSdkScopeAfter, this.bargainPredicate());
    }

    @Test
    @Description(value="A source is configured not to stream")
    public void sourceWithoutStreaming() throws Exception {
        this.startSourceAndListenSpell(this.bytesCasterWithoutStreaming, this.bargainPredicate());
    }

    @Test
    @Description(value="New cursor is open for Object parameter which receives a CursorProvider")
    public void resolveCursorsFromObjectParams() throws Exception {
        CursorStreamProviderFactory factory = this.streamingManager.forBytes().getInMemoryCursorProviderFactory(InMemoryCursorStreamConfig.getDefault());
        CursorStreamProvider provider = (CursorStreamProvider)((FlowRunner)this.flowRunner("objectToStream").keepStreamsOpen().withPayload(factory.of(this.testEvent().getContext(), (Object)new ByteArrayInputStream(this.data.getBytes()), DefaultComponentLocation.from((String)"objectToStream")))).run().getMessage().getPayload().getValue();
        byte[] bytes = muleContext.getObjectSerializer().getInternalProtocol().serialize((Object)provider);
        bytes = (byte[])muleContext.getObjectSerializer().getInternalProtocol().deserialize(bytes);
        Assert.assertThat((Object)new String(bytes, Charset.defaultCharset()), (Matcher)CoreMatchers.equalTo((Object)this.data));
    }

    @Test
    @Description(value="A stream provider is serialized as a byte[]")
    public void streamProviderSerialization() throws Exception {
        CursorStreamProvider provider = (CursorStreamProvider)((FlowRunner)this.flowRunner("toStream").keepStreamsOpen().withPayload((Object)this.data)).run().getMessage().getPayload().getValue();
        byte[] bytes = muleContext.getObjectSerializer().getInternalProtocol().serialize((Object)provider);
        bytes = (byte[])muleContext.getObjectSerializer().getInternalProtocol().deserialize(bytes);
        Assert.assertThat((Object)new String(bytes, Charset.defaultCharset()), (Matcher)CoreMatchers.equalTo((Object)this.data));
    }

    @Test
    @Description(value="A non repeatable stream is closed automatically when flow completes")
    public void nonRepeatableStreamIsAutomaticallyClosed() throws Exception {
        final InputStream stream = (InputStream)((FlowRunner)this.flowRunner("toNonRepeatableStream").withPayload((Object)this.data)).run().getMessage().getPayload().getValue();
        new PollingProber(2000L, 200L).check((Probe)new JUnitProbe(){

            protected boolean test() throws Exception {
                Assert.assertThat((Object)stream.read(), (Matcher)CoreMatchers.is((Object)-1));
                return true;
            }

            public String describeFailure() {
                return "Stream was not automatically closed.";
            }
        });
    }

    @Test
    @Description(value="A non repeatable stream is not automatically closed when event completes async")
    public void nonRepeatableStreamClosesAsync() throws Exception {
        InputStream stream = (InputStream)((FlowRunner)this.flowRunner("toNonRepeatableStream").keepStreamsOpen().withPayload((Object)this.data)).run().getMessage().getPayload().getValue();
        byte[] bytes = new byte[this.data.length()];
        Assert.assertThat((Object)stream.read(bytes), (Matcher)CoreMatchers.is((Object)this.data.length()));
        Assert.assertThat((Object)stream.read(), (Matcher)CoreMatchers.is((Object)-1));
        Assert.assertThat((Object)new String(bytes), (Matcher)CoreMatchers.equalTo((Object)this.data));
    }

    @Test
    @Description(value="Streaming operation has a streaming strategy parameter")
    public void streamingStrategyParameterInOperation() throws Exception {
        ParameterModel streamingParameter = this.getStreamingStrategyParameterModel(() -> (ParameterizedModel)this.getConfigurationModel().getOperationModel("toStream").get());
        this.assertStreamingStrategyParameter(streamingParameter);
    }

    @Test
    @Description(value="Streaming source has a streaming strategy parameter")
    public void streamingStrategyParameterInSource() throws Exception {
        ParameterModel streamingParameter = this.getStreamingStrategyParameterModel(() -> (ParameterizedModel)this.getConfigurationModel().getSourceModel("bytes-caster").get());
        this.assertStreamingStrategyParameter(streamingParameter);
    }

    @Test
    @Description(value="Call operation multiple times in the flow")
    public void operationCalledAndOutputConsumedMultipleTimes() throws Exception {
        Object value = ((FlowRunner)this.flowRunner("toStreamMultipleTimes").withPayload((Object)this.data)).keepStreamsOpen().run().getMessage().getPayload().getValue();
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(InputStream.class)));
        Assert.assertThat((Object)IOUtils.toString((InputStream)((InputStream)value)), (Matcher)CoreMatchers.is((Object)this.data));
    }

    private ParameterModel getStreamingStrategyParameterModel(Supplier<ParameterizedModel> model) {
        return model.get().getAllParameterModels().stream().filter(p -> p.getName().equals("streamingStrategy")).findFirst().get();
    }

    private ConfigurationModel getConfigurationModel() {
        return this.getExtensionModel("Marvel").map(extension -> (ConfigurationModel)extension.getConfigurationModel("dr-strange").get()).get();
    }

    private void assertStreamingStrategyParameter(ParameterModel parameter) {
        ExtensionsTestUtils.assertType((MetadataType)parameter.getType(), Object.class, UnionType.class);
    }

    private void startSourceAndListenSpell(Flow flow, Predicate<String> predicate) throws Exception {
        if (!flow.getLifecycleState().isStarted()) {
            flow.start();
        }
        new PollingProber(4000L, 100L).check((Probe)new JUnitLambdaProbe(() -> {
            List<String> list = CASTED_SPELLS;
            synchronized (list) {
                return CASTED_SPELLS.stream().anyMatch(predicate);
            }
        }));
    }

    private Predicate<String> bargainPredicate() {
        return s -> s.equals(BARGAIN_SPELL);
    }

    protected boolean isGracefulShutdown() {
        return true;
    }

    public static class AssertVariableStreamProviderIsOpen
    implements Processor {
        public CoreEvent process(CoreEvent event) throws MuleException {
            Assert.assertThat((Object)((CursorStreamProvider)((TypedValue)event.getVariables().get("provider")).getValue()).isClosed(), (Matcher)CoreMatchers.is((Object)false));
            return event;
        }
    }

    public static class AssertPayloadIsNotStreamProvider
    implements Processor {
        public CoreEvent process(CoreEvent event) throws MuleException {
            Assert.assertThat((Object)event.getMessage().getPayload().getValue(), (Matcher)Matchers.not((Matcher)CoreMatchers.instanceOf(CursorStreamProvider.class)));
            return event;
        }
    }

    public static class AssertPayloadIsStreamProvider
    implements Processor {
        public CoreEvent process(CoreEvent event) throws MuleException {
            Assert.assertThat((Object)event.getMessage().getPayload().getValue(), (Matcher)CoreMatchers.instanceOf(CursorStreamProvider.class));
            return event;
        }
    }
}

