/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpStatus;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.testing.TestingHttpClient;
import com.facebook.airlift.http.client.testing.TestingResponse;
import com.facebook.airlift.testing.Assertions;
import com.facebook.airlift.testing.TestingTicker;
import com.facebook.presto.common.Page;
import com.facebook.presto.execution.buffer.TestingPagesSerdeFactory;
import com.facebook.presto.operator.HttpRpcShuffleClient;
import com.facebook.presto.operator.MockExchangeRequestProcessor;
import com.facebook.presto.operator.PageBufferClient;
import com.facebook.presto.operator.PageBufferClientStatus;
import com.facebook.presto.operator.PageTooLargeException;
import com.facebook.presto.operator.PageTransportErrorException;
import com.facebook.presto.operator.PageTransportTimeoutException;
import com.facebook.presto.operator.RpcShuffleClient;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestPageBufferClient {
    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();

    @BeforeClass
    public void setUp() {
        this.scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed((String)"test-%s"));
        this.pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
    }

    @AfterClass(alwaysRun=true)
    public void tearDown() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
            this.scheduler = null;
        }
        if (this.pageBufferClientCallbackExecutor != null) {
            this.pageBufferClientCallbackExecutor.shutdownNow();
            this.pageBufferClientCallbackExecutor = null;
        }
    }

    @Test
    public void testHappyPath() throws Exception {
        Page expectedPage = new Page(100);
        DataSize expectedMaxSize = new DataSize(11.0, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(expectedMaxSize);
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        PageBufferClient client = new PageBufferClient((RpcShuffleClient)new HttpRpcShuffleClient((HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), location), new Duration(1.0, TimeUnit.MINUTES), true, location, Optional.empty(), (PageBufferClient.ClientCallback)callback, this.scheduler, (Executor)this.pageBufferClientCallbackExecutor);
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");
        processor.addPage(location, expectedPage);
        callback.resetStats();
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)1);
        TestPageBufferClient.assertPageEquals(expectedPage, callback.getPages().get(0));
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)1);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        TestPageBufferClient.assertStatus(client, location, "queued", 1, 1, 1, 0, "not scheduled");
        callback.resetStats();
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)1);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        TestPageBufferClient.assertStatus(client, location, "queued", 1, 2, 2, 0, "not scheduled");
        processor.addPage(location, expectedPage);
        processor.addPage(location, expectedPage);
        callback.resetStats();
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)2);
        TestPageBufferClient.assertPageEquals(expectedPage, callback.getPages().get(0));
        TestPageBufferClient.assertPageEquals(expectedPage, callback.getPages().get(1));
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)1);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        Assert.assertEquals((int)callback.getFailedBuffers(), (int)0);
        callback.resetStats();
        TestPageBufferClient.assertStatus(client, location, "queued", 3, 3, 3, 0, "not scheduled");
        callback.resetStats();
        processor.setComplete(location);
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)1);
        callback.resetStats();
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)1);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)0);
        Assert.assertEquals((int)callback.getFailedBuffers(), (int)0);
        TestPageBufferClient.assertStatus(client, location, "closed", 3, 5, 5, 0, "not scheduled");
    }

    @Test
    public void testLifecycle() throws Exception {
        DataSize expectedMaxSize = new DataSize(10.0, DataSize.Unit.MEGABYTE);
        CyclicBarrier beforeRequest = new CyclicBarrier(2);
        CyclicBarrier afterRequest = new CyclicBarrier(2);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequest, afterRequest);
        processor.setResponse((Response)new TestingResponse(HttpStatus.NO_CONTENT, (ListMultimap)ImmutableListMultimap.of(), new byte[0]));
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        PageBufferClient client = new PageBufferClient((RpcShuffleClient)new HttpRpcShuffleClient((HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), location), new Duration(1.0, TimeUnit.MINUTES), true, location, Optional.empty(), (PageBufferClient.ClientCallback)callback, this.scheduler, (Executor)this.pageBufferClientCallbackExecutor);
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");
        client.scheduleRequest(expectedMaxSize);
        beforeRequest.await(10L, TimeUnit.SECONDS);
        TestPageBufferClient.assertStatus(client, location, "running", 0, 1, 0, 0, "processing request");
        Assert.assertEquals((boolean)client.isRunning(), (boolean)true);
        afterRequest.await(10L, TimeUnit.SECONDS);
        requestComplete.await(10L, TimeUnit.SECONDS);
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");
        client.close();
        beforeRequest.await(10L, TimeUnit.SECONDS);
        TestPageBufferClient.assertStatus(client, location, "closed", 0, 1, 1, 1, "processing request");
        afterRequest.await(10L, TimeUnit.SECONDS);
        requestComplete.await(10L, TimeUnit.SECONDS);
        TestPageBufferClient.assertStatus(client, location, "closed", 0, 1, 2, 1, "not scheduled");
    }

    @Test
    public void testInvalidResponses() throws Exception {
        DataSize expectedMaxSize = new DataSize(10.0, DataSize.Unit.MEGABYTE);
        CyclicBarrier beforeRequest = new CyclicBarrier(1);
        CyclicBarrier afterRequest = new CyclicBarrier(1);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequest, afterRequest);
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        PageBufferClient client = new PageBufferClient((RpcShuffleClient)new HttpRpcShuffleClient((HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), location), new Duration(1.0, TimeUnit.MINUTES), true, location, Optional.empty(), (PageBufferClient.ClientCallback)callback, this.scheduler, (Executor)this.pageBufferClientCallbackExecutor);
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");
        processor.setResponse((Response)new TestingResponse(HttpStatus.NOT_FOUND, (ListMultimap)ImmutableListMultimap.of((Object)"Content-Type", (Object)"application/X-presto-pages"), new byte[0]));
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)1);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        Assert.assertEquals((int)callback.getFailedBuffers(), (int)1);
        Assertions.assertInstanceOf((Object)callback.getFailure(), PageTransportErrorException.class);
        Assertions.assertContains((String)callback.getFailure().getCause().getMessage(), (String)"Expected response code to be 200, but was 404 Not Found");
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");
        callback.resetStats();
        processor.setResponse((Response)new TestingResponse(HttpStatus.OK, (ListMultimap)ImmutableListMultimap.of((Object)"Content-Type", (Object)"INVALID_TYPE"), new byte[0]));
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)1);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        Assert.assertEquals((int)callback.getFailedBuffers(), (int)1);
        Assertions.assertInstanceOf((Object)callback.getFailure(), PageTransportErrorException.class);
        Assertions.assertContains((String)callback.getFailure().getCause().getMessage(), (String)"Expected application/x-presto-pages response from server but got INVALID_TYPE");
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 2, 2, 2, "not scheduled");
        callback.resetStats();
        processor.setResponse((Response)new TestingResponse(HttpStatus.OK, (ListMultimap)ImmutableListMultimap.of((Object)"Content-Type", (Object)"text/plain"), new byte[0]));
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)1);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        Assert.assertEquals((int)callback.getFailedBuffers(), (int)1);
        Assertions.assertInstanceOf((Object)callback.getFailure(), PageTransportErrorException.class);
        Assertions.assertContains((String)callback.getFailure().getCause().getMessage(), (String)"Expected application/x-presto-pages response from server but got text/plain");
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 3, 3, 3, "not scheduled");
        client.close();
        requestComplete.await(10L, TimeUnit.SECONDS);
        TestPageBufferClient.assertStatus(client, location, "closed", 0, 3, 4, 3, "not scheduled");
    }

    @Test
    public void testCloseDuringPendingRequest() throws Exception {
        DataSize expectedMaxSize = new DataSize(10.0, DataSize.Unit.MEGABYTE);
        CyclicBarrier beforeRequest = new CyclicBarrier(2);
        CyclicBarrier afterRequest = new CyclicBarrier(2);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequest, afterRequest);
        processor.setResponse((Response)new TestingResponse(HttpStatus.NO_CONTENT, (ListMultimap)ImmutableListMultimap.of(), new byte[0]));
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        PageBufferClient client = new PageBufferClient((RpcShuffleClient)new HttpRpcShuffleClient((HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), location), new Duration(1.0, TimeUnit.MINUTES), true, location, Optional.empty(), (PageBufferClient.ClientCallback)callback, this.scheduler, (Executor)this.pageBufferClientCallbackExecutor);
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");
        client.scheduleRequest(expectedMaxSize);
        beforeRequest.await(10L, TimeUnit.SECONDS);
        TestPageBufferClient.assertStatus(client, location, "running", 0, 1, 0, 0, "processing request");
        Assert.assertEquals((boolean)client.isRunning(), (boolean)true);
        client.close();
        try {
            requestComplete.await(10L, TimeUnit.SECONDS);
        }
        catch (BrokenBarrierException brokenBarrierException) {
            // empty catch block
        }
        try {
            afterRequest.await(10L, TimeUnit.SECONDS);
        }
        catch (BrokenBarrierException ignored) {
            afterRequest.reset();
        }
        beforeRequest.await(10L, TimeUnit.SECONDS);
        afterRequest.await(10L, TimeUnit.SECONDS);
        requestComplete.await(10L, TimeUnit.SECONDS);
        TestPageBufferClient.assertStatus(client, location, "closed", 0, 1, 2, 1, "not scheduled");
    }

    @Test
    public void testExceptionFromResponseHandler() throws Exception {
        DataSize expectedMaxSize = new DataSize(10.0, DataSize.Unit.MEGABYTE);
        TestingTicker ticker = new TestingTicker();
        AtomicReference<Duration> tickerIncrement = new AtomicReference<Duration>(new Duration(0.0, TimeUnit.SECONDS));
        TestingHttpClient.Processor processor = input -> {
            Duration delta = (Duration)tickerIncrement.get();
            ticker.increment(delta.toMillis(), TimeUnit.MILLISECONDS);
            throw new RuntimeException("Foo");
        };
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        PageBufferClient client = new PageBufferClient((RpcShuffleClient)new HttpRpcShuffleClient((HttpClient)new TestingHttpClient(processor, (ExecutorService)this.scheduler), location), new Duration(30.0, TimeUnit.SECONDS), true, location, Optional.empty(), (PageBufferClient.ClientCallback)callback, this.scheduler, (Ticker)ticker, (Executor)this.pageBufferClientCallbackExecutor);
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)1);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        Assert.assertEquals((int)callback.getFailedBuffers(), (int)0);
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");
        tickerIncrement.set(new Duration(30.0, TimeUnit.SECONDS));
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)2);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        Assert.assertEquals((int)callback.getFailedBuffers(), (int)0);
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 2, 2, 2, "not scheduled");
        tickerIncrement.set(new Duration(31.0, TimeUnit.SECONDS));
        client.scheduleRequest(expectedMaxSize);
        requestComplete.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((int)callback.getPages().size(), (int)0);
        Assert.assertEquals((int)callback.getCompletedRequests(), (int)3);
        Assert.assertEquals((int)callback.getFinishedBuffers(), (int)0);
        Assert.assertEquals((int)callback.getFailedBuffers(), (int)1);
        Assertions.assertInstanceOf((Object)callback.getFailure(), PageTransportTimeoutException.class);
        Assertions.assertContains((String)callback.getFailure().getMessage(), (String)"Encountered too many errors talking to a worker node. The node may have crashed or be under too much load. This is probably a transient issue, so please retry your query in a few minutes. (http://localhost:8080/0 - 3 failures, failure duration 31.00s, total failed request time 31.00s)");
        TestPageBufferClient.assertStatus(client, location, "queued", 0, 3, 3, 3, "not scheduled");
    }

    @Test
    public void testErrorCodes() {
        Assert.assertEquals((Object)new PageTooLargeException(null).getErrorCode(), (Object)StandardErrorCode.PAGE_TOO_LARGE.toErrorCode());
        Assert.assertEquals((Object)new PageTransportErrorException(HostAddress.fromParts((String)"127.0.0.1", (int)8080), "").getErrorCode(), (Object)StandardErrorCode.PAGE_TRANSPORT_ERROR.toErrorCode());
        Assert.assertEquals((Object)new PageTransportTimeoutException(HostAddress.fromParts((String)"127.0.0.1", (int)8080), "", null).getErrorCode(), (Object)StandardErrorCode.PAGE_TRANSPORT_TIMEOUT.toErrorCode());
    }

    private static void assertStatus(PageBufferClient client, URI location, String status, int pagesReceived, int requestsScheduled, int requestsCompleted, int requestsFailed, String httpRequestState) {
        PageBufferClientStatus actualStatus = client.getStatus();
        Assert.assertEquals((Object)actualStatus.getUri(), (Object)location);
        Assert.assertEquals((String)actualStatus.getState(), (String)status, (String)"status");
        Assert.assertEquals((int)actualStatus.getPagesReceived(), (int)pagesReceived, (String)"pagesReceived");
        Assert.assertEquals((int)actualStatus.getRequestsScheduled(), (int)requestsScheduled, (String)"requestsScheduled");
        Assert.assertEquals((int)actualStatus.getRequestsCompleted(), (int)requestsCompleted, (String)"requestsCompleted");
        Assert.assertEquals((int)actualStatus.getRequestsFailed(), (int)requestsFailed, (String)"requestsFailed");
        Assert.assertEquals((String)actualStatus.getHttpRequestState(), (String)httpRequestState, (String)"httpRequestState");
    }

    private static void assertPageEquals(Page expectedPage, Page actualPage) {
        Assert.assertEquals((int)actualPage.getPositionCount(), (int)expectedPage.getPositionCount());
        Assert.assertEquals((int)actualPage.getChannelCount(), (int)expectedPage.getChannelCount());
    }

    private static class StaticRequestProcessor
    implements TestingHttpClient.Processor {
        private final AtomicReference<Response> response = new AtomicReference();
        private final CyclicBarrier beforeRequest;
        private final CyclicBarrier afterRequest;

        private StaticRequestProcessor(CyclicBarrier beforeRequest, CyclicBarrier afterRequest) {
            this.beforeRequest = beforeRequest;
            this.afterRequest = afterRequest;
        }

        private void setResponse(Response response) {
            this.response.set(response);
        }

        public Response handle(Request request) throws Exception {
            this.beforeRequest.await(10L, TimeUnit.SECONDS);
            try {
                Response response = this.response.get();
                return response;
            }
            finally {
                this.afterRequest.await(10L, TimeUnit.SECONDS);
            }
        }
    }

    private static class TestingClientCallback
    implements PageBufferClient.ClientCallback {
        private final CyclicBarrier done;
        private final List<SerializedPage> pages = Collections.synchronizedList(new ArrayList());
        private final AtomicInteger completedRequests = new AtomicInteger();
        private final AtomicInteger finishedBuffers = new AtomicInteger();
        private final AtomicInteger failedBuffers = new AtomicInteger();
        private final AtomicReference<Throwable> failure = new AtomicReference();

        public TestingClientCallback(CyclicBarrier done) {
            this.done = done;
        }

        public List<Page> getPages() {
            return this.pages.stream().map(arg_0 -> ((PagesSerde)PAGES_SERDE).deserialize(arg_0)).collect(Collectors.toList());
        }

        private int getCompletedRequests() {
            return this.completedRequests.get();
        }

        private int getFinishedBuffers() {
            return this.finishedBuffers.get();
        }

        public int getFailedBuffers() {
            return this.failedBuffers.get();
        }

        public Throwable getFailure() {
            return this.failure.get();
        }

        public boolean addPages(PageBufferClient client, List<SerializedPage> pages) {
            this.pages.addAll(pages);
            return true;
        }

        public void requestComplete(PageBufferClient client) {
            this.completedRequests.getAndIncrement();
            this.awaitDone();
        }

        public void clientFinished(PageBufferClient client) {
            this.finishedBuffers.getAndIncrement();
            this.awaitDone();
        }

        public void clientFailed(PageBufferClient client, Throwable cause) {
            this.failedBuffers.getAndIncrement();
            this.failure.compareAndSet(null, cause);
        }

        public void resetStats() {
            this.pages.clear();
            this.completedRequests.set(0);
            this.finishedBuffers.set(0);
            this.failedBuffers.set(0);
            this.failure.set(null);
        }

        private void awaitDone() {
            try {
                this.done.await(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (BrokenBarrierException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

