/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.impl.functional.client;

import io.qameta.allure.Description;
import io.qameta.allure.Story;
import io.qameta.allure.junit4.DisplayName;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.client.HttpClientConfiguration;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.entity.InputStreamHttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.runtime.http.api.domain.message.response.HttpResponseBuilder;
import org.mule.service.http.impl.functional.FillAndWaitStream;
import org.mule.service.http.impl.functional.client.AbstractHttpClientTestCase;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;

/**
 * Exercises the HTTP client, in blocking and non blocking mode, against a server whose response body
 * is a {@link FillAndWaitStream} built around a shared {@link Latch}.
 * <p>
 * Clients configured for streaming are expected to hand over the response before the latch is
 * released; clients configured without streaming are expected to hand it over only afterwards.
 * Any failure raised while sending a request is recorded and reported when the response never arrives,
 * instead of being silently dropped.
 */
@Story("Streaming")
@DisplayName("Validates HTTP client behaviour against a streaming server.")
public class HttpClientStreamingTestCase
extends AbstractHttpClientTestCase {
    /** Body length, in bytes, that every test expects to read from the response (14 KB). */
    private static final int EXPECTED_BODY_LENGTH = 14 * 1024;
    /** Timeout, in milliseconds, passed to every request sent by the client under test. */
    private static final int RESPONSE_TIMEOUT = 3000;
    /** Maximum time, in milliseconds, the prober waits for a response to show up. */
    private static final int TIMEOUT_MILLIS = 1000;
    /** Delay, in milliseconds, between two consecutive prober checks. */
    private static final int POLL_DELAY_MILLIS = 200;

    /** Replaced before each test by {@link #createLatch()} and handed to the response body stream. */
    private static Latch latch;

    private final HttpClientConfiguration.Builder clientBuilder = new HttpClientConfiguration.Builder().setName("streaming-test");
    private final PollingProber pollingProber = new PollingProber(TIMEOUT_MILLIS, POLL_DELAY_MILLIS);

    public HttpClientStreamingTestCase(String serviceToLoad) {
        super(serviceToLoad);
    }

    /** Gives every test its own, not yet released, latch. */
    @Before
    public void createLatch() {
        latch = new Latch();
    }

    @Test
    @Description("Uses a streaming HTTP client to send a non blocking request which will finish before the stream is released.")
    public void nonBlockingStreaming() throws Exception {
        // Streaming is requested explicitly so the test does not depend on the builder's default.
        HttpClient client = this.service.getClientFactory()
            .create(this.clientBuilder.setResponseBufferSize(DataUnit.KB.toBytes(10)).setStreaming(true).build());
        client.start();
        Reference<HttpResponse> responseReference = new Reference<>();
        Reference<Throwable> failureReference = new Reference<>();
        try {
            this.sendAsync(client, responseReference, failureReference);
            this.pollingProber.check(new ResponseReceivedProbe(responseReference, failureReference));
            this.verifyStreamed(responseReference.get());
        }
        finally {
            client.stop();
        }
    }

    @Test
    @Description("Uses a non streaming HTTP client to send a non blocking request which will not finish until the stream is released.")
    public void nonBlockingMemory() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setStreaming(false).build());
        client.start();
        Reference<HttpResponse> responseReference = new Reference<>();
        Reference<Throwable> failureReference = new Reference<>();
        try {
            this.sendAsync(client, responseReference, failureReference);
            this.verifyNotStreamed(responseReference, failureReference);
        }
        finally {
            client.stop();
        }
    }

    @Test
    @Description("Uses a streaming HTTP client to send a blocking request which will finish before the stream is released.")
    public void blockingStreaming() throws Exception {
        // Streaming is requested explicitly so the test does not depend on the builder's default.
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setStreaming(true).build());
        client.start();
        try {
            HttpResponse response = client.send(this.getRequest(), RESPONSE_TIMEOUT, true, null);
            this.verifyStreamed(response);
        }
        finally {
            client.stop();
        }
    }

    @Test
    @Description("Uses a non streaming HTTP client to send a request which will not finish until the stream is released.")
    public void blockingMemory() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setStreaming(false).build());
        client.start();
        Reference<HttpResponse> responseReference = new Reference<>();
        Reference<Throwable> failureReference = new Reference<>();
        // The blocking send runs on its own thread so this thread can observe that no response is
        // available before it releases the latch.
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            executorService.execute(() -> {
                try {
                    responseReference.set(client.send(this.getRequest(), RESPONSE_TIMEOUT, true, null));
                }
                catch (Exception e) {
                    // Keep the cause so a missing response can be explained instead of silently dropped.
                    failureReference.set(e);
                }
            });
            this.verifyNotStreamed(responseReference, failureReference);
        }
        finally {
            executorService.shutdown();
            client.stop();
        }
    }

    /** Builds the request every test sends: default builder settings aimed at {@code getUri()}. */
    private HttpRequest getRequest() {
        return HttpRequest.builder().uri(this.getUri()).build();
    }

    /**
     * Sends the test request without blocking and records the outcome.
     *
     * @param client            started client used to send the request
     * @param responseReference receives the response handed to the completion callback (may be {@code null})
     * @param failureReference  receives the exception handed to the completion callback (may be {@code null})
     */
    private void sendAsync(HttpClient client, Reference<HttpResponse> responseReference, Reference<Throwable> failureReference) {
        client.sendAsync(this.getRequest(), RESPONSE_TIMEOUT, true, null).whenComplete((response, exception) -> {
            failureReference.set(exception);
            responseReference.set(response);
        });
    }

    /**
     * Checks a response that was obtained while the latch was still closed: asserts the OK status,
     * releases the latch and then reads and checks the whole body.
     *
     * @param response response already returned by the client
     * @throws IOException if reading the body fails
     */
    private void verifyStreamed(HttpResponse response) throws IOException {
        Assert.assertThat(response.getStatusCode(), Matchers.is(HttpConstants.HttpStatus.OK.getStatusCode()));
        latch.release();
        this.verifyBody(response);
    }

    /**
     * Asserts that no response is available yet, releases the latch, waits for the response to arrive
     * and then checks its status and body.
     *
     * @param responseReference holder the sending code fills with the response
     * @param failureReference  holder the sending code fills with the failure, if any; only used to
     *                          describe why the response never arrived
     * @throws IOException if reading the body fails
     */
    private void verifyNotStreamed(Reference<HttpResponse> responseReference, Reference<Throwable> failureReference) throws IOException {
        Assert.assertThat(responseReference.get(), Matchers.is(Matchers.nullValue()));
        latch.release();
        this.pollingProber.check(new ResponseReceivedProbe(responseReference, failureReference));
        HttpResponse response = responseReference.get();
        Assert.assertThat(response.getStatusCode(), Matchers.is(HttpConstants.HttpStatus.OK.getStatusCode()));
        this.verifyBody(response);
    }

    /** Reads the whole response body as a string and asserts it has the expected length. */
    private void verifyBody(HttpResponse response) throws IOException {
        String body = IOUtils.toString(response.getEntity().getContent());
        Assert.assertThat(body.length(), Matchers.is(EXPECTED_BODY_LENGTH));
    }

    /**
     * Answers every request with an OK response whose body is a {@link FillAndWaitStream} created
     * with the current latch.
     */
    @Override
    protected HttpResponse setUpHttpResponse(HttpRequest request) {
        return HttpResponse.builder()
            .statusCode(HttpConstants.HttpStatus.OK.getStatusCode())
            .reasonPhrase(HttpConstants.HttpStatus.OK.getReasonPhrase())
            .entity(new InputStreamHttpEntity(new FillAndWaitStream(latch)))
            .build();
    }

    /**
     * Probe satisfied once the response holder is filled. When it is not, the failure description
     * includes the recorded request failure, if there is one.
     */
    private static class ResponseReceivedProbe
    implements Probe {
        private final Reference<HttpResponse> responseReference;
        private final Reference<Throwable> failureReference;

        ResponseReceivedProbe(Reference<HttpResponse> responseReference, Reference<Throwable> failureReference) {
            this.responseReference = responseReference;
            this.failureReference = failureReference;
        }

        @Override
        public boolean isSatisfied() {
            return this.responseReference.get() != null;
        }

        @Override
        public String describeFailure() {
            Throwable failure = this.failureReference.get();
            if (failure == null) {
                return "Response was not received.";
            }
            return "Response was not received. The request failed with: " + failure;
        }
    }
}

