/*
 * Decompiled with CFR 0.152.
 */
package io.prestosql.operator;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.testing.Assertions;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.prestosql.block.BlockAssertions;
import io.prestosql.execution.buffer.PagesSerde;
import io.prestosql.execution.buffer.SerializedPage;
import io.prestosql.execution.buffer.TestingPagesSerdeFactory;
import io.prestosql.memory.context.AggregatedMemoryContext;
import io.prestosql.memory.context.LocalMemoryContext;
import io.prestosql.memory.context.SimpleLocalMemoryContext;
import io.prestosql.operator.ExchangeClient;
import io.prestosql.operator.ExchangeClientStatus;
import io.prestosql.operator.MockExchangeRequestProcessor;
import io.prestosql.operator.PageBufferClientStatus;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestExchangeClient {
    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();

    @BeforeClass
    public void setUp() {
        this.scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed((String)"test-%s"));
        this.pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
    }

    @AfterClass(alwaysRun=true)
    public void tearDown() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
            this.scheduler = null;
        }
        if (this.pageBufferClientCallbackExecutor != null) {
            this.pageBufferClientCallbackExecutor.shutdownNow();
            this.pageBufferClientCallbackExecutor = null;
        }
    }

    @Test
    public void testHappyPath() {
        DataSize maxResponseSize = new DataSize(10.0, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestExchangeClient.createPage(1));
        processor.addPage(location, TestExchangeClient.createPage(2));
        processor.addPage(location, TestExchangeClient.createPage(3));
        processor.setComplete(location);
        ExchangeClient exchangeClient = new ExchangeClient(new DataSize(32.0, DataSize.Unit.MEGABYTE), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, (ExecutorService)this.scheduler), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor);
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(1));
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(2));
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(3));
        Assert.assertNull((Object)TestExchangeClient.getNextPage(exchangeClient));
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)true);
        ExchangeClientStatus status = exchangeClient.getStatus();
        Assert.assertEquals((int)status.getBufferedPages(), (int)0);
        Assert.assertEquals((long)status.getBufferedBytes(), (long)0L);
        TestExchangeClient.assertStatus((PageBufferClientStatus)status.getPageBufferClientStatuses().get(0), location, "closed", 3, 3, 3, "not scheduled");
    }

    @Test(timeOut=10000L)
    public void testAddLocation() throws Exception {
        DataSize maxResponseSize = new DataSize(10.0, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        ExchangeClient exchangeClient = new ExchangeClient(new DataSize(32.0, DataSize.Unit.MEGABYTE), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"test-%s"))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor);
        URI location1 = URI.create("http://localhost:8081/foo");
        processor.addPage(location1, TestExchangeClient.createPage(1));
        processor.addPage(location1, TestExchangeClient.createPage(2));
        processor.addPage(location1, TestExchangeClient.createPage(3));
        processor.setComplete(location1);
        exchangeClient.addLocation(location1);
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(1));
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(2));
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(3));
        Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        URI location2 = URI.create("http://localhost:8082/bar");
        processor.addPage(location2, TestExchangeClient.createPage(4));
        processor.addPage(location2, TestExchangeClient.createPage(5));
        processor.addPage(location2, TestExchangeClient.createPage(6));
        processor.setComplete(location2);
        exchangeClient.addLocation(location2);
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(4));
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(5));
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(6));
        Assert.assertFalse((boolean)MoreFutures.tryGetFutureValue((Future)exchangeClient.isBlocked(), (int)10, (TimeUnit)TimeUnit.MILLISECONDS).isPresent());
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        exchangeClient.noMoreLocations();
        while (!exchangeClient.isClosed()) {
            Thread.sleep(1L);
        }
        ImmutableMap statuses = Maps.uniqueIndex((Iterable)exchangeClient.getStatus().getPageBufferClientStatuses(), PageBufferClientStatus::getUri);
        TestExchangeClient.assertStatus((PageBufferClientStatus)statuses.get((Object)location1), location1, "closed", 3, 3, 3, "not scheduled");
        TestExchangeClient.assertStatus((PageBufferClientStatus)statuses.get((Object)location2), location2, "closed", 3, 3, 3, "not scheduled");
    }

    @Test
    public void testBufferLimit() {
        DataSize maxResponseSize = new DataSize(1.0, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestExchangeClient.createPage(1));
        processor.addPage(location, TestExchangeClient.createPage(2));
        processor.addPage(location, TestExchangeClient.createPage(3));
        processor.setComplete(location);
        ExchangeClient exchangeClient = new ExchangeClient(new DataSize(1.0, DataSize.Unit.BYTE), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"test-%s"))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor);
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        long start = System.nanoTime();
        exchangeClient.scheduleRequestIfNecessary();
        do {
            Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)1);
        Assert.assertTrue((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0);
        TestExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 1, 1, 1, "not scheduled");
        TestExchangeClient.assertPageEquals(exchangeClient.pollPage(), TestExchangeClient.createPage(1));
        do {
            Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        TestExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 2, 2, 2, "not scheduled");
        Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)1);
        Assert.assertTrue((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0);
        TestExchangeClient.assertPageEquals(exchangeClient.pollPage(), TestExchangeClient.createPage(2));
        do {
            Assertions.assertLessThan((Comparable)Duration.nanosSince((long)start), (Comparable)new Duration(5.0, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        TestExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 3, 3, 3, "not scheduled");
        Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)1);
        Assert.assertTrue((exchangeClient.getStatus().getBufferedBytes() > 0L ? 1 : 0) != 0);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(3));
        Assert.assertNull((Object)TestExchangeClient.getNextPage(exchangeClient));
        Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)0);
        Assert.assertTrue((exchangeClient.getStatus().getBufferedBytes() == 0L ? 1 : 0) != 0);
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)true);
        TestExchangeClient.assertStatus((PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 5, 5, "not scheduled");
    }

    @Test
    public void testClose() throws Exception {
        DataSize maxResponseSize = new DataSize(1.0, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, TestExchangeClient.createPage(1));
        processor.addPage(location, TestExchangeClient.createPage(2));
        processor.addPage(location, TestExchangeClient.createPage(3));
        ExchangeClient exchangeClient = new ExchangeClient(new DataSize(1.0, DataSize.Unit.BYTE), maxResponseSize, 1, new Duration(1.0, TimeUnit.MINUTES), true, (HttpClient)new TestingHttpClient((TestingHttpClient.Processor)processor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"test-%s"))), this.scheduler, (LocalMemoryContext)new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"), (Executor)this.pageBufferClientCallbackExecutor);
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)false);
        TestExchangeClient.assertPageEquals(TestExchangeClient.getNextPage(exchangeClient), TestExchangeClient.createPage(1));
        exchangeClient.close();
        while (!exchangeClient.isFinished()) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals((boolean)exchangeClient.isClosed(), (boolean)true);
        Assert.assertNull((Object)exchangeClient.pollPage());
        Assert.assertEquals((int)exchangeClient.getStatus().getBufferedPages(), (int)0);
        Assert.assertEquals((long)exchangeClient.getStatus().getBufferedBytes(), (long)0L);
        PageBufferClientStatus clientStatus = (PageBufferClientStatus)exchangeClient.getStatus().getPageBufferClientStatuses().get(0);
        Assert.assertEquals((Object)clientStatus.getUri(), (Object)location);
        Assert.assertEquals((String)clientStatus.getState(), (String)"closed", (String)"status");
        Assert.assertEquals((String)clientStatus.getHttpRequestState(), (String)"not scheduled", (String)"httpRequestState");
    }

    private static Page createPage(int size) {
        return new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, size)});
    }

    private static SerializedPage getNextPage(ExchangeClient exchangeClient) {
        ListenableFuture futurePage = Futures.transform((ListenableFuture)exchangeClient.isBlocked(), ignored -> exchangeClient.pollPage(), (Executor)MoreExecutors.directExecutor());
        return MoreFutures.tryGetFutureValue((Future)futurePage, (int)100, (TimeUnit)TimeUnit.SECONDS).orElse(null);
    }

    private static void assertPageEquals(SerializedPage actualPage, Page expectedPage) {
        Assert.assertNotNull((Object)actualPage);
        Assert.assertEquals((int)actualPage.getPositionCount(), (int)expectedPage.getPositionCount());
        Assert.assertEquals((int)PAGES_SERDE.deserialize(actualPage).getChannelCount(), (int)expectedPage.getChannelCount());
    }

    private static void assertStatus(PageBufferClientStatus clientStatus, URI location, String status, int pagesReceived, int requestsScheduled, int requestsCompleted, String httpRequestState) {
        Assert.assertEquals((Object)clientStatus.getUri(), (Object)location);
        Assert.assertEquals((String)clientStatus.getState(), (String)status, (String)"status");
        Assert.assertEquals((int)clientStatus.getPagesReceived(), (int)pagesReceived, (String)"pagesReceived");
        Assert.assertEquals((int)clientStatus.getRequestsScheduled(), (int)requestsScheduled, (String)"requestsScheduled");
        Assert.assertEquals((int)clientStatus.getRequestsCompleted(), (int)requestsCompleted, (String)"requestsCompleted");
        Assert.assertEquals((String)clientStatus.getHttpRequestState(), (String)httpRequestState, (String)"httpRequestState");
    }
}

