/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.impl.service.client;

import io.qameta.allure.Issue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.functional.junit4.matchers.ThrowableMessageMatcher;
import org.mule.service.http.impl.service.client.NonBlockingStreamWriter;
import org.mule.service.http.impl.service.util.ThreadContext;
import org.mule.tck.MuleTestUtils;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.slf4j.MDC;

@Issue(value="W-17048606")
public class NonBlockingStreamWriterTestCase
extends AbstractMuleTestCase {
    private static final int TEST_TIME_TO_SLEEP_WHEN_COULD_NOT_WRITE_MILLIS = 50;
    private static final byte[] SOME_DATA = "Some data to write".getBytes();
    private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private NonBlockingStreamWriter nonBlockingStreamWriter;

    @Before
    public void setUp() {
        this.nonBlockingStreamWriter = new NonBlockingStreamWriter(50, true);
    }

    @After
    public void tearDown() {
        this.nonBlockingStreamWriter.stop();
    }

    @AfterClass
    public static void tearDownClass() {
        executorService.shutdownNow();
    }

    @Test
    @Issue(value="W-17624200")
    public void isEnabledVariants() throws Exception {
        MatcherAssert.assertThat((Object)new NonBlockingStreamWriter(1, true).isEnabled(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)new NonBlockingStreamWriter(1, false).isEnabled(), (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)new NonBlockingStreamWriter().isEnabled(), (Matcher)Matchers.is((Object)false));
        MuleTestUtils.testWithSystemProperty((String)"mule.http.client.responseStreaming.nonBlockingWriter", (String)"true", () -> MatcherAssert.assertThat((Object)new NonBlockingStreamWriter().isEnabled(), (Matcher)Matchers.is((Object)true)));
    }

    @Test
    public void writesIfAvailableSpace() throws ExecutionException, InterruptedException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)new SequenceProvider(SOME_DATA.length + 1)).get();
        MatcherAssert.assertThat((Object)out.toByteArray(), (Matcher)Matchers.is((Object)SOME_DATA));
    }

    @Test
    public void partiallyWritesIfNotEnoughSpace() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        CompletableFuture future = this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)new SequenceProvider(SOME_DATA.length - 1));
        MatcherAssert.assertThat((Object)future.isDone(), (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)out.toByteArray().length, (Matcher)Matchers.is((Object)(SOME_DATA.length - 1)));
    }

    @Test
    public void writesAllProgressivelyWhenSpaceIsGenerated() throws ExecutionException, InterruptedException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)new SequenceProvider(SOME_DATA.length - 5, 5)).get();
        MatcherAssert.assertThat((Object)out.toByteArray(), (Matcher)Matchers.is((Object)SOME_DATA));
    }

    @Test
    public void failsWhenStreamIsClosed() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ExecutionException exception = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)new SequenceProvider(-1)).get();
        });
        Throwable cause = exception.getCause();
        MatcherAssert.assertThat((Object)cause, (Matcher)Matchers.instanceOf(IOException.class));
        MatcherAssert.assertThat((Object)cause, (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)Matchers.containsString((String)"Pipe closed")));
    }

    @Test
    public void writesAllAsync() throws ExecutionException, InterruptedException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SequenceProvider sequenceWithAZeroInTheMiddle = new SequenceProvider(SOME_DATA.length - 5, 0, 5);
        CompletableFuture future = this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)sequenceWithAZeroInTheMiddle);
        MatcherAssert.assertThat((String)"The writer is not scheduled anywhere yet, so the future shouldn't be completed", (Object)future.isDone(), (Matcher)Matchers.is((Object)false));
        executorService.submit((Runnable)this.nonBlockingStreamWriter);
        future.get();
        MatcherAssert.assertThat((Object)out.toByteArray(), (Matcher)Matchers.is((Object)SOME_DATA));
    }

    @Test
    public void failureAsync() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SequenceProvider sequenceWithAZeroInTheMiddle = new SequenceProvider(SOME_DATA.length - 5, 0, -1);
        CompletableFuture future = this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)sequenceWithAZeroInTheMiddle);
        MatcherAssert.assertThat((String)"The writer is not scheduled anywhere yet, so the future shouldn't be completed", (Object)future.isDone(), (Matcher)Matchers.is((Object)false));
        executorService.submit((Runnable)this.nonBlockingStreamWriter);
        ExecutionException exception = (ExecutionException)Assert.assertThrows(ExecutionException.class, future::get);
        Throwable cause = exception.getCause();
        MatcherAssert.assertThat((Object)cause, (Matcher)Matchers.instanceOf(IOException.class));
        MatcherAssert.assertThat((Object)cause, (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)Matchers.containsString((String)"Pipe closed")));
    }

    @Test
    public void streamWithoutSpaceManyTimesSleeps() throws ExecutionException, InterruptedException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SequenceProvider sequenceWithThreeZeroesInTheMiddle = new SequenceProvider(SOME_DATA.length - 5, 0, 0, 0, 5);
        CompletableFuture future = this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)sequenceWithThreeZeroesInTheMiddle);
        MatcherAssert.assertThat((String)"The writer is not scheduled anywhere yet, so the future shouldn't be completed", (Object)future.isDone(), (Matcher)Matchers.is((Object)false));
        long millisBeforeSchedule = System.currentTimeMillis();
        executorService.submit((Runnable)this.nonBlockingStreamWriter);
        future.get();
        long millisAfterComplete = System.currentTimeMillis();
        int elapsedMillis = (int)(millisAfterComplete - millisBeforeSchedule);
        MatcherAssert.assertThat((String)"We returned 0 three times, so the writer should have slept twice at this point", (Object)elapsedMillis, (Matcher)Matchers.is((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(100))));
        MatcherAssert.assertThat((Object)out.toByteArray(), (Matcher)Matchers.is((Object)SOME_DATA));
    }

    @Test
    @Issue(value="W-17627284")
    public void whenTheWriterIsNotifiedThenItDoesntSleepTooMuch() throws ExecutionException, InterruptedException {
        int ridiculouslyBigSleepMillis = Integer.MAX_VALUE;
        NonBlockingStreamWriter writer = new NonBlockingStreamWriter(ridiculouslyBigSleepMillis, true);
        int enoughTimeToThisTrivialOperationInMilliseconds = 100;
        ByteArrayOutputStream irrelevantSink = new ByteArrayOutputStream();
        SequenceProvider sequenceWithSeveralZeroesInTheMiddle = new SequenceProvider(SOME_DATA.length - 5, 0, 0, 0, 0, 0, 0, 0, 0, 5);
        CompletableFuture future = writer.addDataToWrite((OutputStream)irrelevantSink, SOME_DATA, (Supplier)sequenceWithSeveralZeroesInTheMiddle);
        MatcherAssert.assertThat((String)"The writer is not scheduled anywhere yet, so the future shouldn't be completed", (Object)future.isDone(), (Matcher)Matchers.is((Object)false));
        long millisBeforeSchedule = System.currentTimeMillis();
        executorService.submit((Runnable)writer);
        while (!future.isDone()) {
            writer.notifyAvailableSpace();
        }
        future.get();
        long millisAfterComplete = System.currentTimeMillis();
        int elapsedMillis = (int)(millisAfterComplete - millisBeforeSchedule);
        MatcherAssert.assertThat((String)"We returned 0 several times but we were constantly notifying that there was space in the sink, so it shouldn't take too much time", (Object)elapsedMillis, (Matcher)Matchers.is((Matcher)Matchers.lessThan((Comparable)Integer.valueOf(enoughTimeToThisTrivialOperationInMilliseconds))));
        MatcherAssert.assertThat((Object)irrelevantSink.toByteArray(), (Matcher)Matchers.is((Object)SOME_DATA));
        writer.stop();
    }

    @Test
    public void writesAllProgressivelyAsync() throws ExecutionException, InterruptedException {
        executorService.submit((Runnable)this.nonBlockingStreamWriter);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)new SequenceProvider(SOME_DATA.length - 5, 0, 0, 1, 0, 1, 1, 0, 1, 1)).get();
        MatcherAssert.assertThat((Object)out.toByteArray(), (Matcher)Matchers.is((Object)SOME_DATA));
    }

    @Test
    public void ioExceptionOnWriteIsCaughtAndPropagatedToTheFuture() throws IOException {
        IOException expectedException = new IOException("Expected!!");
        OutputStream throwing = (OutputStream)Mockito.mock(OutputStream.class);
        ((OutputStream)Mockito.doThrow((Throwable[])new Throwable[]{expectedException}).when((Object)throwing)).write((byte[])ArgumentMatchers.any(byte[].class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
        CompletableFuture future = this.nonBlockingStreamWriter.addDataToWrite(throwing, SOME_DATA, (Supplier)new SequenceProvider(SOME_DATA.length));
        ExecutionException exception = (ExecutionException)Assert.assertThrows(ExecutionException.class, future::get);
        MatcherAssert.assertThat((Object)exception.getCause(), (Matcher)Matchers.is((Object)expectedException));
    }

    @Test
    public void runtimeExceptionOnWriteIsCaughtAndPropagatedToTheFuture() throws IOException {
        RuntimeException expectedException = new RuntimeException("Expected!!");
        OutputStream throwing = (OutputStream)Mockito.mock(OutputStream.class);
        ((OutputStream)Mockito.doThrow((Throwable[])new Throwable[]{expectedException}).when((Object)throwing)).write((byte[])ArgumentMatchers.any(byte[].class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
        CompletableFuture future = this.nonBlockingStreamWriter.addDataToWrite(throwing, SOME_DATA, (Supplier)new SequenceProvider(SOME_DATA.length));
        ExecutionException exception = (ExecutionException)Assert.assertThrows(ExecutionException.class, future::get);
        MatcherAssert.assertThat((Object)exception.getCause(), (Matcher)Matchers.is((Object)expectedException));
    }

    @Test
    public void interruptTheThreadDoesntInterruptTheWriterIfNotStopped() throws InterruptedException {
        NonBlockingStreamWriter writer = new NonBlockingStreamWriter();
        Thread threadOutsideTheStaticExecutor = new Thread((Runnable)writer);
        threadOutsideTheStaticExecutor.start();
        threadOutsideTheStaticExecutor.interrupt();
        threadOutsideTheStaticExecutor.join(500L);
        MatcherAssert.assertThat((Object)threadOutsideTheStaticExecutor.isAlive(), (Matcher)Matchers.is((Object)true));
        writer.stop();
        threadOutsideTheStaticExecutor.join();
        MatcherAssert.assertThat((Object)threadOutsideTheStaticExecutor.isAlive(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void interruptTheThreadAfterStopWillInterruptTheSleep() throws InterruptedException {
        int ridiculouslyBigSleepMillis = Integer.MAX_VALUE;
        NonBlockingStreamWriter writer = new NonBlockingStreamWriter(ridiculouslyBigSleepMillis, true);
        Thread threadOutsideTheStaticExecutor = new Thread((Runnable)writer);
        threadOutsideTheStaticExecutor.start();
        Thread.sleep(500L);
        writer.stop();
        threadOutsideTheStaticExecutor.join(500L);
        MatcherAssert.assertThat((Object)threadOutsideTheStaticExecutor.isAlive(), (Matcher)Matchers.is((Object)true));
        threadOutsideTheStaticExecutor.interrupt();
        threadOutsideTheStaticExecutor.join();
        MatcherAssert.assertThat((Object)threadOutsideTheStaticExecutor.isAlive(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void writeOperationIsExecutedWithSameThreadContext() throws ExecutionException, InterruptedException {
        executorService.submit((Runnable)this.nonBlockingStreamWriter);
        OutputStreamSavingThreadContext out = new OutputStreamSavingThreadContext();
        HashMap<String, String> mockMdc = new HashMap<String, String>();
        mockMdc.put("Key1", "Value1");
        mockMdc.put("Key2", "Value2");
        ClassLoader mockClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        try (ThreadContext tc = new ThreadContext(mockClassLoader, mockMdc);){
            this.nonBlockingStreamWriter.addDataToWrite((OutputStream)out, SOME_DATA, (Supplier)new SequenceProvider(SOME_DATA.length - 5, 0, 5)).get();
        }
        MatcherAssert.assertThat((Object)out.getClassLoaderOnLastWrite(), (Matcher)Matchers.is((Object)mockClassLoader));
        MatcherAssert.assertThat(out.getMDCOnLastWrite(), (Matcher)Matchers.is(mockMdc));
    }

    private static class OutputStreamSavingThreadContext
    extends OutputStream {
        private final AtomicReference<ClassLoader> classLoaderOnWrite = new AtomicReference();
        private final AtomicReference<Map<String, String>> mdcOnWrite = new AtomicReference();

        private OutputStreamSavingThreadContext() {
        }

        @Override
        public void write(int b) throws IOException {
            this.mdcOnWrite.set(MDC.getCopyOfContextMap());
            this.classLoaderOnWrite.set(Thread.currentThread().getContextClassLoader());
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.mdcOnWrite.set(MDC.getCopyOfContextMap());
            this.classLoaderOnWrite.set(Thread.currentThread().getContextClassLoader());
            super.write(b, off, len);
        }

        public ClassLoader getClassLoaderOnLastWrite() {
            return this.classLoaderOnWrite.get();
        }

        public Map<String, String> getMDCOnLastWrite() {
            return this.mdcOnWrite.get();
        }
    }

    private static class SequenceProvider
    implements Supplier<Integer> {
        private final Queue<Integer> sequence;

        SequenceProvider(Integer ... sequence) {
            this.sequence = Arrays.stream(sequence).collect(Collectors.toCollection(LinkedList::new));
        }

        @Override
        public Integer get() {
            if (this.sequence.isEmpty()) {
                return 0;
            }
            return this.sequence.remove();
        }
    }
}

