/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.impl.functional.client;

import io.qameta.allure.Description;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.DisplayName;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.client.HttpClientConfiguration;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.entity.InputStreamHttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.runtime.http.api.domain.message.response.HttpResponseBuilder;
import org.mule.service.http.impl.functional.FillAndWaitStream;
import org.mule.service.http.impl.functional.client.AbstractHttpClientTestCase;
import org.mule.service.http.impl.service.HttpServiceImplementation;
import org.mule.service.http.test.common.util.ResponseReceivedProbe;
import org.mule.tck.SimpleUnitTestSupportSchedulerService;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.slf4j.MDC;

@Story(value="Streaming")
@DisplayName(value="Validates HTTP client behaviour against a streaming server.")
public class HttpClientStreamingTestCase
extends AbstractHttpClientTestCase {
    private static final int RESPONSE_TIMEOUT = 3000;
    private static final int TIMEOUT_MILLIS = 1000;
    private static final int POLL_DELAY_MILLIS = 200;
    private static Latch latch;
    private Latch beforeResponseLatch;
    private AtomicBoolean serverShouldThrowException;
    private HttpClientConfiguration.Builder clientBuilder = new HttpClientConfiguration.Builder().setName("streaming-test");
    private PollingProber pollingProber = new PollingProber(1000L, 200L);

    public HttpClientStreamingTestCase(String serviceToLoad) {
        super(serviceToLoad);
    }

    @Before
    public void createLatch() {
        latch = new Latch();
    }

    @Test
    @Description(value="Uses a streaming HTTP client to send a non blocking request which will finish before the stream is released.")
    public void nonBlockingStreaming() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setResponseBufferSize(DataUnit.KB.toBytes(10)).setStreaming(true).build());
        client.start();
        Reference responseReference = new Reference();
        try {
            client.sendAsync(this.getRequest(), this.getDefaultOptions(3000)).whenComplete((response, exception) -> responseReference.set(response));
            this.pollingProber.check((Probe)new ResponseReceivedProbe(responseReference));
            this.verifyStreamed((HttpResponse)responseReference.get());
        }
        finally {
            client.stop();
        }
    }

    @Test
    @Description(value="Uses a streaming HTTP client to send a non blocking request and asserts that MDC values are propagated to the response handler when the request fails due to timeout.")
    public void nonBlockingStreamingMDCPropagationOnError() throws Exception {
        this.testMdcPropagation(true, true);
    }

    @Test
    @Description(value="Uses a non streaming HTTP client to send a non blocking request and asserts that MDC values are propagated to the response handler when the request fails due to timeout.")
    public void nonBlockingNoStreamingMDCPropagationOnError() throws Exception {
        this.testMdcPropagation(false, true);
    }

    @Test
    @Description(value="Uses a streaming HTTP client to send a non blocking request and asserts that MDC values are propagated to the response handler when the request is executed successfully.")
    public void nonBlockingStreamingMDCPropagationNoError() throws Exception {
        this.testMdcPropagation(true, false);
    }

    @Test
    @Description(value="Uses a non streaming HTTP client to send a non blocking request and asserts that MDC values are propagated to the response handler when the request is executed successfully.")
    public void nonBlockingNoStreamingMDCPropagationNoError() throws Exception {
        this.testMdcPropagation(false, false);
    }

    @Test
    @Description(value="Uses a non streaming HTTP client to send a non blocking request which will not finish until the stream is released.")
    public void nonBlockingMemory() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setStreaming(false).build());
        client.start();
        Reference responseReference = new Reference();
        try {
            client.sendAsync(this.getRequest(), this.getDefaultOptions(3000)).whenComplete((response, exception) -> responseReference.set(response));
            this.verifyNotStreamed((Reference<HttpResponse>)responseReference);
        }
        finally {
            client.stop();
        }
    }

    @Test
    @Description(value="Uses a streaming HTTP client to send a blocking request which will finish before the stream is released.")
    public void blockingStreaming() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setStreaming(true).build());
        client.start();
        try {
            HttpResponse response = client.send(this.getRequest(), this.getDefaultOptions(3000));
            this.verifyStreamed(response);
        }
        finally {
            client.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Uses a non streaming HTTP client to send a request which will not finish until the stream is released.")
    public void blockingMemory() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setStreaming(false).build());
        client.start();
        Reference responseReference = new Reference();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            executorService.execute(() -> {
                try {
                    responseReference.set((Object)client.send(this.getRequest(), this.getDefaultOptions(3000)));
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
            this.verifyNotStreamed((Reference<HttpResponse>)responseReference);
        }
        finally {
            executorService.shutdown();
            client.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Issue(value="MULE-19072")
    @Description(value="The client should use other thread-pool than the uber to avoid a deadlock when using a PipedInputStream")
    public void responseStreamingDoesNotUseUberPoolToWritePartsToThePipe() throws IOException {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setResponseBufferSize(DataUnit.KB.toBytes(10)).setStreaming(true).build());
        client.start();
        Reference responseReference = new Reference();
        Reference threadGroupName = new Reference();
        try {
            client.sendAsync(this.getRequest(), this.getDefaultOptions(3000)).whenComplete((response, exception) -> {
                responseReference.set(response);
                threadGroupName.set((Object)Thread.currentThread().getThreadGroup().getName());
            });
            this.pollingProber.check((Probe)new ResponseReceivedProbe(responseReference));
            this.verifyStreamed((HttpResponse)responseReference.get());
            MatcherAssert.assertThat((String)"Response streaming uses a common IO thread-pool to write to the pipe", (Object)((String)threadGroupName.get()), (Matcher)Matchers.not((Matcher)Matchers.containsString((String)SimpleUnitTestSupportSchedulerService.UNIT_TEST_THREAD_GROUP.getName())));
        }
        finally {
            client.stop();
        }
    }

    private HttpRequest getRequest(String uri) {
        return HttpRequest.builder().uri(uri).build();
    }

    private HttpRequest getRequest() {
        return this.getRequest(this.getUri());
    }

    private void verifyStreamed(HttpResponse response) throws IOException {
        MatcherAssert.assertThat((Object)response.getStatusCode(), (Matcher)Matchers.is((Object)HttpConstants.HttpStatus.OK.getStatusCode()));
        latch.release();
        this.verifyBody(response);
    }

    private void verifyNotStreamed(Reference<HttpResponse> responseReference) throws Exception {
        Thread.sleep(1000L);
        MatcherAssert.assertThat((Object)((HttpResponse)responseReference.get()), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
        latch.release();
        this.pollingProber.check((Probe)new ResponseReceivedProbe(responseReference));
        MatcherAssert.assertThat((Object)((HttpResponse)responseReference.get()).getStatusCode(), (Matcher)Matchers.is((Object)HttpConstants.HttpStatus.OK.getStatusCode()));
        this.verifyBody((HttpResponse)responseReference.get());
    }

    private void verifyBody(HttpResponse response) throws IOException {
        MatcherAssert.assertThat((Object)IOUtils.toString((InputStream)response.getEntity().getContent(), (Charset)StandardCharsets.UTF_8).length(), (Matcher)Matchers.is((Object)14336));
    }

    @Override
    protected HttpResponse setUpHttpResponse(HttpRequest request) {
        if (Objects.nonNull(this.beforeResponseLatch)) {
            try {
                this.beforeResponseLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        if (this.serverShouldThrowException != null && this.serverShouldThrowException.get()) {
            throw new RuntimeException("Forced Timeout");
        }
        return ((HttpResponseBuilder)HttpResponse.builder().statusCode(Integer.valueOf(HttpConstants.HttpStatus.OK.getStatusCode())).reasonPhrase(HttpConstants.HttpStatus.OK.getReasonPhrase()).entity((HttpEntity)new InputStreamHttpEntity((InputStream)new FillAndWaitStream(latch)))).build();
    }

    @Override
    protected HttpRequestOptions getDefaultOptions(int responseTimeout) {
        return HttpRequestOptions.builder().responseTimeout(responseTimeout).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testMdcPropagation(boolean shouldStream, boolean shouldThrowException) throws IOException {
        this.serverShouldThrowException = new AtomicBoolean(shouldThrowException);
        this.beforeResponseLatch = new Latch();
        HttpClient client = ((HttpServiceImplementation)this.service).getClientFactory(SchedulerConfig.config().withName("test-scheduler").withMaxConcurrentTasks(5), f -> false).create(this.clientBuilder.setResponseBufferSize(DataUnit.KB.toBytes(10)).setStreaming(shouldStream).build());
        client.start();
        Reference responseReference = new Reference();
        if (!shouldThrowException) {
            latch.release();
        }
        String transactionId = UUID.randomUUID().toString();
        HashMap capture = new HashMap();
        MDC.put((String)"transactionId", (String)transactionId);
        MDC.put((String)"currentThread", (String)Thread.currentThread().getName());
        try {
            client.sendAsync(this.getRequest(), this.getDefaultOptions(3000)).whenComplete((response, exception) -> {
                if (shouldThrowException) {
                    responseReference.set((Object)HttpResponse.builder().statusCode(Integer.valueOf(HttpConstants.HttpStatus.INTERNAL_SERVER_ERROR.getStatusCode())).build());
                } else {
                    responseReference.set(response);
                }
                capture.put("exception", exception);
                capture.put("transactionId", MDC.get((String)"transactionId"));
                capture.put("currentThread", Thread.currentThread().getName());
            });
            this.beforeResponseLatch.release();
            this.pollingProber.check((Probe)new ResponseReceivedProbe(responseReference));
            MatcherAssert.assertThat(capture.get("exception"), (Matcher)(shouldThrowException ? Matchers.notNullValue() : Matchers.nullValue()));
            MatcherAssert.assertThat((Object)MDC.get((String)"transactionId"), (Matcher)Matchers.is(capture.get("transactionId")));
            MatcherAssert.assertThat((Object)MDC.get((String)"currentThread"), (Matcher)Matchers.is((Matcher)Matchers.not(capture.get("currentThread"))));
            MatcherAssert.assertThat((Object)((HttpResponse)responseReference.get()).getStatusCode(), (Matcher)(shouldThrowException ? Matchers.is((Object)HttpConstants.HttpStatus.INTERNAL_SERVER_ERROR.getStatusCode()) : Matchers.is((Object)HttpConstants.HttpStatus.OK.getStatusCode())));
        }
        finally {
            client.stop();
            this.beforeResponseLatch = null;
            this.serverShouldThrowException = null;
        }
    }
}

