/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HeaderName;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.slice.Slice;
import io.airlift.testing.Assertions;
import io.airlift.tracing.Tracing;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.trino.FeaturesConfig;
import io.trino.block.BlockAssertions;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.TestSqlTaskExecution;
import io.trino.execution.buffer.PageDeserializer;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.PagesSerdeUtil;
import io.trino.execution.buffer.TestingPagesSerdeFactory;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.memory.context.SimpleLocalMemoryContext;
import io.trino.operator.DeduplicatingDirectExchangeBuffer;
import io.trino.operator.DirectExchangeBuffer;
import io.trino.operator.DirectExchangeClient;
import io.trino.operator.DirectExchangeClientStatus;
import io.trino.operator.HttpPageBufferClient;
import io.trino.operator.MockExchangeRequestProcessor;
import io.trino.operator.PageBufferClientStatus;
import io.trino.operator.RetryPolicy;
import io.trino.operator.StreamingDirectExchangeBuffer;
import io.trino.operator.TestingDirectExchangeBuffer;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.TrinoTransportException;
import io.trino.spi.block.Block;
import io.trino.spi.exchange.ExchangeId;
import io.trino.testing.assertions.Assert;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Execution(value=ExecutionMode.CONCURRENT)
public class TestDirectExchangeClient {
    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    private PagesSerdeFactory serdeFactory;

    @BeforeAll
    public void setUp() {
        this.scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-%s")));
        this.pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
        this.serdeFactory = new TestingPagesSerdeFactory();
    }

