/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.impl.service.client;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.core.api.util.concurrent.Latch;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.client.HttpClientConfiguration;
import org.mule.runtime.http.api.client.async.ResponseHandler;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.entity.InputStreamHttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.runtime.http.api.domain.message.response.HttpResponseBuilder;
import org.mule.runtime.http.api.server.HttpServer;
import org.mule.runtime.http.api.server.HttpServerConfiguration;
import org.mule.runtime.http.api.server.async.ResponseStatusCallback;
import org.mule.service.http.impl.service.HttpServiceImplementation;
import org.mule.tck.SimpleUnitTestSupportSchedulerService;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.junit4.rule.DynamicPort;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;
import ru.yandex.qatools.allure.annotations.Stories;

@Features(value={"HTTP Service"})
@Stories(value={"Streaming"})
@Description(value="Validates HTTP client behaviour against a streaming server.")
public class HttpClientStreamingTestCase
extends AbstractMuleTestCase {
    @Rule
    public DynamicPort serverPort = new DynamicPort("serverPort");
    private static final int RESPONSE_SIZE = 14336;
    private static final int WAIT_TIMEOUT = 5000;
    private static final int RESPONSE_TIMEOUT = 3000;
    private static final int TIMEOUT_MILLIS = 1000;
    private static final int POLL_DELAY_MILLIS = 200;
    private static Latch latch;
    private HttpServiceImplementation service = new HttpServiceImplementation((SchedulerService)new SimpleUnitTestSupportSchedulerService());
    private HttpClientConfiguration.Builder clientBuilder = new HttpClientConfiguration.Builder().setName("streaming-test");
    private PollingProber pollingProber = new PollingProber(1000L, 200L);
    private HttpServer server;

    @Before
    public void setUp() throws Exception {
        latch = new Latch();
        this.service.start();
        this.server = this.service.getServerFactory().create(new HttpServerConfiguration.Builder().setHost("localhost").setPort(this.serverPort.getNumber()).setName("streaming-test").build());
        this.server.start();
        this.server.addRequestHandler("/", (requestContext, responseCallback) -> responseCallback.responseReady(this.setUpHttpResponse(), (ResponseStatusCallback)new IgnoreResponseStatusCallback()));
    }

    @After
    public void tearDown() throws Exception {
        if (this.server != null) {
            this.server.stop();
            this.server.dispose();
        }
        this.service.stop();
    }

    @Test
    @Description(value="Uses a streaming HTTP client to send a non blocking request which will finish before the stream is released.")
    public void nonBlockingStreaming() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.build());
        client.start();
        final Reference responseReference = new Reference();
        try {
            client.send(this.getRequest(), 3000, true, null, new ResponseHandler(){

                public void onCompletion(HttpResponse response) {
                    responseReference.set((Object)response);
                }

                public void onFailure(Exception exception) {
                }
            });
            this.pollingProber.check((Probe)new ResponseReceivedProbe((Reference<HttpResponse>)responseReference));
            this.verifyStreamed((HttpResponse)responseReference.get());
        }
        finally {
            client.stop();
        }
    }

    @Test
    @Description(value="Uses a non streaming HTTP client to send a non blocking request which will not finish until the stream is released.")
    public void nonBlockingMemory() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setStreaming(false).build());
        client.start();
        final Reference responseReference = new Reference();
        try {
            client.send(this.getRequest(), 3000, true, null, new ResponseHandler(){

                public void onCompletion(HttpResponse response) {
                    responseReference.set((Object)response);
                }

                public void onFailure(Exception exception) {
                }
            });
            this.verifyNotStreamed((Reference<HttpResponse>)responseReference);
        }
        finally {
            client.stop();
        }
    }

    @Test
    @Description(value="Uses a streaming HTTP client to send a blocking request which will finish before the stream is released.")
    public void blockingStreaming() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.build());
        client.start();
        try {
            HttpResponse response = client.send(this.getRequest(), 3000, true, null);
            this.verifyStreamed(response);
        }
        finally {
            client.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Uses a non streaming HTTP client to send a request which will not finish until the stream is released.")
    public void blockingMemory() throws Exception {
        HttpClient client = this.service.getClientFactory().create(this.clientBuilder.setStreaming(false).build());
        client.start();
        Reference responseReference = new Reference();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            executorService.execute(() -> {
                try {
                    responseReference.set((Object)client.send(this.getRequest(), 3000, true, null));
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
            this.verifyNotStreamed((Reference<HttpResponse>)responseReference);
        }
        finally {
            executorService.shutdown();
            client.stop();
        }
    }

    private HttpRequest getRequest() {
        return HttpRequest.builder().setUri(this.getUrl()).build();
    }

    private void verifyStreamed(HttpResponse response) throws IOException {
        Assert.assertThat((Object)response.getStatusCode(), (Matcher)Matchers.is((Object)HttpConstants.HttpStatus.OK.getStatusCode()));
        latch.release();
        this.verifyBody(response);
    }

    private void verifyNotStreamed(Reference<HttpResponse> responseReference) throws IOException {
        Assert.assertThat((Object)responseReference.get(), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
        latch.release();
        this.pollingProber.check((Probe)new ResponseReceivedProbe(responseReference));
        Assert.assertThat((Object)((HttpResponse)responseReference.get()).getStatusCode(), (Matcher)Matchers.is((Object)HttpConstants.HttpStatus.OK.getStatusCode()));
        this.verifyBody((HttpResponse)responseReference.get());
    }

    private void verifyBody(HttpResponse response) throws IOException {
        Assert.assertThat((Object)IOUtils.toString((InputStream)((InputStreamHttpEntity)response.getEntity()).getInputStream()).length(), (Matcher)Matchers.is((Object)14336));
    }

    private String getUrl() {
        return String.format("http://localhost:%s/", this.serverPort.getValue());
    }

    private HttpResponse setUpHttpResponse() {
        return ((HttpResponseBuilder)HttpResponse.builder().setStatusCode(Integer.valueOf(HttpConstants.HttpStatus.OK.getStatusCode())).setReasonPhrase(HttpConstants.HttpStatus.OK.getReasonPhrase()).setEntity((HttpEntity)new InputStreamHttpEntity((InputStream)new FillAndWaitStream()))).build();
    }

    private class ResponseReceivedProbe
    implements Probe {
        private Reference<HttpResponse> responseReference;

        public ResponseReceivedProbe(Reference<HttpResponse> responseReference) {
            this.responseReference = responseReference;
        }

        public boolean isSatisfied() {
            return this.responseReference.get() != null;
        }

        public String describeFailure() {
            return "Response was not received.";
        }
    }

    private class IgnoreResponseStatusCallback
    implements ResponseStatusCallback {
        private IgnoreResponseStatusCallback() {
        }

        public void responseSendFailure(Throwable throwable) {
        }

        public void responseSendSuccessfully() {
        }
    }

    private class FillAndWaitStream
    extends InputStream {
        private int sent = 0;

        private FillAndWaitStream() {
        }

        @Override
        public int read() throws IOException {
            if (this.sent < 14336) {
                ++this.sent;
                return 42;
            }
            try {
                latch.await(5000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            return -1;
        }
    }
}

