/*
 * Decompiled with CFR 0.152.
 */
package org.mule.test.module.extension.streaming;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.commons.lang3.RandomStringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;
import org.mule.functional.api.flow.FlowRunner;
import org.mule.metadata.api.model.MetadataType;
import org.mule.metadata.api.model.UnionType;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.meta.model.config.ConfigurationModel;
import org.mule.runtime.api.meta.model.operation.OperationModel;
import org.mule.runtime.api.meta.model.parameter.ParameterModel;
import org.mule.runtime.api.meta.model.parameter.ParameterizedModel;
import org.mule.runtime.api.meta.model.source.SourceModel;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.mule.test.module.extension.internal.util.ExtensionsTestUtils;
import org.mule.test.module.extension.streaming.AbstractStreamingExtensionTestCase;

/**
 * Functional tests that run the flows declared in
 * {@code bytes-streaming-extension-config.xml} and assert on the payloads they produce,
 * covering operations and sources that generate byte streams.
 *
 * <p>Sources report what they emitted through {@link #addSpell(String)}; the source tests poll
 * the collected values until one satisfies the expected predicate. Every access to the shared
 * spell list is guarded by the list's own monitor.
 */
@Feature(value="Streaming")
@Story(value="Bytes Streaming")
public class BytesStreamingExtensionTestCase
extends AbstractStreamingExtensionTestCase {
    private static final String BARGAIN_SPELL = "dormammu i've come to bargain";
    public static final String TOO_BIG = "Too big!";
    // Final so that the monitor used by every synchronized block can never be swapped out.
    private static final List<String> CASTED_SPELLS = new LinkedList<>();
    // Random alphabetic payload (2048 characters) fed to the flows under test.
    private final String data = RandomStringUtils.randomAlphabetic(2048);

    /**
     * Records a spell so that the source tests can observe it.
     *
     * @param spell the value to append to the shared list of casted spells
     */
    public static void addSpell(String spell) {
        synchronized (CASTED_SPELLS) {
            CASTED_SPELLS.add(spell);
        }
    }

    /**
     * @return the name of the configuration file holding the flows used by this test case
     */
    protected String getConfigFile() {
        return "bytes-streaming-extension-config.xml";
    }

    /**
     * Delegates to the superclass and then discards every recorded spell.
     */
    @Override
    protected void doTearDownAfterMuleContextDispose() throws Exception {
        super.doTearDownAfterMuleContextDispose();
        // Same monitor as addSpell and the polling probe, so all accesses agree on one lock.
        synchronized (CASTED_SPELLS) {
            CASTED_SPELLS.clear();
        }
    }

    /**
     * @return always {@code true}
     */
    protected boolean isDisposeContextPerClass() {
        return true;
    }

    @Test
    @Description(value="Fully consume a cursor stream")
    public void consumeGeneratedCursorAndCloseIt() throws Exception {
        Object value = ((FlowRunner)this.flowRunner("consumeGeneratedStream").withPayload((Object)this.data)).run().getMessage().getPayload().getValue();
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Object)this.data));
    }

    @Test
    @Description(value="Operation with disabled streaming")
    public void operationWithDisabledStreaming() throws Exception {
        Object value = ((FlowRunner)this.flowRunner("toSimpleStream").withPayload((Object)this.data)).run().getMessage().getPayload().getValue();
        // The payload must be a plain InputStream whose content equals the input data.
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(InputStream.class)));
        Assert.assertThat((Object)IOUtils.toString((InputStream)value), (Matcher)CoreMatchers.is((Object)this.data));
    }

    // NOTE(review): this method only asserts that the flow fails; any check that the cursors
    // were actually closed presumably lives in the superclass - confirm.
    @Test(expected=Exception.class)
    @Description(value="If the flow fails, all cursors should be closed")
    public void allStreamsClosedInCaseOfException() throws Exception {
        ((FlowRunner)this.flowRunner("crashCar").withPayload((Object)this.data)).run();
    }

    // NOTE(review): as above, only the failure of the flow is asserted here.
    @Test(expected=Exception.class)
    @Description(value="If a cursor is open in a transaction, it should be closed if the flow fails")
    public void allStreamsClosedInCaseOfExceptionInTx() throws Exception {
        ((FlowRunner)this.flowRunner("crashCarTx").withPayload((Object)this.data)).run();
    }

    @Test
    @Description(value="Read a stream from a random position")
    public void seek() throws Exception {
        this.doSeek("seekStream");
    }

    @Test
    @Description(value="Rewind a stream and consume it twice")
    public void rewind() throws Exception {
        InternalEvent result = ((FlowRunner)this.flowRunner("rewind").withPayload((Object)this.data)).run();
        // The flow leaves both reads in variables; each must carry the complete input.
        Message firstRead = (Message)((TypedValue)result.getVariables().get("firstRead")).getValue();
        Message secondRead = (Message)((TypedValue)result.getVariables().get("secondRead")).getValue();
        Assert.assertThat((Object)firstRead.getPayload().getValue(), (Matcher)CoreMatchers.equalTo((Object)this.data));
        Assert.assertThat((Object)secondRead.getPayload().getValue(), (Matcher)CoreMatchers.equalTo((Object)this.data));
    }

    @Test
    @Description(value="Read from a random position inside a transaction")
    public void seekInTx() throws Exception {
        this.doSeek("seekStreamTx");
    }

    @Test
    @Description(value="When the max buffer size is exceeded, the correct type of error is mapped")
    public void throwsBufferSizeExceededError() throws Exception {
        // A larger payload than the default fixture, kept local so the shared field stays untouched.
        String oversized = RandomStringUtils.randomAlphabetic((int)DataUnit.KB.toBytes(60));
        Object value = ((FlowRunner)this.flowRunner("bufferExceeded").withPayload((Object)oversized)).run().getMessage().getPayload().getValue();
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Object)TOO_BIG));
    }

    /**
     * Runs the given flow with a {@code position} variable and asserts that the resulting
     * payload equals the input data starting at that position.
     *
     * @param flowName name of the flow to run
     * @throws Exception if the flow execution fails
     */
    private void doSeek(String flowName) throws Exception {
        final int position = 10;
        InternalEvent result = ((FlowRunner)((FlowRunner)this.flowRunner(flowName).withPayload((Object)this.data)).withVariable("position", (Object)position)).run();
        Object value = result.getMessage().getPayload().getValue();
        Assert.assertThat((Object)value, (Matcher)CoreMatchers.is((Object)this.data.substring(position)));
    }

    @Test
    @Description(value="A source generates a cursor stream")
    public void sourceStreaming() throws Exception {
        this.startSourceAndListenSpell("bytesCaster", this.bargainPredicate());
    }

    @Test
    @Description(value="When the max buffer size is exceeded on a stream generated in a source, the correct type of error is mapped")
    public void sourceThrowsBufferSizeExceededError() throws Exception {
        this.startSourceAndListenSpell("sourceWithExceededBuffer", TOO_BIG::equals);
    }

    @Test
    @Description(value="A source generates a cursor in a transaction")
    public void sourceStreamingInTx() throws Exception {
        this.startSourceAndListenSpell("bytesCasterInTx", this.bargainPredicate());
    }

    @Test
    @Description(value="A source is configured not to stream")
    public void sourceWithoutStreaming() throws Exception {
        this.startSourceAndListenSpell("bytesCasterWithoutStreaming", this.bargainPredicate());
    }

    @Test
    @Description(value="A stream provider is serialized as a byte[]")
    public void streamProviderSerialization() throws Exception {
        CursorStreamProvider provider = (CursorStreamProvider)((FlowRunner)this.flowRunner("toStream").keepStreamsOpen().withPayload((Object)this.data)).run().getMessage().getPayload().getValue();
        // Round trip through the serializer: the provider must come back as the raw bytes.
        byte[] bytes = muleContext.getObjectSerializer().getInternalProtocol().serialize((Object)provider);
        bytes = (byte[])muleContext.getObjectSerializer().getInternalProtocol().deserialize(bytes);
        Assert.assertThat((Object)new String(bytes, Charset.defaultCharset()), (Matcher)CoreMatchers.equalTo((Object)this.data));
    }

    @Test
    @Description(value="Streaming operation has a streaming strategy parameter")
    public void streamingStrategyParameterInOperation() throws Exception {
        ParameterModel streamingParameter = this.getStreamingStrategyParameterModel(() -> this.getConfigurationModel().getOperationModel("toStream").orElseThrow(() -> new AssertionError("Operation 'toStream' not found")));
        this.assertStreamingStrategyParameter(streamingParameter);
    }

    @Test
    @Description(value="Streaming source has a streaming strategy parameter")
    public void streamingStrategyParameterInSource() throws Exception {
        ParameterModel streamingParameter = this.getStreamingStrategyParameterModel(() -> this.getConfigurationModel().getSourceModel("bytes-caster").orElseThrow(() -> new AssertionError("Source 'bytes-caster' not found")));
        this.assertStreamingStrategyParameter(streamingParameter);
    }

    /**
     * Looks up the parameter named {@code streamingStrategy} in the supplied model.
     *
     * @param model supplier of the model whose parameters are searched
     * @return the first parameter named {@code streamingStrategy}
     * @throws AssertionError if the model declares no such parameter
     */
    private ParameterModel getStreamingStrategyParameterModel(Supplier<ParameterizedModel> model) {
        return model.get().getAllParameterModels().stream()
                .filter(p -> "streamingStrategy".equals(p.getName()))
                .findFirst()
                .orElseThrow(() -> new AssertionError("Parameter 'streamingStrategy' not found"));
    }

    /**
     * @return the {@code dr-strange} configuration model of the {@code Marvel} extension
     * @throws AssertionError if either the extension or the configuration cannot be found
     */
    private ConfigurationModel getConfigurationModel() {
        return this.getExtensionModel("Marvel")
                .flatMap(extension -> extension.getConfigurationModel("dr-strange"))
                .orElseThrow(() -> new AssertionError("Configuration 'dr-strange' of extension 'Marvel' not found"));
    }

    /**
     * Asserts the type of the given parameter via {@code ExtensionsTestUtils.assertType},
     * expecting {@code Object} represented as a {@code UnionType}.
     *
     * @param parameter the parameter whose type is checked
     */
    private void assertStreamingStrategyParameter(ParameterModel parameter) {
        ExtensionsTestUtils.assertType((MetadataType)parameter.getType(), Object.class, UnionType.class);
    }

    /**
     * Starts the given flow and polls the recorded spells until one matches the predicate.
     *
     * @param flowName name of the flow to start
     * @param predicate condition that at least one recorded spell must satisfy
     * @throws Exception if the flow cannot be started
     */
    private void startSourceAndListenSpell(String flowName, Predicate<String> predicate) throws Exception {
        this.startFlow(flowName);
        // NOTE(review): arguments are presumably timeout and polling interval in millis - confirm.
        new PollingProber(4000L, 100L).check((Probe)new JUnitLambdaProbe(() -> {
            synchronized (CASTED_SPELLS) {
                return CASTED_SPELLS.stream().anyMatch(predicate);
            }
        }));
    }

    /**
     * @return a predicate matching the bargain spell; {@code null} input yields {@code false}
     */
    private Predicate<String> bargainPredicate() {
        return BARGAIN_SPELL::equals;
    }

    /**
     * Fetches the flow with the given name from the registry and starts it.
     *
     * @param flowName name under which the flow is registered
     * @throws MuleException if the flow fails to start
     */
    private void startFlow(String flowName) throws MuleException {
        Flow flow = (Flow)muleContext.getRegistry().get(flowName);
        flow.start();
    }
}