    @AfterAll
    public void tearDown() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
            this.scheduler = null;
        }
        if (this.pageBufferClientCallbackExecutor != null) {
            this.pageBufferClientCallbackExecutor.shutdownNow();
            this.pageBufferClientCallbackExecutor = null;
        }
        this.serdeFactory = null;
    }

    @Test
    public void testHappyPath() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        ImmutableList pages = ImmutableList.of((Object)this.createSerializedPage(1), (Object)this.createSerializedPage(2), (Object)this.createSerializedPage(3));
        URI location = URI.create("http://localhost:8080");
        pages.forEach(page -> processor.addPage(location, (Slice)page));
        processor.setComplete(location);
        TestingDirectExchangeBuffer buffer = new TestingDirectExchangeBuffer(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getPages().asMap()).isEmpty();
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((boolean)buffer.isNoMoreTasks()).isFalse();
        TaskId taskId2 = new TaskId(new StageId("query", 1), 0, 0);
        exchangeClient.addLocation(taskId2, location);
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactly((Object[])new TaskId[]{taskId2});
        exchangeClient.noMoreLocations();
        org.assertj.core.api.Assertions.assertThat((boolean)buffer.isNoMoreTasks()).isTrue();
        buffer.whenTaskFinished(taskId2).get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{taskId2});
        org.assertj.core.api.Assertions.assertThat((List)buffer.getPages().get((Object)taskId2)).hasSize(3);
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        buffer.setFinished(true);
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue();
        DirectExchangeClientStatus status = exchangeClient.getStatus();
        org.assertj.core.api.Assertions.assertThat((int)status.getBufferedPages()).isEqualTo(0);
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)status.getPageBufferClientStatuses().get(0), location, "closed", 3, 3, 3, "not scheduled");
        org.assertj.core.api.Assertions.assertThat((double)status.getRequestDuration().getDigest().getCount()).isEqualTo(2.0);
        exchangeClient.close();
        Assert.assertEventually(() -> ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled"));
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{taskId2});
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((int)buffer.getPages().size()).isEqualTo(3);
    }

    @Test
    public void testStreamingHappyPath() {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestDirectExchangeClient.createPage(1));
        processor.addPage(location, TestDirectExchangeClient.createPage(2));
        processor.addPage(location, TestDirectExchangeClient.createPage(3));
        processor.setComplete(location);
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location);
        exchangeClient.noMoreLocations();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(1));
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(2));
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(3));
        org.assertj.core.api.Assertions.assertThat((Comparable)TestDirectExchangeClient.getNextPage(exchangeClient)).isNull();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue();
        DirectExchangeClientStatus status = exchangeClient.getStatus();
        org.assertj.core.api.Assertions.assertThat((int)status.getBufferedPages()).isEqualTo(0);
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)status.getPageBufferClientStatuses().get(0), location, "closed", 3, 3, 3, "not scheduled");
        exchangeClient.close();
    }

    @Test
    public void testAddLocation() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId task1 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId task2 = new TaskId(new StageId("query", 1), 1, 0);
        TaskId task3 = new TaskId(new StageId("query", 1), 2, 0);
        URI location1 = URI.create("http://localhost:8080/1");
        URI location2 = URI.create("http://localhost:8080/2");
        URI location3 = URI.create("http://localhost:8080/3");
        processor.addPage(location1, this.createSerializedPage(1));
        processor.addPage(location1, this.createSerializedPage(2));
        TestingDirectExchangeBuffer buffer = new TestingDirectExchangeBuffer(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getPages().asMap()).isEmpty();
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((boolean)buffer.isNoMoreTasks()).isFalse();
        exchangeClient.addLocation(task1, location1);
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactly((Object[])new TaskId[]{task1});
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task1);
        processor.setComplete(location1);
        buffer.whenTaskFinished(task1).get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat((List)buffer.getPages().get((Object)task1)).hasSize(2);
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{task1});
        exchangeClient.addLocation(task2, location2);
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2});
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task2);
        processor.setComplete(location2);
        buffer.whenTaskFinished(task2).get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2});
        org.assertj.core.api.Assertions.assertThat((List)buffer.getPages().get((Object)task2)).hasSize(0);
        exchangeClient.addLocation(task3, location3);
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3});
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task3);
        exchangeClient.noMoreLocations();
        org.assertj.core.api.Assertions.assertThat((boolean)buffer.isNoMoreTasks()).isTrue();
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3});
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task3);
        exchangeClient.close();
        buffer.whenTaskFinished(task3).get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3});
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        Assert.assertEventually(() -> ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled"));
        Assert.assertEventually(() -> ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(1)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled"));
        Assert.assertEventually(() -> ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(2)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled"));
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue();
    }

    @Test
    @Timeout(value=10L)
    public void testStreamingAddLocation() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-testAddLocation-%s")))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        URI location1 = URI.create("http://localhost:8081/foo");
        processor.addPage(location1, TestDirectExchangeClient.createPage(1));
        processor.addPage(location1, TestDirectExchangeClient.createPage(2));
        processor.addPage(location1, TestDirectExchangeClient.createPage(3));
        processor.setComplete(location1);
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location1);
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(1));
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(2));
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(3));
        org.assertj.core.api.Assertions.assertThat((Comparable)exchangeClient.pollPage()).isNull();
        ListenableFuture firstBlocked = exchangeClient.isBlocked();
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)firstBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        org.assertj.core.api.Assertions.assertThat((boolean)firstBlocked.isDone()).isFalse();
        org.assertj.core.api.Assertions.assertThat((Comparable)exchangeClient.pollPage()).isNull();
        ListenableFuture secondBlocked = exchangeClient.isBlocked();
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)secondBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        org.assertj.core.api.Assertions.assertThat((boolean)secondBlocked.isDone()).isFalse();
        org.assertj.core.api.Assertions.assertThat((Comparable)exchangeClient.pollPage()).isNull();
        ListenableFuture thirdBlocked = exchangeClient.isBlocked();
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)thirdBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        org.assertj.core.api.Assertions.assertThat((boolean)thirdBlocked.isDone()).isFalse();
        thirdBlocked.cancel(true);
        org.assertj.core.api.Assertions.assertThat((boolean)thirdBlocked.isDone()).isTrue();
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)firstBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        org.assertj.core.api.Assertions.assertThat((boolean)firstBlocked.isDone()).isFalse();
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)secondBlocked, (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        org.assertj.core.api.Assertions.assertThat((boolean)secondBlocked.isDone()).isFalse();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        URI location2 = URI.create("http://localhost:8082/bar");
        processor.addPage(location2, TestDirectExchangeClient.createPage(4));
        processor.addPage(location2, TestDirectExchangeClient.createPage(5));
        processor.addPage(location2, TestDirectExchangeClient.createPage(6));
        processor.setComplete(location2);
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 1, 0), location2);
        MoreFutures.tryGetFutureValue((Future)firstBlocked, (int)5, (TimeUnit)TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat((boolean)firstBlocked.isDone()).isTrue();
        MoreFutures.tryGetFutureValue((Future)secondBlocked, (int)5, (TimeUnit)TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat((boolean)secondBlocked.isDone()).isTrue();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(4));
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(5));
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(6));
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        exchangeClient.noMoreLocations();
        while (!exchangeClient.isFinished()) {
            Thread.sleep(1L);
        }
        exchangeClient.close();
        ImmutableMap statuses = Maps.uniqueIndex((Iterable)exchangeClient.getStatus().getPageBufferClientStatuses(), PageBufferClientStatus::getUri);
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)statuses.get((Object)location1), location1, "closed", 3, 3, 3, "not scheduled");
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)statuses.get((Object)location2), location2, "closed", 3, 3, 3, "not scheduled");
    }

    @Test
    public void testStreamingTaskFailure() {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId task1 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId task2 = new TaskId(new StageId("query", 1), 1, 0);
        URI location1 = URI.create("http://localhost:8080/1");
        URI location2 = URI.create("http://localhost:8080/2");
        processor.addPage(location1, TestDirectExchangeClient.createPage(1));
        StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.SECONDS), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(task1, location1);
        exchangeClient.addLocation(task2, location2);
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(1));
        processor.setComplete(location1);
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        RuntimeException randomException = new RuntimeException("randomfailure");
        processor.setFailed(location2, randomException);
        org.assertj.core.api.Assertions.assertThatThrownBy(() -> TestDirectExchangeClient.getNextPage(exchangeClient)).hasMessageContaining("Encountered too many errors talking to a worker node");
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
    }

    @Test
    public void testDeduplicationTaskFailure() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId attempt0Task1 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId attempt1Task1 = new TaskId(new StageId("query", 1), 1, 0);
        TaskId attempt1Task2 = new TaskId(new StageId("query", 1), 2, 0);
        URI attempt0Task1Location = URI.create("http://localhost:8080/1/0");
        URI attempt1Task1Location = URI.create("http://localhost:8080/1/1");
        URI attempt1Task2Location = URI.create("http://localhost:8080/2/1");
        processor.setFailed(attempt0Task1Location, new RuntimeException("randomfailure"));
        processor.addPage(attempt1Task1Location, TestDirectExchangeClient.createPage(1));
        processor.setComplete(attempt1Task1Location);
        processor.setFailed(attempt1Task2Location, new RuntimeException("randomfailure"));
        DeduplicatingDirectExchangeBuffer buffer = new DeduplicatingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE), RetryPolicy.QUERY, new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer()), new QueryId("query"), Span.getInvalid(), ExchangeId.createRandomExchangeId());
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.SECONDS), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(attempt0Task1, attempt0Task1Location);
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        exchangeClient.addLocation(attempt1Task1, attempt1Task1Location);
        exchangeClient.addLocation(attempt1Task2, attempt1Task2Location);
        org.assertj.core.api.Assertions.assertThat((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent()).isFalse();
        exchangeClient.noMoreLocations();
        exchangeClient.isBlocked().get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThatThrownBy(() -> TestDirectExchangeClient.getNextPage(exchangeClient)).hasMessageContaining("Encountered too many errors talking to a worker node");
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
    }

    @Test
    public void testDeduplication() throws Exception {
        Slice page;
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId taskP0A0 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId taskP1A0 = new TaskId(new StageId("query", 1), 1, 0);
        TaskId taskP0A1 = new TaskId(new StageId("query", 1), 0, 1);
        URI locationP0A0 = URI.create("http://localhost:8080/1");
        URI locationP1A0 = URI.create("http://localhost:8080/2");
        URI locationP0A1 = URI.create("http://localhost:8080/3");
        processor.addPage(locationP1A0, this.createSerializedPage(1));
        processor.addPage(locationP0A1, this.createSerializedPage(2));
        processor.addPage(locationP0A1, this.createSerializedPage(3));
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)new DeduplicatingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE), RetryPolicy.QUERY, new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer()), new QueryId("query"), Span.getInvalid(), ExchangeId.createRandomExchangeId()), maxResponseSize, 1, new Duration(1.0, TimeUnit.SECONDS), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(taskP0A0, locationP0A0);
        exchangeClient.addLocation(taskP1A0, locationP1A0);
        exchangeClient.addLocation(taskP0A1, locationP0A1);
        processor.setComplete(locationP0A0);
        processor.setFailed(locationP1A0, new RuntimeException("failure"));
        processor.setComplete(locationP0A1);
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        org.assertj.core.api.Assertions.assertThatThrownBy(() -> exchangeClient.isBlocked().get(50L, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
        exchangeClient.noMoreLocations();
        exchangeClient.isBlocked().get(10L, TimeUnit.SECONDS);
        ArrayList<Page> pages = new ArrayList<Page>();
        PageDeserializer deserializer = this.serdeFactory.createDeserializer(Optional.empty());
        while (!exchangeClient.isFinished() && (page = exchangeClient.pollPage()) != null) {
            pages.add(deserializer.deserialize(page));
        }
        org.assertj.core.api.Assertions.assertThat(pages).hasSize(2);
        org.assertj.core.api.Assertions.assertThat((Collection)((Collection)pages.stream().map(Page::getPositionCount).collect(ImmutableSet.toImmutableSet()))).containsAll((Iterable)ImmutableList.of((Object)2, (Object)3));
        Assert.assertEventually(() -> org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue());
        Assert.assertEventually(() -> {
            ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled");
            ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(1)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled");
            ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(2)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled");
        });
        exchangeClient.close();
    }

    @Test
    public void testTaskFailure() throws Exception {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        TaskId task1 = new TaskId(new StageId("query", 1), 0, 0);
        TaskId task2 = new TaskId(new StageId("query", 1), 1, 0);
        TaskId task3 = new TaskId(new StageId("query", 1), 2, 0);
        TaskId task4 = new TaskId(new StageId("query", 1), 3, 0);
        URI location1 = URI.create("http://localhost:8080/1");
        URI location2 = URI.create("http://localhost:8080/2");
        URI location3 = URI.create("http://localhost:8080/3");
        URI location4 = URI.create("http://localhost:8080/4");
        processor.addPage(location1, this.createSerializedPage(1));
        processor.addPage(location4, this.createSerializedPage(2));
        processor.addPage(location4, this.createSerializedPage(3));
        TestingDirectExchangeBuffer buffer = new TestingDirectExchangeBuffer(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        Set failedTasks = Sets.newConcurrentHashSet();
        CountDownLatch latch = new CountDownLatch(2);
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)buffer, maxResponseSize, 1, new Duration(1.0, TimeUnit.SECONDS), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {
            failedTasks.add(taskId);
            latch.countDown();
        });
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getPages().asMap()).isEmpty();
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((Map)buffer.getFailedTasks().asMap()).isEmpty();
        org.assertj.core.api.Assertions.assertThat((boolean)buffer.isNoMoreTasks()).isFalse();
        exchangeClient.addLocation(task1, location1);
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactly((Object[])new TaskId[]{task1});
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task1);
        processor.setComplete(location1);
        buffer.whenTaskFinished(task1).get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat((List)buffer.getPages().get((Object)task1)).hasSize(1);
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{task1});
        exchangeClient.addLocation(task2, location2);
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2});
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task2);
        RuntimeException randomException = new RuntimeException("randomfailure");
        processor.setFailed(location2, randomException);
        buffer.whenTaskFailed(task2).get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{task1});
        org.assertj.core.api.Assertions.assertThat((Collection)buffer.getFailedTasks().keySet()).containsExactly((Object[])new TaskId[]{task2});
        org.assertj.core.api.Assertions.assertThat((List)buffer.getPages().get((Object)task2)).hasSize(0);
        exchangeClient.addLocation(task3, location3);
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3});
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task2);
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task3);
        TrinoException trinoException = new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "generic internal error");
        processor.setFailed(location3, (RuntimeException)((Object)trinoException));
        buffer.whenTaskFailed(task3).get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactly((Object[])new TaskId[]{task1});
        org.assertj.core.api.Assertions.assertThat((Collection)buffer.getFailedTasks().keySet()).containsExactlyInAnyOrder((Object[])new TaskId[]{task2, task3});
        org.assertj.core.api.Assertions.assertThat((List)buffer.getPages().get((Object)task2)).hasSize(0);
        org.assertj.core.api.Assertions.assertThat((List)buffer.getPages().get((Object)task3)).hasSize(0);
        org.assertj.core.api.Assertions.assertThat((boolean)latch.await(10L, TimeUnit.SECONDS)).isTrue();
        org.assertj.core.api.Assertions.assertThat((Collection)failedTasks).isEqualTo((Object)ImmutableSet.of((Object)task2, (Object)task3));
        exchangeClient.addLocation(task4, location4);
        org.assertj.core.api.Assertions.assertThat(buffer.getAllTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task2, task3, task4});
        TestDirectExchangeClient.assertTaskIsNotFinished(buffer, task4);
        processor.setComplete(location4);
        buffer.whenTaskFinished(task4).get(10L, TimeUnit.SECONDS);
        org.assertj.core.api.Assertions.assertThat((List)buffer.getPages().get((Object)task4)).hasSize(2);
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task4});
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        buffer.setFinished(true);
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue();
        exchangeClient.close();
        Assert.assertEventually(() -> ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled"));
        Assert.assertEventually(() -> ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(1)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled"));
        Assert.assertEventually(() -> ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(2)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled"));
        Assert.assertEventually(() -> ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(3)).getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled"));
        org.assertj.core.api.Assertions.assertThat(buffer.getFinishedTasks()).containsExactlyInAnyOrder((Object[])new TaskId[]{task1, task4});
        org.assertj.core.api.Assertions.assertThat((Collection)buffer.getFailedTasks().keySet()).containsExactlyInAnyOrder((Object[])new TaskId[]{task2, task3});
        org.assertj.core.api.Assertions.assertThat((Collection)((Collection)buffer.getFailedTasks().asMap().get(task2))).hasSize(1);
        org.assertj.core.api.Assertions.assertThat((Throwable)((Throwable)((Collection)buffer.getFailedTasks().asMap().get(task2)).iterator().next())).isInstanceOf(TrinoTransportException.class);
        org.assertj.core.api.Assertions.assertThat((Collection)((Collection)buffer.getFailedTasks().asMap().get(task3))).hasSize(1);
        org.assertj.core.api.Assertions.assertThat((Throwable)((Throwable)((Collection)buffer.getFailedTasks().asMap().get(task3)).iterator().next())).isEqualTo((Object)trinoException);
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue();
    }

    private static void assertTaskIsNotFinished(TestingDirectExchangeBuffer buffer, TaskId task) {
        org.assertj.core.api.Assertions.assertThatThrownBy(() -> buffer.whenTaskFinished(task).get(50L, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
    }

    @Test
    public void testStreamingBufferLimit() {
        DataSize maxResponseSize = DataSize.ofBytes((long)1L);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestDirectExchangeClient.createPage(1));
        processor.addPage(location, TestDirectExchangeClient.createPage(2));
        processor.addPage(location, TestDirectExchangeClient.createPage(3));
        processor.setComplete(location);
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.ofBytes((long)1L)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-testBufferLimit-%s")))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location);
        exchangeClient.noMoreLocations();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        long start = System.nanoTime();
        do {
            Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        org.assertj.core.api.Assertions.assertThat((int)exchangeClient.getStatus().getBufferedPages()).isEqualTo(1);
        org.assertj.core.api.Assertions.assertThat((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0).isTrue();
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 1, 1, 1, "not scheduled");
        this.assertPageEquals(exchangeClient.pollPage(), TestDirectExchangeClient.createPage(1));
        do {
            Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 2, 2, 2, "not scheduled");
        org.assertj.core.api.Assertions.assertThat((int)exchangeClient.getStatus().getBufferedPages()).isEqualTo(1);
        org.assertj.core.api.Assertions.assertThat((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0).isTrue();
        this.assertPageEquals(exchangeClient.pollPage(), TestDirectExchangeClient.createPage(2));
        do {
            Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 3, 3, 3, "not scheduled");
        org.assertj.core.api.Assertions.assertThat((int)exchangeClient.getStatus().getBufferedPages()).isEqualTo(1);
        org.assertj.core.api.Assertions.assertThat((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0).isTrue();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(3));
        org.assertj.core.api.Assertions.assertThat((Comparable)TestDirectExchangeClient.getNextPage(exchangeClient)).isNull();
        org.assertj.core.api.Assertions.assertThat((int)exchangeClient.getStatus().getBufferedPages()).isEqualTo(0);
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue();
        exchangeClient.close();
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 5, 5, "not scheduled");
    }

    @Test
    public void testStreamingAbortOnDataCorruption() {
        URI location = URI.create("http://localhost:8080");
        DirectExchangeClient exchangeClient = this.setUpDataCorruption(FeaturesConfig.DataIntegrityVerification.ABORT, location);
        ((AbstractThrowableAssert)org.assertj.core.api.Assertions.assertThatThrownBy(() -> TestDirectExchangeClient.getNextPage(exchangeClient)).isInstanceOf(TrinoException.class)).hasMessageMatching("Checksum verification failure on localhost when reading from http://localhost:8080/0: Data corruption, read checksum: 0x3f7c49fcdc6f98ea, calculated checksum: 0xcb4f99c2d19a4b04");
        exchangeClient.close();
    }

    @Test
    public void testStreamingRetryDataCorruption() {
        URI location = URI.create("http://localhost:8080");
        DirectExchangeClient exchangeClient = this.setUpDataCorruption(FeaturesConfig.DataIntegrityVerification.RETRY, location);
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(1));
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(2));
        org.assertj.core.api.Assertions.assertThat((Comparable)TestDirectExchangeClient.getNextPage(exchangeClient)).isNull();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue();
        exchangeClient.close();
        DirectExchangeClientStatus status = exchangeClient.getStatus();
        org.assertj.core.api.Assertions.assertThat((int)status.getBufferedPages()).isEqualTo(0);
        org.assertj.core.api.Assertions.assertThat((long)status.getBufferedBytes()).isEqualTo(0L);
        TestDirectExchangeClient.assertStatus((PageBufferClientStatus)status.getPageBufferClientStatuses().get(0), location, "closed", 2, 4, 4, "not scheduled");
    }

    private DirectExchangeClient setUpDataCorruption(FeaturesConfig.DataIntegrityVerification dataIntegrityVerification, URI location) {
        DataSize maxResponseSize = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        final MockExchangeRequestProcessor delegate = new MockExchangeRequestProcessor(maxResponseSize);
        delegate.addPage(location, TestDirectExchangeClient.createPage(1));
        delegate.addPage(location, TestDirectExchangeClient.createPage(2));
        delegate.setComplete(location);
        TestingHttpClient.Processor processor = new TestingHttpClient.Processor(){
            private int completedRequests;
            private TestingResponse savedResponse;

            public synchronized Response handle(Request request) throws Exception {
                if (this.completedRequests == 0) {
                    Verify.verify((this.savedResponse == null ? 1 : 0) != 0);
                    TestingResponse response = (TestingResponse)delegate.handle(request);
                    Preconditions.checkState((response.getStatusCode() == HttpStatus.OK.code() ? 1 : 0) != 0, (String)"Unexpected status code: %s", (int)response.getStatusCode());
                    ListMultimap headers = (ListMultimap)response.getHeaders().entries().stream().collect(ImmutableListMultimap.toImmutableListMultimap(entry -> ((HeaderName)entry.getKey()).toString(), Map.Entry::getValue));
                    byte[] bytes = ByteStreams.toByteArray((InputStream)response.getInputStream());
                    Preconditions.checkState((bytes.length > 42 ? 1 : 0) != 0, (Object)"too short");
                    this.savedResponse = new TestingResponse(HttpStatus.OK, headers, (byte[])bytes.clone());
                    bytes[42] = (byte)(bytes[42] + 1);
                    ++this.completedRequests;
                    return new TestingResponse(HttpStatus.OK, headers, bytes);
                }
                if (this.completedRequests == 1) {
                    Verify.verify((this.savedResponse != null ? 1 : 0) != 0);
                    TestingResponse response = this.savedResponse;
                    this.savedResponse = null;
                    ++this.completedRequests;
                    return response;
                }
                ++this.completedRequests;
                return delegate.handle(request);
            }
        };
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", dataIntegrityVerification, (DirectExchangeBuffer)new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient(processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location);
        exchangeClient.noMoreLocations();
        return exchangeClient;
    }

    @Test
    public void testStreamingClose() throws Exception {
        DataSize maxResponseSize = DataSize.ofBytes((long)1L);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestDirectExchangeClient.createPage(1));
        processor.addPage(location, TestDirectExchangeClient.createPage(2));
        processor.addPage(location, TestDirectExchangeClient.createPage(3));
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.ofBytes((long)1L)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-testClose-%s")))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.addLocation(new TaskId(new StageId("query", 1), 0, 0), location);
        exchangeClient.noMoreLocations();
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isFalse();
        this.assertPageEquals(TestDirectExchangeClient.getNextPage(exchangeClient), TestDirectExchangeClient.createPage(1));
        exchangeClient.close();
        while (!exchangeClient.isFinished()) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        org.assertj.core.api.Assertions.assertThat((boolean)exchangeClient.isFinished()).isTrue();
        org.assertj.core.api.Assertions.assertThat((Comparable)exchangeClient.pollPage()).isNull();
        org.assertj.core.api.Assertions.assertThat((int)exchangeClient.getStatus().getBufferedPages()).isEqualTo(0);
        PageBufferClientStatus clientStatus = (PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0);
        org.assertj.core.api.Assertions.assertThat((URI)clientStatus.getUri()).isEqualTo((Object)location);
        ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)clientStatus.getState()).describedAs("status", new Object[0])).isEqualTo("closed");
        ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)clientStatus.getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo("not scheduled");
    }

    @Test
    public void testScheduleWhenOneClientFilledBuffer() {
        DataSize maxResponseSize = DataSize.of((long)8L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        URI locationOne = URI.create("http://localhost:8080");
        URI locationTwo = URI.create("http://localhost:8081");
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        HttpPageBufferClient clientToBeUsed = this.createHttpPageBufferClient(processor, maxResponseSize, locationOne, new MockClientCallback());
        HttpPageBufferClient clientToBeSkipped = this.createHttpPageBufferClient(processor, maxResponseSize, locationTwo, new MockClientCallback());
        clientToBeUsed.requestSucceeded(DataSize.of((long)33L, (DataSize.Unit)DataSize.Unit.MEGABYTE).toBytes());
        clientToBeSkipped.requestSucceeded(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE).toBytes());
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.getAllClients().putAll(Map.of(locationOne, clientToBeUsed, locationTwo, clientToBeSkipped));
        exchangeClient.getQueuedClients().addAll(ImmutableList.of((Object)clientToBeUsed, (Object)clientToBeSkipped));
        int clientCount = exchangeClient.scheduleRequestIfNecessary();
        org.assertj.core.api.Assertions.assertThat((int)clientCount).isEqualTo(1);
    }

    @Test
    public void testScheduleWhenAllClientsAreEmpty() {
        DataSize maxResponseSize = DataSize.of((long)8L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        URI locationOne = URI.create("http://localhost:8080");
        URI locationTwo = URI.create("http://localhost:8081");
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        HttpPageBufferClient firstClient = this.createHttpPageBufferClient(processor, maxResponseSize, locationOne, new MockClientCallback());
        HttpPageBufferClient secondClient = this.createHttpPageBufferClient(processor, maxResponseSize, locationTwo, new MockClientCallback());
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.getAllClients().putAll(Map.of(locationOne, firstClient, locationTwo, secondClient));
        exchangeClient.getQueuedClients().addAll(ImmutableList.of((Object)firstClient, (Object)secondClient));
        int clientCount = exchangeClient.scheduleRequestIfNecessary();
        org.assertj.core.api.Assertions.assertThat((int)clientCount).isEqualTo(2);
    }

    @Test
    public void testScheduleWhenThereIsPendingClient() {
        DataSize maxResponseSize = DataSize.of((long)8L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        URI locationOne = URI.create("http://localhost:8080");
        URI locationTwo = URI.create("http://localhost:8081");
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        HttpPageBufferClient pendingClient = this.createHttpPageBufferClient(processor, maxResponseSize, locationOne, new MockClientCallback());
        HttpPageBufferClient clientToBeSkipped = this.createHttpPageBufferClient(processor, maxResponseSize, locationTwo, new MockClientCallback());
        pendingClient.requestSucceeded(DataSize.of((long)33L, (DataSize.Unit)DataSize.Unit.MEGABYTE).toBytes());
        DirectExchangeClient exchangeClient = new DirectExchangeClient("localhost", FeaturesConfig.DataIntegrityVerification.ABORT, (DirectExchangeBuffer)new StreamingDirectExchangeBuffer((Executor)this.scheduler, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE)), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor, (taskId, failure) -> {});
        exchangeClient.getAllClients().putAll(Map.of(locationOne, pendingClient, locationTwo, clientToBeSkipped));
        exchangeClient.getQueuedClients().add(clientToBeSkipped);
        int clientCount = exchangeClient.scheduleRequestIfNecessary();
        org.assertj.core.api.Assertions.assertThat((int)clientCount).isEqualTo(0);
    }

    private HttpPageBufferClient createHttpPageBufferClient(TestingHttpClient.Processor processor, DataSize expectedMaxSize, URI location, HttpPageBufferClient.ClientCallback callback) {
        return new HttpPageBufferClient("localhost", (HttpClient)new TestingHttpClient(processor, (ExecutorService)this.scheduler), FeaturesConfig.DataIntegrityVerification.ABORT, expectedMaxSize, new Duration(1.0, TimeUnit.MINUTES), true, TestSqlTaskExecution.TASK_ID, location, callback, this.scheduler, (Executor)this.pageBufferClientCallbackExecutor);
    }

    private static Page createPage(int size) {
        return new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, size)});
    }

    private Slice createSerializedPage(int size) {
        return this.serdeFactory.createSerializer(Optional.empty()).serialize(TestDirectExchangeClient.createPage(size));
    }

    private static Slice getNextPage(DirectExchangeClient exchangeClient) {
        ListenableFuture futurePage = Futures.transform((ListenableFuture)exchangeClient.isBlocked(), void_ -> exchangeClient.isFinished() ? null : exchangeClient.pollPage(), (Executor)MoreExecutors.directExecutor());
        return MoreFutures.tryGetFutureValue((Future)futurePage, (int)100, (TimeUnit)TimeUnit.SECONDS).orElse(null);
    }

    private void assertPageEquals(Slice actualPage, Page expectedPage) {
        org.assertj.core.api.Assertions.assertThat((Comparable)actualPage).isNotNull();
        org.assertj.core.api.Assertions.assertThat((int)PagesSerdeUtil.getSerializedPagePositionCount((Slice)actualPage)).isEqualTo(expectedPage.getPositionCount());
        org.assertj.core.api.Assertions.assertThat((int)this.serdeFactory.createDeserializer(Optional.empty()).deserialize(actualPage).getChannelCount()).isEqualTo(expectedPage.getChannelCount());
    }

    private static void assertStatus(PageBufferClientStatus clientStatus, URI location, String status, int pagesReceived, int requestsScheduled, int requestsCompleted, String httpRequestState) {
        org.assertj.core.api.Assertions.assertThat((URI)clientStatus.getUri()).isEqualTo((Object)location);
        ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)clientStatus.getState()).describedAs("status", new Object[0])).isEqualTo(status);
        ((AbstractIntegerAssert)org.assertj.core.api.Assertions.assertThat((int)clientStatus.getPagesReceived()).describedAs("pagesReceived", new Object[0])).isEqualTo(pagesReceived);
        ((AbstractIntegerAssert)org.assertj.core.api.Assertions.assertThat((int)clientStatus.getRequestsScheduled()).describedAs("requestsScheduled", new Object[0])).isEqualTo(requestsScheduled);
        ((AbstractIntegerAssert)org.assertj.core.api.Assertions.assertThat((int)clientStatus.getRequestsCompleted()).describedAs("requestsCompleted", new Object[0])).isEqualTo(requestsCompleted);
        ((AbstractStringAssert)org.assertj.core.api.Assertions.assertThat((String)clientStatus.getHttpRequestState()).describedAs("httpRequestState", new Object[0])).isEqualTo(httpRequestState);
    }

    private static class MockClientCallback
    implements HttpPageBufferClient.ClientCallback {
        private MockClientCallback() {
        }

        public boolean addPages(HttpPageBufferClient client, List<Slice> pages) {
            return false;
        }

        public void requestComplete(HttpPageBufferClient client) {
        }

        public void clientFinished(HttpPageBufferClient client) {
        }

        public void clientFailed(HttpPageBufferClient client, Throwable cause) {
        }
    }
}

