/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.slice.Slice;
import io.airlift.testing.TestingTicker;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.FeaturesConfig;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.buffer.PageDeserializer;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.TestingPagesSerdeFactory;
import io.trino.operator.HttpPageBufferClient;
import io.trino.operator.MockExchangeRequestProcessor;
import io.trino.operator.PageBufferClientStatus;
import io.trino.operator.PageTooLargeException;
import io.trino.operator.PageTransportErrorException;
import io.trino.operator.PageTransportTimeoutException;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.HostAddress;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Execution(value=ExecutionMode.CONCURRENT)
public class TestHttpPageBufferClient {
    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    private static final TaskId TASK_ID = new TaskId(new StageId("query", 0), 0, 0);

    /**
     * Creates the two executors shared by all tests in this class: a four-thread
     * scheduled pool whose thread factory comes from {@code Threads.daemonThreadsNamed},
     * and a single-threaded executor that is handed to each client under test.
     */
    @BeforeAll
    public void setUp() {
        String threadNameFormat = getClass().getSimpleName() + "-%s";
        scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed(threadNameFormat));
        pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
    }

    /**
     * Shuts down both shared executors (if they were created) and clears the
     * fields so a repeated call is a no-op.
     */
    @AfterAll
    public void tearDown() {
        ScheduledExecutorService activeScheduler = scheduler;
        if (activeScheduler != null) {
            activeScheduler.shutdownNow();
            scheduler = null;
        }

        ExecutorService activeCallbackExecutor = pageBufferClientCallbackExecutor;
        if (activeCallbackExecutor != null) {
            activeCallbackExecutor.shutdownNow();
            pageBufferClientCallbackExecutor = null;
        }
    }

    /**
     * Drives a client through a full exchange against a mock processor: one page,
     * an empty response, two more pages, buffer completion, and the final request
     * after which the client reports itself closed. Callback counters and client
     * status are checked after every step.
     */
    @Test
    public void testHappyPath() throws Exception {
        Page page = new Page(100);
        DataSize maxResponseSize = DataSize.of(11L, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor exchange = new MockExchangeRequestProcessor(maxResponseSize);
        CyclicBarrier callbackBarrier = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(callbackBarrier);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(
                "localhost",
                new TestingHttpClient(exchange, scheduler),
                FeaturesConfig.DataIntegrityVerification.ABORT,
                maxResponseSize,
                new Duration(1.0, TimeUnit.MINUTES),
                true,
                TASK_ID,
                location,
                callback,
                scheduler,
                pageBufferClientCallbackExecutor);
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // Step 1: a single page is available.
        exchange.addPage(location, page);
        callback.resetStats();
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(1);
        assertPageEquals(page, callback.getPages().get(0));
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        assertStatus(client, location, "queued", 1, 1, 1, 0, "not scheduled");

        // Step 2: nothing new is available; the request still completes.
        callback.resetStats();
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        assertStatus(client, location, "queued", 1, 2, 2, 0, "not scheduled");

        // Step 3: two pages arrive in one response.
        exchange.addPage(location, page);
        exchange.addPage(location, page);
        callback.resetStats();
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(2);
        assertPageEquals(page, callback.getPages().get(0));
        assertPageEquals(page, callback.getPages().get(1));
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(0);
        callback.resetStats();
        assertStatus(client, location, "queued", 3, 3, 3, 0, "not scheduled");

        // Step 4: mark the buffer complete; this request carries no pages.
        callback.resetStats();
        exchange.setComplete(location);
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);

        // Step 5: the next request reports the buffer finished (clientFinished, not requestComplete).
        callback.resetStats();
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(1);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(0);
        assertStatus(client, location, "closed", 3, 5, 5, 0, "not scheduled");
    }

    /**
     * Observes the client's reported status while a barrier-gated processor holds
     * each HTTP request open: first for a scheduled request, then for the request
     * that follows {@code close()}.
     */
    @Test
    public void testLifecycle() throws Exception {
        CyclicBarrier requestEntered = new CyclicBarrier(2);
        CyclicBarrier requestReleased = new CyclicBarrier(2);
        StaticRequestProcessor processor = new StaticRequestProcessor(requestEntered, requestReleased);
        processor.setResponse(new TestingResponse(HttpStatus.NO_CONTENT, ImmutableListMultimap.of(), new byte[0]));
        CyclicBarrier callbackBarrier = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(callbackBarrier);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(
                "localhost",
                new TestingHttpClient(processor, scheduler),
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(10L, DataSize.Unit.MEGABYTE),
                new Duration(1.0, TimeUnit.MINUTES),
                true,
                TASK_ID,
                location,
                callback,
                scheduler,
                pageBufferClientCallbackExecutor);
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // The processor is parked between its two barriers here, so the request is in flight.
        client.scheduleRequest();
        requestEntered.await(10L, TimeUnit.SECONDS);
        assertStatus(client, location, "running", 0, 1, 0, 0, "PROCESSING_REQUEST");
        Assertions.assertThat(client.isRunning()).isEqualTo(true);

        // Let the request finish and wait for the callback.
        requestReleased.await(10L, TimeUnit.SECONDS);
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");

        // close() sends one more request through the processor; inspect it mid-flight too.
        client.close();
        requestEntered.await(10L, TimeUnit.SECONDS);
        assertStatus(client, location, "closed", 0, 1, 1, 1, "PROCESSING_REQUEST");
        requestReleased.await(10L, TimeUnit.SECONDS);
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        assertStatus(client, location, "closed", 0, 1, 2, 1, "not scheduled");
    }

    /**
     * Feeds the client three unacceptable responses (404, an unparseable content
     * type, and a wrong content type) and checks that each one is reported through
     * {@code clientFailed} as a {@link PageTransportErrorException}, then closes the client.
     */
    @Test
    public void testInvalidResponses() throws Exception {
        // Single-party barriers trip immediately, so the processor never blocks in this test.
        CyclicBarrier requestEntered = new CyclicBarrier(1);
        CyclicBarrier requestReleased = new CyclicBarrier(1);
        StaticRequestProcessor processor = new StaticRequestProcessor(requestEntered, requestReleased);
        CyclicBarrier callbackBarrier = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(callbackBarrier);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(
                "localhost",
                new TestingHttpClient(processor, scheduler),
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(10L, DataSize.Unit.MEGABYTE),
                new Duration(1.0, TimeUnit.MINUTES),
                true,
                TASK_ID,
                location,
                callback,
                scheduler,
                pageBufferClientCallbackExecutor);
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // Case 1: 404 with the pages content type.
        processor.setResponse(new TestingResponse(
                HttpStatus.NOT_FOUND,
                ImmutableListMultimap.of("Content-Type", "application/X-trino-pages"),
                new byte[0]));
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(1);
        io.airlift.testing.Assertions.assertInstanceOf(callback.getFailure(), PageTransportErrorException.class);
        io.airlift.testing.Assertions.assertContains(callback.getFailure().getMessage(), "Expected response code to be 200, but was 404");
        assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");

        // Case 2: 200 with a content type that is not a valid media type.
        callback.resetStats();
        processor.setResponse(new TestingResponse(
                HttpStatus.OK,
                ImmutableListMultimap.of("Content-Type", "INVALID_TYPE"),
                new byte[0]));
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(1);
        io.airlift.testing.Assertions.assertInstanceOf(callback.getFailure(), PageTransportErrorException.class);
        io.airlift.testing.Assertions.assertContains(callback.getFailure().getMessage(), "Expected application/x-trino-pages response from server but got INVALID_TYPE");
        assertStatus(client, location, "queued", 0, 2, 2, 2, "not scheduled");

        // Case 3: 200 with a well-formed but unexpected content type.
        callback.resetStats();
        processor.setResponse(new TestingResponse(
                HttpStatus.OK,
                ImmutableListMultimap.of("Content-Type", "text/plain"),
                new byte[0]));
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(1);
        io.airlift.testing.Assertions.assertInstanceOf(callback.getFailure(), PageTransportErrorException.class);
        io.airlift.testing.Assertions.assertContains(callback.getFailure().getMessage(), "Expected application/x-trino-pages response from server but got text/plain");
        assertStatus(client, location, "queued", 0, 3, 3, 3, "not scheduled");

        // Closing issues one more request; give it a 204 so it completes without failing.
        processor.setResponse(new TestingResponse(HttpStatus.NO_CONTENT, ImmutableListMultimap.of(), new byte[0]));
        client.close();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        assertStatus(client, location, "closed", 0, 3, 4, 3, "not scheduled");
    }

    /**
     * Closes the client while a request is still parked inside the processor and
     * checks that, once the in-flight request and the request issued by
     * {@code close()} have both drained, the client ends up closed with the
     * expected counters.
     */
    @Test
    public void testCloseDuringPendingRequest() throws Exception {
        CyclicBarrier requestEntered = new CyclicBarrier(2);
        CyclicBarrier requestReleased = new CyclicBarrier(2);
        StaticRequestProcessor processor = new StaticRequestProcessor(requestEntered, requestReleased);
        processor.setResponse(new TestingResponse(HttpStatus.NO_CONTENT, ImmutableListMultimap.of(), new byte[0]));
        CyclicBarrier callbackBarrier = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(callbackBarrier);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(
                "localhost",
                new TestingHttpClient(processor, scheduler),
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(10L, DataSize.Unit.MEGABYTE),
                new Duration(1.0, TimeUnit.MINUTES),
                true,
                TASK_ID,
                location,
                callback,
                scheduler,
                pageBufferClientCallbackExecutor);
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // Start a request and leave it parked between the processor's two barriers.
        client.scheduleRequest();
        requestEntered.await(10L, TimeUnit.SECONDS);
        assertStatus(client, location, "running", 0, 1, 0, 0, "PROCESSING_REQUEST");
        Assertions.assertThat(client.isRunning()).isEqualTo(true);

        // Close while that request is still pending.
        client.close();

        try {
            callbackBarrier.await(10L, TimeUnit.SECONDS);
        }
        catch (BrokenBarrierException ignored) {
            // Deliberately tolerated. NOTE(review): presumably close() interrupts the
            // pending request and that can leave this barrier broken - confirm.
        }
        try {
            requestReleased.await(10L, TimeUnit.SECONDS);
        }
        catch (BrokenBarrierException ignored) {
            // Make the barrier usable again for the request that close() issues.
            requestReleased.reset();
        }

        // Drain the request issued by close() and wait for its callback.
        requestEntered.await(10L, TimeUnit.SECONDS);
        requestReleased.await(10L, TimeUnit.SECONDS);
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        assertStatus(client, location, "closed", 0, 1, 2, 1, "not scheduled");
    }

    /**
     * Uses a processor that always throws and a manually advanced ticker to check
     * when repeated request failures turn into a client failure: with the client
     * configured with a 30 second duration, failures after 0 s and 30 s of ticker
     * time are not reported, while the failure after a further 31 s is reported as
     * a {@link PageTransportTimeoutException}.
     */
    @Test
    public void testExceptionFromResponseHandler() throws Exception {
        TestingTicker ticker = new TestingTicker();
        AtomicReference<Duration> tickerIncrement = new AtomicReference<>(new Duration(0.0, TimeUnit.SECONDS));
        // Every request advances the ticker by the currently configured step and then fails.
        TestingHttpClient.Processor processor = request -> {
            Duration step = tickerIncrement.get();
            ticker.increment(step.toMillis(), TimeUnit.MILLISECONDS);
            throw new RuntimeException("Foo");
        };
        CyclicBarrier callbackBarrier = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(callbackBarrier);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(
                "localhost",
                new TestingHttpClient(processor, scheduler),
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(10L, DataSize.Unit.MEGABYTE),
                new Duration(30.0, TimeUnit.SECONDS),
                true,
                TASK_ID,
                location,
                callback,
                scheduler,
                ticker,
                pageBufferClientCallbackExecutor);
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // First failure with no ticker movement. Callback stats are never reset in
        // this test, so completedRequests keeps accumulating.
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(0);
        assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");

        // Second failure after a 30 s step: still not reported as a client failure.
        tickerIncrement.set(new Duration(30.0, TimeUnit.SECONDS));
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(2);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(0);
        assertStatus(client, location, "queued", 0, 2, 2, 2, "not scheduled");

        // Third failure after a 31 s step: the client now reports a timeout failure.
        tickerIncrement.set(new Duration(31.0, TimeUnit.SECONDS));
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(callback.getPages().size()).isEqualTo(0);
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(3);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(1);
        io.airlift.testing.Assertions.assertInstanceOf(callback.getFailure(), PageTransportTimeoutException.class);
        io.airlift.testing.Assertions.assertContains(callback.getFailure().getMessage(), "Encountered too many errors talking to a worker node. The node may have crashed or be under too much load. This is probably a transient issue, so please retry your query in a few minutes. (http://localhost:8080/0 - 3 failures, failure duration 31.00s, total failed request time 31.00s)");
        assertStatus(client, location, "queued", 0, 3, 3, 3, "not scheduled");
    }

    /**
     * Checks that each page-transport exception type reports the matching
     * {@link StandardErrorCode}.
     */
    @Test
    public void testErrorCodes() {
        PageTooLargeException tooLarge = new PageTooLargeException();
        Assertions.assertThat(tooLarge.getErrorCode())
                .isEqualTo(StandardErrorCode.PAGE_TOO_LARGE.toErrorCode());

        PageTransportErrorException transportError =
                new PageTransportErrorException(HostAddress.fromParts("127.0.0.1", 8080), "");
        Assertions.assertThat(transportError.getErrorCode())
                .isEqualTo(StandardErrorCode.PAGE_TRANSPORT_ERROR.toErrorCode());

        PageTransportTimeoutException transportTimeout =
                new PageTransportTimeoutException(HostAddress.fromParts("127.0.0.1", 8080), "", null);
        Assertions.assertThat(transportTimeout.getErrorCode())
                .isEqualTo(StandardErrorCode.PAGE_TRANSPORT_TIMEOUT.toErrorCode());
    }

    /**
     * Checks the running average of response sizes kept by the client: it starts at
     * zero and is verified after every recorded sample (0, 1000, 800 bytes), not
     * only after the last one, so a wrong intermediate value cannot go unnoticed.
     */
    @Test
    public void testAverageSizeOfRequest() {
        HttpPageBufferClient client = new HttpPageBufferClient(
                "localhost",
                new TestingHttpClient(new MockExchangeRequestProcessor(DataSize.of(10L, DataSize.Unit.MEGABYTE)), scheduler),
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(10L, DataSize.Unit.MEGABYTE),
                new Duration(30.0, TimeUnit.SECONDS),
                true,
                TASK_ID,
                URI.create("http://localhost:8080"),
                new TestingClientCallback(new CyclicBarrier(1)),
                scheduler,
                new TestingTicker(),
                pageBufferClientCallbackExecutor);

        // No samples recorded yet.
        Assertions.assertThat(client.getAverageRequestSizeInBytes()).isEqualTo(0L);

        // Samples: {0}
        client.requestSucceeded(0L);
        Assertions.assertThat(client.getAverageRequestSizeInBytes()).isEqualTo(0L);

        // Samples: {0, 1000} -> mean 500
        client.requestSucceeded(1000L);
        Assertions.assertThat(client.getAverageRequestSizeInBytes()).isEqualTo(500L);

        // Samples: {0, 1000, 800} -> mean 600
        client.requestSucceeded(800L);
        Assertions.assertThat(client.getAverageRequestSizeInBytes()).isEqualTo(600L);
    }

    /**
     * Checks that an exception thrown from the callback's {@code addPages} is
     * reported back through {@code clientFailed} with the very same exception
     * instance, and that the request is still counted as completed.
     */
    @Test
    public void testMemoryExceededInAddPages() throws Exception {
        URI location = URI.create("http://localhost:8080");
        Block block = RunLengthEncodedBlock.create(BigintType.BIGINT, 1L, 100);
        Page page = new Page(block);
        MockExchangeRequestProcessor exchange = new MockExchangeRequestProcessor(DataSize.of(10L, DataSize.Unit.MEGABYTE));
        CyclicBarrier callbackBarrier = new CyclicBarrier(2);
        TrinoException memoryFailure = new TrinoException(StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT, "Memory limit exceeded");
        AtomicBoolean addPagesInvoked = new AtomicBoolean(false);

        // Callback whose addPages records the call and always fails.
        TestingClientCallback callback = new TestingClientCallback(callbackBarrier) {
            @Override
            public boolean addPages(HttpPageBufferClient client, List<Slice> pages) {
                addPagesInvoked.set(true);
                throw memoryFailure;
            }
        };
        HttpPageBufferClient client = new HttpPageBufferClient(
                "localhost",
                new TestingHttpClient(exchange, scheduler),
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(10L, DataSize.Unit.MEGABYTE),
                new Duration(30.0, TimeUnit.SECONDS),
                true,
                TASK_ID,
                location,
                callback,
                scheduler,
                pageBufferClientCallbackExecutor);

        exchange.addPage(location, page);
        callback.resetStats();
        client.scheduleRequest();
        callbackBarrier.await(10L, TimeUnit.SECONDS);

        Assertions.assertThat(addPagesInvoked.get()).isTrue();
        Assertions.assertThat(callback.getCompletedRequests()).isEqualTo(1);
        Assertions.assertThat(callback.getFinishedBuffers()).isEqualTo(0);
        Assertions.assertThat(callback.getFailedBuffers()).isEqualTo(1);
        Assertions.assertThat(callback.getFailure()).isEqualTo(memoryFailure);
    }

    /**
     * Asserts every field of the client's current {@link PageBufferClientStatus}
     * against the expected values. Each check after the URI carries a description
     * so a failure names the field that differed.
     */
    private static void assertStatus(HttpPageBufferClient client, URI location, String status, int pagesReceived, int requestsScheduled, int requestsCompleted, int requestsFailed, String httpRequestState) {
        PageBufferClientStatus snapshot = client.getStatus();
        Assertions.assertThat(snapshot.getUri()).isEqualTo(location);
        Assertions.assertThat(snapshot.getState())
                .describedAs("status")
                .isEqualTo(status);
        Assertions.assertThat(snapshot.getPagesReceived())
                .describedAs("pagesReceived")
                .isEqualTo(pagesReceived);
        Assertions.assertThat(snapshot.getRequestsScheduled())
                .describedAs("requestsScheduled")
                .isEqualTo(requestsScheduled);
        Assertions.assertThat(snapshot.getRequestsCompleted())
                .describedAs("requestsCompleted")
                .isEqualTo(requestsCompleted);
        Assertions.assertThat(snapshot.getRequestsFailed())
                .describedAs("requestsFailed")
                .isEqualTo(requestsFailed);
        Assertions.assertThat(snapshot.getHttpRequestState())
                .describedAs("httpRequestState")
                .isEqualTo(httpRequestState);
    }

    /**
     * Compares two pages by shape only: position count first, then channel count.
     * Block contents are not inspected.
     */
    private static void assertPageEquals(Page expectedPage, Page actualPage) {
        int expectedPositions = expectedPage.getPositionCount();
        Assertions.assertThat(actualPage.getPositionCount()).isEqualTo(expectedPositions);

        int expectedChannels = expectedPage.getChannelCount();
        Assertions.assertThat(actualPage.getChannelCount()).isEqualTo(expectedChannels);
    }

    private static class TestingClientCallback
    implements HttpPageBufferClient.ClientCallback {
        private final PagesSerdeFactory serdeFactory = new TestingPagesSerdeFactory();
        private final CyclicBarrier done;
        private final List<Slice> pages = Collections.synchronizedList(new ArrayList());
        private final AtomicInteger completedRequests = new AtomicInteger();
        private final AtomicInteger finishedBuffers = new AtomicInteger();
        private final AtomicInteger failedBuffers = new AtomicInteger();
        private final AtomicReference<Throwable> failure = new AtomicReference();

        public TestingClientCallback(CyclicBarrier done) {
            this.done = done;
        }

        public List<Page> getPages() {
            PageDeserializer deserializer = this.serdeFactory.createDeserializer(Optional.empty());
            return this.pages.stream().map(arg_0 -> ((PageDeserializer)deserializer).deserialize(arg_0)).collect(Collectors.toList());
        }

        private int getCompletedRequests() {
            return this.completedRequests.get();
        }

        private int getFinishedBuffers() {
            return this.finishedBuffers.get();
        }

        public int getFailedBuffers() {
            return this.failedBuffers.get();
        }

        public Throwable getFailure() {
            return this.failure.get();
        }

        public boolean addPages(HttpPageBufferClient client, List<Slice> pages) {
            this.pages.addAll(pages);
            return true;
        }

        public void requestComplete(HttpPageBufferClient client) {
            this.completedRequests.getAndIncrement();
            this.awaitDone();
        }

        public void clientFinished(HttpPageBufferClient client) {
            this.finishedBuffers.getAndIncrement();
            this.awaitDone();
        }

        public void clientFailed(HttpPageBufferClient client, Throwable cause) {
            this.failedBuffers.getAndIncrement();
            this.failure.compareAndSet(null, cause);
        }

        public void resetStats() {
            this.pages.clear();
            this.completedRequests.set(0);
            this.finishedBuffers.set(0);
            this.failedBuffers.set(0);
            this.failure.set(null);
        }

        private void awaitDone() {
            try {
                this.done.await(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (BrokenBarrierException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static class StaticRequestProcessor
    implements TestingHttpClient.Processor {
        private final AtomicReference<Response> response = new AtomicReference();
        private final CyclicBarrier beforeRequest;
        private final CyclicBarrier afterRequest;

        private StaticRequestProcessor(CyclicBarrier beforeRequest, CyclicBarrier afterRequest) {
            this.beforeRequest = beforeRequest;
            this.afterRequest = afterRequest;
        }

        private void setResponse(Response response) {
            this.response.set(response);
        }

        public Response handle(Request request) throws Exception {
            this.beforeRequest.await(10L, TimeUnit.SECONDS);
            try {
                Response response = this.response.get();
                return response;
            }
            finally {
                this.afterRequest.await(10L, TimeUnit.SECONDS);
            }
        }
    }
}

