/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HeaderName;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.FeaturesConfig;
import io.trino.block.BlockAssertions;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.buffer.PagesSerde;
import io.trino.execution.buffer.TestingPagesSerdeFactory;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.memory.context.SimpleLocalMemoryContext;
import io.trino.operator.DeduplicationExchangeClientBuffer;
import io.trino.operator.ExchangeClient;
import io.trino.operator.ExchangeClientBuffer;
import io.trino.operator.ExchangeClientStatus;
import io.trino.operator.MockExchangeRequestProcessor;
import io.trino.operator.PageBufferClientStatus;
import io.trino.operator.RetryPolicy;
import io.trino.operator.StreamingExchangeClientBuffer;
import io.trino.operator.TestingExchangeClientBuffer;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.TrinoTransportException;
import io.trino.spi.block.Block;
import io.trino.testing.assertions.Assert;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestExchangeClient {
    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();

    @BeforeClass
    public void setUp() {
        this.scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-%s")));
        this.pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
    }

    @AfterClass(alwaysRun=true)
    public void tearDown() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
            this.scheduler = null;
        }
        if (this.pageBufferClientCallbackExecutor != null) {
            this.pageBufferClientCallbackExecutor.shutdownNow();
            this.pageBufferClientCallbackExecutor = null;
        }
    }

    @Test
    public void testHappyPath() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        ImmutableList pages = ImmutableList.of((Object)TestExchangeClient.createSerializedPage(1), (Object)TestExchangeClient.createSerializedPage(2), (Object)TestExchangeClient.createSerializedPage(3));
        URI location = URI.create("http://localhost:8080");
        pages.forEach(page -> processor.addPage(location, (Slice)page));
        processor.setComplete(location);
        TestingExchangeClientBuffer buffer = new TestingExchangeClientBuffer(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        Assertions.assertThat(buffer.getAllTasks()).isEmpty();
        Assertions.assertThat((Map)buffer.getPages().asMap()).isEmpty();
        Assertions.assertThat(buffer.getFinishedTasks()).isEmpty();
        Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.testng.Assert.assertFalse((boolean)buffer.isNoMoreTasks());
        TaskId taskId2 = new TaskId(new StageId("query", 1), 0, 0);
        exchangeClient.addLocation(taskId2, location);
        Assertions.assertThat(buffer.getAllTasks()).containsExactly((Object[])new TaskId[]{taskId2});
        exchangeClient.noMoreLocations();
        org.testng.Assert.assertTrue((boolean)buffer.isNoMoreTasks());
        buffer.whenTaskFinished(taskId2).get(10L, TimeUnit.SECONDS);
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{taskId2});
        Assertions.assertThat((List)buffer.getPages().get((Object)taskId2)).hasSize(3);
        Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        buffer.setFinished(true);
        org.testng.Assert.assertTrue((boolean)exchangeClient.isFinished());
        ExchangeClientStatus status = exchangeClient.getStatus();
        org.testng.Assert.assertEquals((int)status.getBufferedPages(), (int)0);
        TestExchangeClient.assertStatus((PageBufferClientStatus)status.getPageBufferClientStatuses().get(0), location, "closed", 3, 3, 3, "not scheduled");
        exchangeClient.close();
        Assert.assertEventually(() -> org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState"));
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{taskId2});
        Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        Assertions.assertThat((int)buffer.getPages().size()).isEqualTo(3);
    }

    @Test
    public void testStreamingHappyPath() {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestExchangeClient.createPage(1));
        processor.addPage(location, TestExchangeClient.createPage(2));
        processor.addPage(location, TestExchangeClient.createPage(3));
        processor.setComplete(location);
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)new StreamingExchangeClientBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location);
        exchangeClient.noMoreLocations();
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(1));
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(2));
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(3));
        org.testng.Assert.assertNull((Object)TestExchangeClient.getNextPage(exchangeClient));
        org.testng.Assert.assertTrue((boolean)exchangeClient.isFinished());
        ExchangeClientStatus status = exchangeClient.getStatus();
        org.testng.Assert.assertEquals((int)status.getBufferedPages(), (int)0);
        TestExchangeClient.assertStatus((PageBufferClientStatus)status.getPageBufferClientStatuses().get(0), location, "closed", 3, 3, 3, "not scheduled");
        exchangeClient.close();
    }

    @Test
    public void testAddLocation() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId task1 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId task2 = new TaskId(new StageId("query", 1), 1, 0);
        TaskId task3 = new TaskId(new StageId("query", 1), 2, 0);
        URI location1 = URI.create("http://localhost:8080/1");
        URI location2 = URI.create("http://localhost:8080/2");
        URI location3 = URI.create("http://localhost:8080/3");
        processor.addPage(location1, TestExchangeClient.createSerializedPage(1));
        processor.addPage(location1, TestExchangeClient.createSerializedPage(2));
        TestingExchangeClientBuffer buffer = new TestingExchangeClientBuffer(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        Assertions.assertThat(buffer.getAllTasks()).isEmpty();
        Assertions.assertThat((Map)buffer.getPages().asMap()).isEmpty();
        Assertions.assertThat(buffer.getFinishedTasks()).isEmpty();
        Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.testng.Assert.assertFalse((boolean)buffer.isNoMoreTasks());
        exchangeClient.addLocation(task1, location1);
        Assertions.assertThat(buffer.getAllTasks()).containsExactly((Object[])new TaskId[]{task1});
        TestExchangeClient.assertTaskIsNotFinished(buffer, task1);
        processor.setComplete(location1);
        buffer.whenTaskFinished(task1).get(10L, TimeUnit.SECONDS);
        Assertions.assertThat((List)buffer.getPages().get((Object)task1)).hasSize(2);
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{task1});
        exchangeClient.addLocation(task2, location2);
        Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2});
        TestExchangeClient.assertTaskIsNotFinished(buffer, task2);
        processor.setComplete(location2);
        buffer.whenTaskFinished(task2).get(10L, TimeUnit.SECONDS);
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2});
        Assertions.assertThat((List)buffer.getPages().get((Object)task2)).hasSize(0);
        exchangeClient.addLocation(task3, location3);
        Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3});
        TestExchangeClient.assertTaskIsNotFinished(buffer, task3);
        exchangeClient.noMoreLocations();
        org.testng.Assert.assertTrue((boolean)buffer.isNoMoreTasks());
        Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3});
        TestExchangeClient.assertTaskIsNotFinished(buffer, task3);
        exchangeClient.close();
        Assert.assertEventually(() -> org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState"));
        Assert.assertEventually(() -> org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(1)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState"));
        Assert.assertEventually(() -> org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(2)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState"));
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3});
        Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.testng.Assert.assertTrue((boolean)exchangeClient.isFinished());
    }

    @Test(timeOut=10000L)
    public void testStreamingAddLocation() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)new StreamingExchangeClientBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-testAddLocation-%s")))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        URI location1 = URI.create("http://localhost:8081/foo");
        processor.addPage(location1, TestExchangeClient.createPage(1));
        processor.addPage(location1, TestExchangeClient.createPage(2));
        processor.addPage(location1, TestExchangeClient.createPage(3));
        processor.setComplete(location1);
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location1);
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(1));
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(2));
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(3));
        org.testng.Assert.assertNull((Object)exchangeClient.pollPage());
        ListenableFuture firstBlocked = exchangeClient.isBlocked();
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)firstBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        org.testng.Assert.assertFalse((boolean)firstBlocked.isDone());
        org.testng.Assert.assertNull((Object)exchangeClient.pollPage());
        ListenableFuture secondBlocked = exchangeClient.isBlocked();
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)secondBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        org.testng.Assert.assertFalse((boolean)secondBlocked.isDone());
        org.testng.Assert.assertNull((Object)exchangeClient.pollPage());
        ListenableFuture thirdBlocked = exchangeClient.isBlocked();
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)thirdBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        org.testng.Assert.assertFalse((boolean)thirdBlocked.isDone());
        thirdBlocked.cancel(true);
        org.testng.Assert.assertTrue((boolean)thirdBlocked.isDone());
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)firstBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        org.testng.Assert.assertFalse((boolean)firstBlocked.isDone());
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)secondBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        org.testng.Assert.assertFalse((boolean)secondBlocked.isDone());
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        URI location2 = URI.create("http://localhost:8082/bar");
        processor.addPage(location2, TestExchangeClient.createPage(4));
        processor.addPage(location2, TestExchangeClient.createPage(5));
        processor.addPage(location2, TestExchangeClient.createPage(6));
        processor.setComplete(location2);
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 1, 0), location2);
        MoreFutures.tryGetFutureValue((Future)firstBlocked, (int)5, (TimeUnit)TimeUnit.SECONDS);
        org.testng.Assert.assertTrue((boolean)firstBlocked.isDone());
        MoreFutures.tryGetFutureValue((Future)secondBlocked, (int)5, (TimeUnit)TimeUnit.SECONDS);
        org.testng.Assert.assertTrue((boolean)secondBlocked.isDone());
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(4));
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(5));
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(6));
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        exchangeClient.noMoreLocations();
        while (!exchangeClient.isFinished()) {
            Thread.sleep(1L);
        }
        exchangeClient.close();
        ImmutableMap statuses = Maps.uniqueIndex((Iterable)exchangeClient.getStatus().getPageBufferClientStatuses(), PageBufferClientStatus::getUri);
        TestExchangeClient.assertStatus((PageBufferClientStatus)statuses.get((Object)location1), location1, "closed", 3, 3, 3, "not scheduled");
        TestExchangeClient.assertStatus((PageBufferClientStatus)statuses.get((Object)location2), location2, "closed", 3, 3, 3, "not scheduled");
    }

    @Test
    public void testStreamingTaskFailure() {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId task1 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId task2 = new TaskId(new StageId("query", 1), 1, 0);
        URI location1 = URI.create("http://localhost:8080/1");
        URI location2 = URI.create("http://localhost:8080/2");
        processor.addPage(location1, TestExchangeClient.createPage(1));
        StreamingExchangeClientBuffer buffer = new StreamingExchangeClientBuffer((Executor)this.scheduler, DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.SECONDS), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(task1, location1);
        exchangeClient.addLocation(task2, location2);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(1));
        processor.setComplete(location1);
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        RuntimeException randomException = new RuntimeException("randomfailure");
        processor.setFailed(location2, randomException);
        Assertions.assertThatThrownBy(() -> TestExchangeClient.getNextPage(exchangeClient)).hasMessageContaining("Encountered too many errors talking to a worker node");
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
    }

    @Test
    public void testDeduplicationTaskFailure() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId attempt0Task1 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId attempt1Task1 = new TaskId(new StageId("query", 1), 1, 0);
        TaskId attempt1Task2 = new TaskId(new StageId("query", 1), 2, 0);
        URI attempt0Task1Location = URI.create("http://localhost:8080/1/0");
        URI attempt1Task1Location = URI.create("http://localhost:8080/1/1");
        URI attempt1Task2Location = URI.create("http://localhost:8080/2/1");
        processor.setFailed(attempt0Task1Location, new RuntimeException("randomfailure"));
        processor.addPage(attempt1Task1Location, TestExchangeClient.createPage(1));
        processor.setComplete(attempt1Task1Location);
        processor.setFailed(attempt1Task2Location, new RuntimeException("randomfailure"));
        DeduplicationExchangeClientBuffer buffer = new DeduplicationExchangeClientBuffer((Executor)this.scheduler, DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE), RetryPolicy.QUERY);
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.SECONDS), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(attempt0Task1, attempt0Task1Location);
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        exchangeClient.addLocation(attempt1Task1, attempt1Task1Location);
        exchangeClient.addLocation(attempt1Task2, attempt1Task2Location);
        org.testng.Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        exchangeClient.noMoreLocations();
        exchangeClient.isBlocked().get(10L, TimeUnit.SECONDS);
        Assertions.assertThatThrownBy(() -> TestExchangeClient.getNextPage(exchangeClient)).hasMessageContaining("Encountered too many errors talking to a worker node");
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
    }

    @Test
    public void testDeduplication() throws Exception {
        Slice page;
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId taskP0A0 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId taskP1A0 = new TaskId(new StageId("query", 1), 1, 0);
        TaskId taskP0A1 = new TaskId(new StageId("query", 1), 0, 1);
        URI locationP0A0 = URI.create("http://localhost:8080/1");
        URI locationP1A0 = URI.create("http://localhost:8080/2");
        URI locationP0A1 = URI.create("http://localhost:8080/3");
        processor.addPage(locationP1A0, TestExchangeClient.createSerializedPage(1));
        processor.addPage(locationP0A1, TestExchangeClient.createSerializedPage(2));
        processor.addPage(locationP0A1, TestExchangeClient.createSerializedPage(3));
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)new DeduplicationExchangeClientBuffer((Executor)this.scheduler, DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE), RetryPolicy.QUERY), maxResponseSize, 1, new Duration(1.0, TimeUnit.SECONDS), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(taskP0A0, locationP0A0);
        exchangeClient.addLocation(taskP1A0, locationP1A0);
        exchangeClient.addLocation(taskP0A1, locationP0A1);
        processor.setComplete(locationP0A0);
        processor.setFailed(locationP1A0, new RuntimeException("failure"));
        processor.setComplete(locationP0A1);
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        Assertions.assertThatThrownBy(() -> exchangeClient.isBlocked().get(50L, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
        exchangeClient.noMoreLocations();
        exchangeClient.isBlocked().get(10L, TimeUnit.SECONDS);
        ArrayList<Page> pages = new ArrayList<Page>();
        while (!exchangeClient.isFinished() && (page = exchangeClient.pollPage()) != null) {
            pages.add(PAGES_SERDE.deserialize(page));
        }
        Assertions.assertThat(pages).hasSize(2);
        Assertions.assertThat((Iterable)((Iterable)pages.stream().map(Page::getPositionCount).collect(ImmutableSet.toImmutableSet()))).containsAll((Iterable)ImmutableList.of((Object)2, (Object)3));
        Assert.assertEventually(() -> org.testng.Assert.assertTrue((boolean)exchangeClient.isFinished()));
        Assert.assertEventually(() -> {
            org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState");
            org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(1)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState");
            org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(2)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState");
        });
        exchangeClient.close();
    }

    @Test
    public void testTaskFailure() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId task1 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId task2 = new TaskId(new StageId("query", 1), 1, 0);
        TaskId task3 = new TaskId(new StageId("query", 1), 2, 0);
        TaskId task4 = new TaskId(new StageId("query", 1), 3, 0);
        URI location1 = URI.create("http://localhost:8080/1");
        URI location2 = URI.create("http://localhost:8080/2");
        URI location3 = URI.create("http://localhost:8080/3");
        URI location4 = URI.create("http://localhost:8080/4");
        processor.addPage(location1, TestExchangeClient.createSerializedPage(1));
        processor.addPage(location4, TestExchangeClient.createSerializedPage(2));
        processor.addPage(location4, TestExchangeClient.createSerializedPage(3));
        TestingExchangeClientBuffer buffer = new TestingExchangeClientBuffer(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        Set failedTasks = Sets.newConcurrentHashSet();
        CountDownLatch latch = new CountDownLatch(2);
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.SECONDS), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {
            failedTasks.add(taskId);
            latch.countDown();
        });
        Assertions.assertThat(buffer.getAllTasks()).isEmpty();
        Assertions.assertThat((Map)buffer.getPages().asMap()).isEmpty();
        Assertions.assertThat(buffer.getFinishedTasks()).isEmpty();
        Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.testng.Assert.assertFalse((boolean)buffer.isNoMoreTasks());
        exchangeClient.addLocation(task1, location1);
        Assertions.assertThat(buffer.getAllTasks()).containsExactly((Object[])new TaskId[]{task1});
        TestExchangeClient.assertTaskIsNotFinished(buffer, task1);
        processor.setComplete(location1);
        buffer.whenTaskFinished(task1).get(10L, TimeUnit.SECONDS);
        Assertions.assertThat((List)buffer.getPages().get((Object)task1)).hasSize(1);
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{task1});
        exchangeClient.addLocation(task2, location2);
        Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2});
        TestExchangeClient.assertTaskIsNotFinished(buffer, task2);
        RuntimeException randomException = new RuntimeException("randomfailure");
        processor.setFailed(location2, randomException);
        buffer.whenTaskFailed(task2).get(10L, TimeUnit.SECONDS);
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{task1});
        Assertions.assertThat((Iterable)buffer.getFailedTasks().keySet()).containsExactly((Object[])new TaskId[]{task2});
        Assertions.assertThat((List)buffer.getPages().get((Object)task2)).hasSize(0);
        exchangeClient.addLocation(task3, location3);
        Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3});
        TestExchangeClient.assertTaskIsNotFinished(buffer, task2);
        TestExchangeClient.assertTaskIsNotFinished(buffer, task3);
        TrinoException trinoException = new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "generic internal error");
        processor.setFailed(location3, (RuntimeException)((Object)trinoException));
        buffer.whenTaskFailed(task3).get(10L, TimeUnit.SECONDS);
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{task1});
        Assertions.assertThat((Iterable)buffer.getFailedTasks().keySet()).containsExactlyInAnyOrder((Object[])new TaskId[]{task2, task3});
        Assertions.assertThat((List)buffer.getPages().get((Object)task2)).hasSize(0);
        Assertions.assertThat((List)buffer.getPages().get((Object)task3)).hasSize(0);
        org.testng.Assert.assertTrue((boolean)latch.await(10L, TimeUnit.SECONDS));
        org.testng.Assert.assertEquals((Set)failedTasks, (Set)ImmutableSet.of((Object)task2, (Object)task3));
        exchangeClient.addLocation(task4, location4);
        Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3, task4});
        TestExchangeClient.assertTaskIsNotFinished(buffer, task4);
        processor.setComplete(location4);
        buffer.whenTaskFinished(task4).get(10L, TimeUnit.SECONDS);
        Assertions.assertThat((List)buffer.getPages().get((Object)task4)).hasSize(2);
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task4});
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        buffer.setFinished(true);
        org.testng.Assert.assertTrue((boolean)exchangeClient.isFinished());
        exchangeClient.close();
        Assert.assertEventually(() -> org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState"));
        Assert.assertEventually(() -> org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(1)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState"));
        Assert.assertEventually(() -> org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(2)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState"));
        Assert.assertEventually(() -> org.testng.Assert.assertEquals((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(3)).getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState"));
        Assertions.assertThat(buffer.getFinishedTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task4});
        Assertions.assertThat((Iterable)buffer.getFailedTasks().keySet()).containsExactlyInAnyOrder((Object[])new TaskId[]{task2, task3});
        Assertions.assertThat((Iterable)((Iterable)buffer.getFailedTasks().asMap().get(task2))).hasSize(1);
        Assertions.assertThat((Throwable)((Throwable)((Collection)buffer.getFailedTasks().asMap().get(task2)).iterator().next())).isInstanceOf(TrinoTransportException.class);
        Assertions.assertThat((Iterable)((Iterable)buffer.getFailedTasks().asMap().get(task3))).hasSize(1);
        Assertions.assertThat((Throwable)((Throwable)((Collection)buffer.getFailedTasks().asMap().get(task3)).iterator().next())).isEqualTo((Object)trinoException);
        org.testng.Assert.assertTrue((boolean)exchangeClient.isFinished());
    }

    private static void assertTaskIsNotFinished(TestingExchangeClientBuffer buffer, TaskId task) {
        Assertions.assertThatThrownBy(() -> buffer.whenTaskFinished(task).get(50L, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
    }

    @Test
    public void testStreamingBufferLimit() {
        DataSize maxResponseSize = DataSize.ofBytes((long)1L);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestExchangeClient.createPage(1));
        processor.addPage(location, TestExchangeClient.createPage(2));
        processor.addPage(location, TestExchangeClient.createPage(3));
        processor.setComplete(location);
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)new StreamingExchangeClientBuffer((Executor)this.scheduler, DataSize.ofBytes((long)1L)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-testBufferLimit-%s")))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location);
        exchangeClient.noMoreLocations();
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        long start = System.nanoTime();
        do {
            io.airlift.testing.Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        org.testng.Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)1);
        org.testng.Assert.assertTrue((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0);
        TestExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 1, 1, 1, "not scheduled");
        TestExchangeClient.assertPageEquals(exchangeClient.pollPage(), TestExchangeClient.createPage(1));
        do {
            io.airlift.testing.Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        TestExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 2, 2, 2, "not scheduled");
        org.testng.Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)1);
        org.testng.Assert.assertTrue((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0);
        TestExchangeClient.assertPageEquals(exchangeClient.pollPage(), TestExchangeClient.createPage(2));
        do {
            io.airlift.testing.Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        TestExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 3, 3, 3, "not scheduled");
        org.testng.Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)1);
        org.testng.Assert.assertTrue((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(3));
        org.testng.Assert.assertNull((Object)TestExchangeClient.getNextPage(exchangeClient));
        org.testng.Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)0);
        org.testng.Assert.assertTrue((exchangeClient.getStatus().getBufferedBytes() == 0L ? 1 : 0) != 0);
        org.testng.Assert.assertEquals((boolean)exchangeClient.isFinished(), (boolean)true);
        exchangeClient.close();
        TestExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 5, 5, "not scheduled");
    }

    @Test
    public void testStreamingAbortOnDataCorruption() {
        URI location = URI.create("http://localhost:8080");
        ExchangeClient exchangeClient = this.setUpDataCorruption(FeaturesConfig.DataIntegrityVerification.ABORT, location);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> TestExchangeClient.getNextPage(exchangeClient)).isInstanceOf(TrinoException.class)).hasMessageMatching("Checksum verification failure on localhost when reading from http://localhost:8080/0: Data corruption, read checksum: 0xdd450d930a94ddde, calculated checksum: 0x9bdc9de3ce57c972");
        exchangeClient.close();
    }

    @Test
    public void testStreamingRetryDataCorruption() {
        URI location = URI.create("http://localhost:8080");
        ExchangeClient exchangeClient = this.setUpDataCorruption(FeaturesConfig.DataIntegrityVerification.RETRY, location);
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(1));
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(2));
        org.testng.Assert.assertNull((Object)TestExchangeClient.getNextPage(exchangeClient));
        org.testng.Assert.assertTrue((boolean)exchangeClient.isFinished());
        exchangeClient.close();
        ExchangeClientStatus status = exchangeClient.getStatus();
        org.testng.Assert.assertEquals((int)status.getBufferedPages(), (int)0);
        org.testng.Assert.assertEquals((long)status.getBufferedBytes(), (long)0L);
        TestExchangeClient.assertStatus((PageBufferClientStatus)status.getPageBufferClientStatuses().get(0), location, "closed", 2, 4, 4, "not scheduled");
    }

    private ExchangeClient setUpDataCorruption(FeaturesConfig.DataIntegrityVerification dataIntegrityVerification, URI location) {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        final MockExchangeRequestProcessor delegate = new MockExchangeRequestProcessor(maxResponseSize);
        delegate.addPage(location, TestExchangeClient.createPage(1));
        delegate.addPage(location, TestExchangeClient.createPage(2));
        delegate.setComplete(location);
        TestingHttpClient.Processor processor = new TestingHttpClient.Processor(){
            private int completedRequests;
            private TestingResponse savedResponse;

            public synchronized Response handle(Request request) throws Exception {
                if (this.completedRequests == 0) {
                    Verify.verify((this.savedResponse == null ? 1 : 0) != 0);
                    TestingResponse response = (TestingResponse)delegate.handle(request);
                    Preconditions.checkState((response.getStatusCode() == HttpStatus.OK.code() ? 1 : 0) != 0, (String)"Unexpected status code: %s", (int)response.getStatusCode());
                    ListMultimap headers = (ListMultimap)response.getHeaders().entries().stream().collect(ImmutableListMultimap.toImmutableListMultimap(entry -> ((HeaderName)entry.getKey()).toString(), Map.Entry::getValue));
                    byte[] bytes = ByteStreams.toByteArray((InputStream)response.getInputStream());
                    Preconditions.checkState((bytes.length > 42 ? 1 : 0) != 0, (Object)"too short");
                    this.savedResponse = new TestingResponse(HttpStatus.OK, headers, (byte[])bytes.clone());
                    bytes[42] = (byte)(bytes[42] + 1);
                    ++this.completedRequests;
                    return new TestingResponse(HttpStatus.OK, headers, bytes);
                }
                if (this.completedRequests == 1) {
                    Verify.verify((this.savedResponse != null ? 1 : 0) != 0);
                    TestingResponse response = this.savedResponse;
                    this.savedResponse = null;
                    ++this.completedRequests;
                    return response;
                }
                ++this.completedRequests;
                return delegate.handle(request);
            }
        };
        ExchangeClient exchangeClient = new ExchangeClient("localhost", dataIntegrityVerification, (ExchangeClientBuffer)new StreamingExchangeClientBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient(processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location);
        exchangeClient.noMoreLocations();
        return exchangeClient;
    }

    @Test
    public void testStreamingClose() throws Exception {
        DataSize maxResponseSize = DataSize.ofBytes((long)1L);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestExchangeClient.createPage(1));
        processor.addPage(location, TestExchangeClient.createPage(2));
        processor.addPage(location, TestExchangeClient.createPage(3));
        ExchangeClient exchangeClient = new ExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (ExchangeClientBuffer)new StreamingExchangeClientBuffer((Executor)this.scheduler, DataSize.ofBytes((long)1L)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-testClose-%s")))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location);
        exchangeClient.noMoreLocations();
        org.testng.Assert.assertFalse((boolean)exchangeClient.isFinished());
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(1));
        exchangeClient.close();
        while (!exchangeClient.isFinished()) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        org.testng.Assert.assertTrue((boolean)exchangeClient.isFinished());
        org.testng.Assert.assertNull((Object)exchangeClient.pollPage());
        org.testng.Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)0);
        PageBufferClientStatus clientStatus = (PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0);
        org.testng.Assert.assertEquals((Object)clientStatus.getUri(), (Object)location);
        org.testng.Assert.assertEquals((String)clientStatus.getState(), (String)"closed", (String)"status");
        org.testng.Assert.assertEquals((String)clientStatus.getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState");
    }

    private static Page createPage(int size) {
        return new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, size)});
    }

    private static Slice createSerializedPage(int size) {
        return PAGES_SERDE.serialize(PAGES_SERDE.newContext(), TestExchangeClient.createPage(size));
    }

    private static Slice getNextPage(ExchangeClient exchangeClient) {
        ListenableFuture futurePage = Futures.transform((ListenableFuture)exchangeClient.isBlocked(), ignored -> exchangeClient.isFinished() ? null : exchangeClient.pollPage(), (Executor)MoreExecutors.directExecutor());
        return MoreFutures.tryGetFutureValue((Future)futurePage, (int)100, (TimeUnit)TimeUnit.SECONDS).orElse(null);
    }

    private static void assertPageEquals(Slice actualPage, Page expectedPage) {
        org.testng.Assert.assertNotNull((Object)actualPage);
        org.testng.Assert.assertEquals((int)PagesSerde.getSerializedPagePositionCount((Slice)actualPage), (int)expectedPage.getPositionCount());
        org.testng.Assert.assertEquals((int)PAGES_SERDE.deserialize(actualPage).getChannelCount(), (int)expectedPage.getChannelCount());
    }

    private static void assertStatus(PageBufferClientStatus clientStatus, URI location, String status, int pagesReceived, int requestsScheduled, int requestsCompleted, String httpRequestState) {
        org.testng.Assert.assertEquals((Object)clientStatus.getUri(), (Object)location);
        org.testng.Assert.assertEquals((String)clientStatus.getState(), (String)status, (String)"status");
        org.testng.Assert.assertEquals((int)clientStatus.getPagesReceived(), (int)pagesReceived, (String)"pagesReceived");
        org.testng.Assert.assertEquals((int)clientStatus.getRequestsScheduled(), (int)requestsScheduled, (String)"requestsScheduled");
        org.testng.Assert.assertEquals((int)clientStatus.getRequestsCompleted(), (int)requestsCompleted, (String)"requestsCompleted");
        org.testng.Assert.assertEquals((String)clientStatus.getHttpRequestState(), (String)httpRequestState, (String)"httpRequestState");
    }
}

