/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.testing.TestingHttpClient;
import com.facebook.presto.Session;
import com.facebook.presto.SessionTestUtils;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.execution.Location;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
import com.facebook.presto.execution.buffer.TestingPagesSerdeFactory;
import com.facebook.presto.metadata.RemoteTransactionHandle;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.DriverContext;
import com.facebook.presto.operator.ExchangeClient;
import com.facebook.presto.operator.ExchangeClientSupplier;
import com.facebook.presto.operator.ExchangeOperator;
import com.facebook.presto.operator.Operator;
import com.facebook.presto.operator.PageAssertions;
import com.facebook.presto.operator.SourceOperator;
import com.facebook.presto.operator.TaskExchangeClientManager;
import com.facebook.presto.operator.TestingDriftClient;
import com.facebook.presto.operator.TestingExchangeHttpClientHandler;
import com.facebook.presto.operator.TestingTaskBuffer;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.split.RemoteSplit;
import com.facebook.presto.testing.TestingTaskContext;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestExchangeOperator {
    private static final List<Type> TYPES = ImmutableList.of((Object)VarcharType.VARCHAR);
    private static final PagesSerdeFactory SERDE_FACTORY = new TestingPagesSerdeFactory();
    private static final String TASK_1_ID = "task1.0.0.0.0";
    private static final String TASK_2_ID = "task2.0.0.0.0";
    private static final String TASK_3_ID = "task3.0.0.0.0";
    private final LoadingCache<String, TestingTaskBuffer> taskBuffers = CacheBuilder.newBuilder().build(CacheLoader.from(TestingTaskBuffer::new));
    private ScheduledExecutorService scheduler;
    private ScheduledExecutorService scheduledExecutor;
    private HttpClient httpClient;
    private ExchangeClientSupplier exchangeClientSupplier;
    private ExecutorService pageBufferClientCallbackExecutor;

    @BeforeClass
    public void setUp() {
        this.scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed((String)"test-%s"));
        this.scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed((String)"test-scheduledExecutor-%s"));
        this.pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
        this.httpClient = new TestingHttpClient((TestingHttpClient.Processor)new TestingExchangeHttpClientHandler(this.taskBuffers), (ExecutorService)this.scheduler);
        this.exchangeClientSupplier = systemMemoryUsageListener -> new ExchangeClient(new DataSize(32.0, DataSize.Unit.MEGABYTE), new DataSize(10.0, DataSize.Unit.MEGABYTE), 3, new Duration(1.0, TimeUnit.MINUTES), true, 0.2, this.httpClient, new TestingDriftClient(), this.scheduler, systemMemoryUsageListener, (Executor)this.pageBufferClientCallbackExecutor);
    }

    @AfterClass(alwaysRun=true)
    public void tearDown() {
        this.httpClient.close();
        this.httpClient = null;
        this.scheduler.shutdownNow();
        this.scheduler = null;
        this.scheduledExecutor.shutdownNow();
        this.scheduledExecutor = null;
        this.pageBufferClientCallbackExecutor.shutdownNow();
        this.pageBufferClientCallbackExecutor = null;
    }

    @BeforeMethod
    public void setUpMethod() {
        this.taskBuffers.invalidateAll();
    }

    @Test
    public void testSimple() throws Exception {
        SourceOperator operator = this.createExchangeOperator();
        operator.addSplit(new ScheduledSplit(0L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_1_ID)));
        operator.addSplit(new ScheduledSplit(1L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_2_ID)));
        operator.addSplit(new ScheduledSplit(2L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_3_ID)));
        operator.noMoreSplits();
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_1_ID)).addPages(10, true);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_2_ID)).addPages(10, true);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_3_ID)).addPages(10, true);
        TestExchangeOperator.waitForPages((Operator)operator, 30);
        TestExchangeOperator.waitForFinished((Operator)operator);
    }

    private static Split newRemoteSplit(String taskId) {
        return new Split(ExchangeOperator.REMOTE_CONNECTOR_ID, (ConnectorTransactionHandle)new RemoteTransactionHandle(), (ConnectorSplit)new RemoteSplit(new Location("http://localhost/" + taskId), TaskId.valueOf((String)taskId)));
    }

    @Test
    public void testWaitForClose() throws Exception {
        SourceOperator operator = this.createExchangeOperator();
        operator.addSplit(new ScheduledSplit(0L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_1_ID)));
        operator.addSplit(new ScheduledSplit(1L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_2_ID)));
        operator.addSplit(new ScheduledSplit(2L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_3_ID)));
        operator.noMoreSplits();
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_1_ID)).addPages(1, false);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_2_ID)).addPages(1, false);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_3_ID)).addPages(1, false);
        TestExchangeOperator.waitForPages((Operator)operator, 3);
        Assert.assertEquals((boolean)operator.isFinished(), (boolean)false);
        Assert.assertEquals((boolean)operator.needsInput(), (boolean)false);
        Assert.assertEquals((Object)operator.getOutput(), null);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_1_ID)).addPages(2, true);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_2_ID)).addPages(2, true);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_3_ID)).addPages(2, true);
        TestExchangeOperator.waitForPages((Operator)operator, 6);
        TestExchangeOperator.waitForFinished((Operator)operator);
    }

    @Test
    public void testWaitForNoMoreSplits() throws Exception {
        SourceOperator operator = this.createExchangeOperator();
        operator.addSplit(new ScheduledSplit(0L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_1_ID)));
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_1_ID)).addPages(1, true);
        TestExchangeOperator.waitForPages((Operator)operator, 1);
        Assert.assertEquals((boolean)operator.isFinished(), (boolean)false);
        Assert.assertEquals((boolean)operator.needsInput(), (boolean)false);
        Assert.assertEquals((Object)operator.getOutput(), null);
        operator.addSplit(new ScheduledSplit(1L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_2_ID)));
        operator.noMoreSplits();
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_2_ID)).addPages(2, true);
        TestExchangeOperator.waitForPages((Operator)operator, 2);
        TestExchangeOperator.waitForFinished((Operator)operator);
    }

    @Test
    public void testFinish() throws Exception {
        SourceOperator operator = this.createExchangeOperator();
        operator.addSplit(new ScheduledSplit(0L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_1_ID)));
        operator.addSplit(new ScheduledSplit(1L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_2_ID)));
        operator.addSplit(new ScheduledSplit(2L, operator.getSourceId(), TestExchangeOperator.newRemoteSplit(TASK_3_ID)));
        operator.noMoreSplits();
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_1_ID)).addPages(1, false);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_2_ID)).addPages(1, false);
        ((TestingTaskBuffer)this.taskBuffers.getUnchecked((Object)TASK_3_ID)).addPages(1, false);
        TestExchangeOperator.waitForPages((Operator)operator, 3);
        Assert.assertEquals((boolean)operator.isFinished(), (boolean)false);
        Assert.assertEquals((boolean)operator.needsInput(), (boolean)false);
        Assert.assertEquals((Object)operator.getOutput(), null);
        operator.finish();
        TestExchangeOperator.waitForFinished((Operator)operator);
    }

    private SourceOperator createExchangeOperator() {
        ExchangeOperator.ExchangeOperatorFactory operatorFactory = new ExchangeOperator.ExchangeOperatorFactory(0, new PlanNodeId("test"), new TaskExchangeClientManager(this.exchangeClientSupplier), SERDE_FACTORY);
        DriverContext driverContext = TestingTaskContext.createTaskContext((Executor)this.scheduler, (ScheduledExecutorService)this.scheduledExecutor, (Session)SessionTestUtils.TEST_SESSION).addPipelineContext(0, true, true, false).addDriverContext();
        SourceOperator operator = operatorFactory.createOperator(driverContext);
        Assert.assertEquals((long)operator.getOperatorContext().getOperatorStats().getSystemMemoryReservation().toBytes(), (long)0L);
        return operator;
    }

    private static List<Page> waitForPages(Operator operator, int expectedPageCount) throws InterruptedException {
        long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        ArrayList<Page> outputPages = new ArrayList<Page>();
        boolean greaterThanZero = false;
        while (System.nanoTime() - endTime < 0L && !operator.isFinished()) {
            if (operator.getOperatorContext().getDriverContext().getPipelineContext().getPipelineStats().getSystemMemoryReservationInBytes() > 0L) {
                greaterThanZero = true;
                break;
            }
            Thread.sleep(10L);
        }
        Assert.assertTrue((boolean)greaterThanZero);
        while (outputPages.size() < expectedPageCount && System.nanoTime() < endTime) {
            Assert.assertEquals((boolean)operator.needsInput(), (boolean)false);
            if (operator.isFinished()) break;
            Page outputPage = operator.getOutput();
            if (outputPage != null) {
                outputPages.add(outputPage);
                continue;
            }
            Thread.sleep(10L);
        }
        Thread.sleep(10L);
        Assert.assertEquals((boolean)operator.needsInput(), (boolean)false);
        Assert.assertNull((Object)operator.getOutput());
        Assert.assertEquals((int)outputPages.size(), (int)expectedPageCount);
        for (Page page : outputPages) {
            PageAssertions.assertPageEquals(TYPES, page, TestingTaskBuffer.PAGE);
        }
        Assert.assertEquals((long)operator.getOperatorContext().getOperatorStats().getSystemMemoryReservation().toBytes(), (long)0L);
        return outputPages;
    }

    private static void waitForFinished(Operator operator) throws InterruptedException {
        long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        while (System.nanoTime() - endTime < 0L) {
            Assert.assertEquals((boolean)operator.needsInput(), (boolean)false);
            Assert.assertNull((Object)operator.getOutput());
            if (operator.isFinished()) break;
            Thread.sleep(10L);
        }
        Assert.assertEquals((boolean)operator.isFinished(), (boolean)true);
        Assert.assertEquals((boolean)operator.needsInput(), (boolean)false);
        Assert.assertNull((Object)operator.getOutput());
        Assert.assertEquals((long)operator.getOperatorContext().getOperatorStats().getSystemMemoryReservation().toBytes(), (long)0L);
    }
}

