/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.testing.TestingHttpClient;
import com.facebook.airlift.testing.Assertions;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.buffer.TestingPagesSerdeFactory;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.memory.context.SimpleLocalMemoryContext;
import com.facebook.presto.operator.ExchangeClient;
import com.facebook.presto.operator.ExchangeClientStatus;
import com.facebook.presto.operator.MockExchangeRequestProcessor;
import com.facebook.presto.operator.PageBufferClientStatus;
import com.facebook.presto.operator.TestingDriftClient;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link ExchangeClient}, driven by a {@link MockExchangeRequestProcessor} that is plugged
 * in through a {@link TestingHttpClient}.
 *
 * <p>The executors are created once per class in {@link #setUp()} and shut down in
 * {@link #tearDown()}. The class is annotated {@code singleThreaded}, so TestNG runs all of its
 * methods on the same thread.
 */
@Test(singleThreaded=true)
public class TestExchangeClient {
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();

    /** Pause between two condition checks in {@link #pollUntil}. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    private ExecutorService testingHttpClientExecutor;

    /** Creates the executors shared by every exchange client built in this class. */
    @BeforeClass
    public void setUp() {
        scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed("test-%s"));
        pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
        testingHttpClientExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("test-%s"));
    }

    /** Shuts down whichever executors were created; safe to call when {@link #setUp()} failed part-way. */
    @AfterClass(alwaysRun=true)
    public void tearDown() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
        if (pageBufferClientCallbackExecutor != null) {
            pageBufferClientCallbackExecutor.shutdownNow();
            pageBufferClientCallbackExecutor = null;
        }
        if (testingHttpClientExecutor != null) {
            testingHttpClientExecutor.shutdownNow();
            testingHttpClientExecutor = null;
        }
    }

    @Test
    public void testHappyPath() {
        testHappyPath(false, bytes -> bytes);
    }

    @Test
    public void testHappyPathChecksum() {
        testHappyPath(true, bytes -> bytes);
    }

    /** Flips the bits of the last byte handed to the data changer and expects the client to reject the page. */
    @Test(expectedExceptions={PrestoException.class}, expectedExceptionsMessageRegExp="Received corrupted serialized page from host.*")
    public void testHappyPathChecksumFail() {
        testHappyPath(true, bytes -> {
            // '~' promotes the byte to int, so the result must be narrowed back explicitly.
            bytes[bytes.length - 1] = (byte) ~bytes[bytes.length - 1];
            return bytes;
        });
    }

    /**
     * Serves three pages from a single completed location and checks that they arrive in order,
     * that the client closes after the last one, and that the final status is fully drained.
     *
     * @param checksum    passed to {@link TestingPagesSerdeFactory#testingPagesSerde(boolean)}
     * @param dataChanger passed through to the {@link MockExchangeRequestProcessor} constructor
     */
    private void testHappyPath(boolean checksum, Function<byte[], byte[]> dataChanger) {
        DataSize bufferCapacity = new DataSize(32.0, DataSize.Unit.MEGABYTE);
        DataSize maxResponseSize = new DataSize(10.0, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize, TestingPagesSerdeFactory.testingPagesSerde(checksum), dataChanger);

        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        processor.setComplete(location);

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);
        exchangeClient.addLocation(location, TaskId.valueOf("queryid.0.0.0.0"));
        exchangeClient.noMoreLocations();

        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(1));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(2));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(3));
        Assert.assertNull(getNextPage(exchangeClient));
        Assert.assertTrue(exchangeClient.isClosed());

        ExchangeClientStatus status = exchangeClient.getStatus();
        Assert.assertEquals(status.getBufferedPages(), 0);
        Assert.assertEquals(status.getBufferedBytes(), 0L);
        assertStatus(status.getPageBufferClientStatuses().get(0), location, "closed", 3, 3, 3, "not scheduled");
    }

    /**
     * Adds a second location after the first one has been drained and checks that the client stays
     * open (and blocked) until {@code noMoreLocations()} is called.
     */
    @Test(timeOut=10000L)
    public void testAddLocation() throws Exception {
        DataSize bufferCapacity = new DataSize(32.0, DataSize.Unit.MEGABYTE);
        DataSize maxResponseSize = new DataSize(10.0, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);

        URI location1 = URI.create("http://localhost:8081/foo");
        processor.addPage(location1, createPage(1));
        processor.addPage(location1, createPage(2));
        processor.addPage(location1, createPage(3));
        processor.setComplete(location1);
        exchangeClient.addLocation(location1, TaskId.valueOf("foo.0.0.0.0"));

        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(1));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(2));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(3));
        // Nothing more to read yet: the blocked future must not complete within 10 ms.
        Assert.assertFalse(MoreFutures.tryGetFutureValue(exchangeClient.isBlocked(), 10, TimeUnit.MILLISECONDS).isPresent());
        Assert.assertFalse(exchangeClient.isClosed());

        URI location2 = URI.create("http://localhost:8082/bar");
        processor.addPage(location2, createPage(4));
        processor.addPage(location2, createPage(5));
        processor.addPage(location2, createPage(6));
        processor.setComplete(location2);
        exchangeClient.addLocation(location2, TaskId.valueOf("bar.0.0.0.0"));

        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(4));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(5));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(6));
        Assert.assertFalse(MoreFutures.tryGetFutureValue(exchangeClient.isBlocked(), 10, TimeUnit.MILLISECONDS).isPresent());
        Assert.assertFalse(exchangeClient.isClosed());

        exchangeClient.noMoreLocations();
        // The client is not necessarily closed the moment noMoreLocations() returns, so wait for it.
        awaitClosed(exchangeClient);

        ImmutableMap<URI, PageBufferClientStatus> statuses = Maps.uniqueIndex(exchangeClient.getStatus().getPageBufferClientStatuses(), PageBufferClientStatus::getUri);
        assertStatus(statuses.get(location1), location1, "closed", 3, 3, 3, "not scheduled");
        assertStatus(statuses.get(location2), location2, "closed", 3, 3, 3, "not scheduled");
    }

    /**
     * Uses a 1-byte buffer capacity and a 1-byte max response size and checks that exactly one page
     * is buffered at each observation point while three pages are drained one by one.
     */
    @Test
    public void testBufferLimit() {
        DataSize bufferCapacity = new DataSize(1.0, DataSize.Unit.BYTE);
        DataSize maxResponseSize = new DataSize(1.0, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        processor.setComplete(location);

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);
        exchangeClient.addLocation(location, TaskId.valueOf("taskid.0.0.0.0"));
        exchangeClient.noMoreLocations();
        Assert.assertFalse(exchangeClient.isClosed());

        long start = System.nanoTime();
        Duration deadline = new Duration(5.0, TimeUnit.SECONDS);

        // Kick off the first request and wait for a page to show up in the buffer.
        exchangeClient.scheduleRequestIfNecessary();
        pollUntil(() -> exchangeClient.getStatus().getBufferedPages() != 0, start, deadline);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0L);
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 1, 1, 1, "not scheduled");

        // Remove the first page and wait for the second one.
        assertPageEquals(exchangeClient.pollPage(), createPage(1));
        pollUntil(() -> exchangeClient.getStatus().getBufferedPages() != 0, start, deadline);
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 2, 2, 2, "not scheduled");
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0L);

        // Remove the second page and wait for the third one.
        assertPageEquals(exchangeClient.pollPage(), createPage(2));
        pollUntil(() -> exchangeClient.getStatus().getBufferedPages() != 0, start, deadline);
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 3, 3, 3, "not scheduled");
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0L);

        // Drain the last page; the following read returns null and leaves the client closed.
        assertPageEquals(getNextPage(exchangeClient), createPage(3));
        Assert.assertNull(getNextPage(exchangeClient));
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0L);
        Assert.assertTrue(exchangeClient.isClosed());
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 5, 5, "not scheduled");
    }

    /**
     * Closes the client after reading one page from a location that was never marked complete and
     * checks that it finishes with an empty buffer.
     */
    @Test
    public void testClose() throws Exception {
        DataSize bufferCapacity = new DataSize(1.0, DataSize.Unit.BYTE);
        DataSize maxResponseSize = new DataSize(1.0, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);
        exchangeClient.addLocation(location, TaskId.valueOf("taskid.0.0.0.0"));
        exchangeClient.noMoreLocations();
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(1));

        exchangeClient.close();
        waitUntilEquals(() -> exchangeClient.isFinished(), true, new Duration(5.0, TimeUnit.SECONDS));
        Assert.assertTrue(exchangeClient.isClosed());
        Assert.assertNull(exchangeClient.pollPage());
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0L);

        Optional<PageBufferClientStatus> clientStatusOptional = exchangeClient.getStatus().getPageBufferClientStatuses().stream()
                .filter(pageBufferClientStatus -> pageBufferClientStatus.getUri().equals(location))
                .findFirst();
        Assert.assertTrue(clientStatusOptional.isPresent());
        assertStatus(clientStatusOptional.get(), "closed", "not scheduled");
    }

    /**
     * Registers 16 locations with three pages each, holds every request behind a latch until all
     * locations are added, and then checks the buffered page count and the max sizes recorded by
     * the processor, first after the initial round of requests and again after all pages are read.
     */
    @Test
    public void testInitialRequestLimit() {
        DataSize bufferCapacity = new DataSize(16.0, DataSize.Unit.MEGABYTE);
        DataSize maxResponseSize = new DataSize(1048576.0, DataSize.Unit.BYTE);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize) {
            @Override
            public Response handle(Request request) {
                // Hold every request until the test releases the latch (or give up after 10 s).
                if (!Uninterruptibles.awaitUninterruptibly(countDownLatch, 10L, TimeUnit.SECONDS)) {
                    throw new UncheckedTimeoutException();
                }
                return super.handle(request);
            }
        };

        int numLocations = 16;
        List<URI> locations = new ArrayList<>();
        List<DataSize> expectedMaxSizes = new ArrayList<>();
        for (int i = 0; i < numLocations; ++i) {
            URI location = URI.create("http://localhost:" + (8080 + i));
            locations.add(location);
            processor.addPage(location, createPage(0x100000));
            processor.addPage(location, createPage(0x100000));
            processor.addPage(location, createPage(0x100000));
            processor.setComplete(location);
            expectedMaxSizes.add(maxResponseSize);
        }

        try (ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize)) {
            for (int i = 0; i < numLocations; ++i) {
                exchangeClient.addLocation(locations.get(i), TaskId.valueOf("taskid.0.0." + i + ".0"));
            }
            exchangeClient.noMoreLocations();
            Assert.assertFalse(exchangeClient.isClosed());

            long start = System.nanoTime();
            countDownLatch.countDown();

            // Wait until 16 pages are buffered.
            pollUntil(() -> exchangeClient.getStatus().getBufferedPages() >= 16, start, new Duration(5.0, TimeUnit.SECONDS));
            Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 16);
            Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0L);

            // Summing pagesReceived over the clients that report exactly one page must give 16.
            List<PageBufferClientStatus> pageBufferClientStatuses = exchangeClient.getStatus().getPageBufferClientStatuses();
            int pagesFromSinglePageClients = pageBufferClientStatuses.stream()
                    .filter(status -> status.getPagesReceived() == 1)
                    .mapToInt(PageBufferClientStatus::getPagesReceived)
                    .sum();
            Assert.assertEquals(pagesFromSinglePageClients, 16);
            Assert.assertEquals(processor.getRequestMaxSizes(), expectedMaxSizes);

            for (int i = 0; i < numLocations * 3; ++i) {
                Assert.assertNotNull(getNextPage(exchangeClient));
            }

            // Wait until the processor has recorded 64 requests in total, then compare all of them.
            pollUntil(() -> processor.getRequestMaxSizes().size() >= 64, start, new Duration(10.0, TimeUnit.SECONDS));
            for (int i = 0; i < 48; ++i) {
                expectedMaxSizes.add(maxResponseSize);
            }
            Assert.assertEquals(processor.getRequestMaxSizes(), expectedMaxSizes);
        }
    }

    /**
     * Removes one remote source while a page from it is buffered, checks that this page is still
     * returned and nothing else follows from that source, then reads three pages from the
     * remaining source and waits for the client to close.
     */
    @Test
    public void testRemoveRemoteSource() throws Exception {
        DataSize bufferCapacity = new DataSize(1.0, DataSize.Unit.BYTE);
        DataSize maxResponseSize = new DataSize(1.0, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location1 = URI.create("http://localhost:8081/foo.0.0.0.0");
        TaskId taskId1 = TaskId.valueOf("foo.0.0.0.0");
        URI location2 = URI.create("http://localhost:8082/bar.0.0.0.0");
        TaskId taskId2 = TaskId.valueOf("bar.0.0.0.0");

        processor.addPage(location1, createPage(1));
        processor.addPage(location1, createPage(2));
        processor.addPage(location1, createPage(3));

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);
        exchangeClient.addLocation(location1, taskId1);
        exchangeClient.addLocation(location2, taskId2);
        Assert.assertFalse(exchangeClient.isClosed());

        waitUntilEquals(() -> exchangeClient.getStatus().getBufferedPages(), 1, new Duration(5.0, TimeUnit.SECONDS));
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);

        exchangeClient.removeRemoteSource(taskId1);
        // The page that was already buffered is still delivered; nothing else follows it.
        assertPageEquals(getNextPage(exchangeClient), createPage(1));
        Assert.assertNull(exchangeClient.pollPage());
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);

        processor.addPage(location2, createPage(4));
        processor.addPage(location2, createPage(5));
        processor.addPage(location2, createPage(6));
        processor.setComplete(location2);

        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(4));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(5));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(6));
        Assert.assertFalse(MoreFutures.tryGetFutureValue(exchangeClient.isBlocked(), 10, TimeUnit.MILLISECONDS).isPresent());
        Assert.assertFalse(exchangeClient.isClosed());

        exchangeClient.noMoreLocations();
        // This test has no TestNG timeOut, so the wait for the close must be bounded here.
        awaitClosed(exchangeClient);

        ExchangeClientStatus exchangeClientStatus = exchangeClient.getStatus();
        Optional<PageBufferClientStatus> clientStatusOptional1 = exchangeClientStatus.getPageBufferClientStatuses().stream()
                .filter(pageBufferClientStatus -> pageBufferClientStatus.getUri().equals(location1))
                .findFirst();
        Assert.assertTrue(clientStatusOptional1.isPresent());
        assertStatus(clientStatusOptional1.get(), "closed", "not scheduled");

        Optional<PageBufferClientStatus> clientStatusOptional2 = exchangeClientStatus.getPageBufferClientStatuses().stream()
                .filter(pageBufferClientStatus -> pageBufferClientStatus.getUri().equals(location2))
                .findFirst();
        Assert.assertTrue(clientStatusOptional2.isPresent());
        assertStatus(clientStatusOptional2.get(), "closed", "not scheduled");
    }

    /** Builds a single-channel page from {@code BlockAssertions.createLongSequenceBlock(0, size)}. */
    private static Page createPage(int size) {
        return new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, size)});
    }

    /**
     * Waits for the client's blocked future and then polls one page.
     *
     * @return the polled page, or {@code null} when the poll yields no page or no value is
     *         available within 100 seconds
     */
    private static SerializedPage getNextPage(ExchangeClient exchangeClient) {
        ListenableFuture<SerializedPage> futurePage = Futures.transform(exchangeClient.isBlocked(), ignored -> exchangeClient.pollPage(), MoreExecutors.directExecutor());
        return MoreFutures.tryGetFutureValue(futurePage, 100, TimeUnit.SECONDS).orElse(null);
    }

    /** Asserts that the serialized page is non-null and matches the expected position and channel counts. */
    private static void assertPageEquals(SerializedPage actualPage, Page expectedPage) {
        Assert.assertNotNull(actualPage);
        Assert.assertEquals(actualPage.getPositionCount(), expectedPage.getPositionCount());
        Assert.assertEquals(PAGES_SERDE.deserialize(actualPage).getChannelCount(), expectedPage.getChannelCount());
    }

    /** Asserts only the state and HTTP request state of a page buffer client. */
    private static void assertStatus(PageBufferClientStatus clientStatus, String status, String httpRequestState) {
        Assert.assertEquals(clientStatus.getState(), status, "status");
        Assert.assertEquals(clientStatus.getHttpRequestState(), httpRequestState, "httpRequestState");
    }

    /** Asserts the URI, state, page/request counters and HTTP request state of a page buffer client. */
    private static void assertStatus(PageBufferClientStatus clientStatus, URI location, String status, int pagesReceived, int requestsScheduled, int requestsCompleted, String httpRequestState) {
        Assert.assertEquals(clientStatus.getUri(), location);
        Assert.assertEquals(clientStatus.getState(), status, "status");
        Assert.assertEquals(clientStatus.getPagesReceived(), pagesReceived, "pagesReceived");
        Assert.assertEquals(clientStatus.getRequestsScheduled(), requestsScheduled, "requestsScheduled");
        Assert.assertEquals(clientStatus.getRequestsCompleted(), requestsCompleted, "requestsCompleted");
        Assert.assertEquals(clientStatus.getHttpRequestState(), httpRequestState, "httpRequestState");
    }

    /**
     * Repeatedly checks the time limit, sleeps for {@link #POLL_INTERVAL_MILLIS}, and then evaluates
     * {@code done}, until {@code done} yields {@code true}. The limit is checked before every sleep,
     * so the first evaluation of {@code done} happens only after one full sleep.
     *
     * @param done       condition that ends the polling
     * @param startNanos {@link System#nanoTime()} reading the elapsed time is measured from
     * @param limit      the assertion fails once the elapsed time is no longer below this value
     */
    private static void pollUntil(Supplier<Boolean> done, long startNanos, Duration limit) {
        do {
            Assertions.assertLessThan(Duration.nanosSince(startNanos), limit);
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        } while (!done.get());
    }

    /** Waits up to 10 seconds for the client to report closed and fails the test if it does not. */
    private void awaitClosed(ExchangeClient exchangeClient) {
        waitUntilEquals(() -> exchangeClient.isClosed(), true, new Duration(10.0, TimeUnit.SECONDS));
    }

    /**
     * Polls {@code actualSupplier} every 10 ms until it yields a value equal to {@code expected} or
     * {@code timeout} elapses, then asserts equality one final time so that a timeout is reported
     * as an ordinary assertion failure.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored and the wait
     * ends early; the final assertion still runs.
     */
    private <T> void waitUntilEquals(Supplier<T> actualSupplier, T expected, Duration timeout) {
        long nanoUntil = System.nanoTime() + timeout.toMillis() * 1000000L;
        // Compare via subtraction so the check stays correct if nanoTime() wraps around.
        while (System.nanoTime() - nanoUntil < 0L) {
            if (expected.equals(actualSupplier.get())) {
                return;
            }
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop waiting; looping on would
                // make every later sleep throw immediately and turn this into a busy spin.
                Thread.currentThread().interrupt();
                break;
            }
        }
        Assert.assertEquals(actualSupplier.get(), expected);
    }

    /**
     * Creates an {@link ExchangeClient} backed by a {@link TestingHttpClient} that routes requests to
     * {@code processor}, using this class's shared executors and a fresh simple memory context.
     */
    private ExchangeClient createExchangeClient(MockExchangeRequestProcessor processor, DataSize bufferCapacity, DataSize maxResponseSize) {
        return new ExchangeClient(
                bufferCapacity,
                maxResponseSize,
                1,
                new Duration(1.0, TimeUnit.MINUTES),
                true,
                0.2,
                new TestingHttpClient(processor, testingHttpClientExecutor),
                new TestingDriftClient(),
                scheduler,
                new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"),
                pageBufferClientCallbackExecutor);
    }
}

