/*
 * Decompiled with CFR 0.152.
 */
package io.prestosql.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.prestosql.RowPagesBuilder;
import io.prestosql.Session;
import io.prestosql.connector.CatalogName;
import io.prestosql.execution.Lifespan;
import io.prestosql.memory.context.MemoryTrackingContext;
import io.prestosql.metadata.Split;
import io.prestosql.operator.DriverContext;
import io.prestosql.operator.DriverYieldSignal;
import io.prestosql.operator.Operator;
import io.prestosql.operator.OperatorFactory;
import io.prestosql.operator.OperatorInfo;
import io.prestosql.operator.OperatorStats;
import io.prestosql.operator.ProcessorContext;
import io.prestosql.operator.SourceOperator;
import io.prestosql.operator.SourceOperatorFactory;
import io.prestosql.operator.TestingOperatorContext;
import io.prestosql.operator.WorkProcessor;
import io.prestosql.operator.WorkProcessorAssertion;
import io.prestosql.operator.WorkProcessorOperator;
import io.prestosql.operator.WorkProcessorOperatorFactory;
import io.prestosql.operator.WorkProcessorPipelineSourceOperator;
import io.prestosql.operator.WorkProcessorSourceOperator;
import io.prestosql.operator.WorkProcessorSourceOperatorFactory;
import io.prestosql.spi.Page;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.UpdatablePageSource;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.Type;
import io.prestosql.sql.planner.plan.PlanNodeId;
import io.prestosql.testing.TestingSplit;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestWorkProcessorPipelineSourceOperator {
    private ScheduledExecutorService scheduledExecutor;

    /**
     * Creates the single-threaded scheduled executor stored in {@code scheduledExecutor};
     * annotated with {@code @BeforeClass}.
     */
    @BeforeClass
    public void setUp() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutor = executor;
    }

    /**
     * Stops the executor created in {@link #setUp()}.
     *
     * <p>Because of {@code alwaysRun=true} this method is also invoked when {@code setUp()}
     * failed, in which case the executor was never assigned. The null guard keeps that
     * situation from surfacing as a secondary {@link NullPointerException} that would hide
     * the original failure.
     */
    @AfterClass(alwaysRun=true)
    public void tearDown() {
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdownNow();
        }
    }

    /**
     * Drives a three-stage pipeline (one source operator followed by two chained operators)
     * through the operator produced by {@code WorkProcessorPipelineSourceOperator.convertOperators}
     * and checks blocking, yielding, operator closing, operator-info fetching and the
     * statistics reported for each stage.
     *
     * <p>Each stage follows a fixed script built with {@code WorkProcessorAssertion.transformationFrom}.
     * Pages are told apart by their position count, which is what the equality checkers passed
     * to the scripts compare.
     *
     * @throws InterruptedException if the thread is interrupted during the short sleep used to
     *         accumulate blocked time
     */
    @Test(timeOut=10000L)
    public void testWorkProcessorPipelineSourceOperator() throws InterruptedException {
        Split split = createSplit();
        Page page1 = createPage(1);
        Page page2 = createPage(2);
        Page page3 = createPage(3);
        Page page4 = createPage(4);
        Page page5 = createPage(5);

        // Script of the source stage: the split is mapped to page1 and then to page2.
        WorkProcessor.Transformation<Split, Page> sourceOperatorPages = WorkProcessorAssertion.transformationFrom(ImmutableList.of(
                WorkProcessorAssertion.Transform.of(Optional.of(split), WorkProcessor.TransformationState.ofResult(page1, false)),
                WorkProcessorAssertion.Transform.of(Optional.of(split), WorkProcessor.TransformationState.ofResult(page2, true))));

        // Script of the first operator: block once on page1, then page1 -> page3, page2 -> page4, then finish.
        SettableFuture<Void> firstBlockedFuture = SettableFuture.create();
        WorkProcessor.Transformation<Page, Page> firstOperatorPages = WorkProcessorAssertion.transformationFrom(
                ImmutableList.of(
                        WorkProcessorAssertion.Transform.of(Optional.of(page1), WorkProcessor.TransformationState.blocked(firstBlockedFuture)),
                        WorkProcessorAssertion.Transform.of(Optional.of(page1), WorkProcessor.TransformationState.ofResult(page3, true)),
                        WorkProcessorAssertion.Transform.of(Optional.of(page2), WorkProcessor.TransformationState.ofResult(page4, false)),
                        WorkProcessorAssertion.Transform.of(Optional.of(page2), WorkProcessor.TransformationState.finished())),
                (left, right) -> left.getPositionCount() == right.getPositionCount());

        // Script of the second operator: page3 -> page5, page4 -> needs more data, then block with no input.
        SettableFuture<Void> secondBlockedFuture = SettableFuture.create();
        WorkProcessor.Transformation<Page, Page> secondOperatorPages = WorkProcessorAssertion.transformationFrom(
                ImmutableList.of(
                        WorkProcessorAssertion.Transform.of(Optional.of(page3), WorkProcessor.TransformationState.ofResult(page5, true)),
                        WorkProcessorAssertion.Transform.of(Optional.of(page4), WorkProcessor.TransformationState.needsMoreData()),
                        WorkProcessorAssertion.Transform.of(Optional.empty(), WorkProcessor.TransformationState.blocked(secondBlockedFuture))),
                (left, right) -> left.getPositionCount() == right.getPositionCount());

        TestWorkProcessorSourceOperatorFactory sourceOperatorFactory = new TestWorkProcessorSourceOperatorFactory(1, new PlanNodeId("1"), sourceOperatorPages);
        TestWorkProcessorOperatorFactory firstOperatorFactory = new TestWorkProcessorOperatorFactory(2, firstOperatorPages);
        TestWorkProcessorOperatorFactory secondOperatorFactory = new TestWorkProcessorOperatorFactory(3, secondOperatorPages);

        // The three factories are expected to collapse into exactly one pipeline source operator factory.
        SourceOperatorFactory pipelineOperatorFactory = (SourceOperatorFactory) Iterables.getOnlyElement(
                WorkProcessorPipelineSourceOperator.convertOperators(
                        99,
                        ImmutableList.of(sourceOperatorFactory, firstOperatorFactory, secondOperatorFactory)));
        DriverContext driverContext = TestingOperatorContext.create(scheduledExecutor).getDriverContext();
        SourceOperator pipelineOperator = pipelineOperatorFactory.createOperator(driverContext);

        // Memory set on the source operator's tracking context must show up in the driver context.
        sourceOperatorFactory.sourceOperator.memoryTrackingContext.localUserMemoryContext().setBytes(123L);
        Assert.assertEquals(driverContext.getMemoryUsage(), 123L);

        // Before a split is added: no output, operator blocked, no blocked time recorded yet.
        Assert.assertNull(pipelineOperator.getOutput());
        Assert.assertFalse(pipelineOperator.isBlocked().isDone());
        pipelineOperator.getOperatorContext().getNestedOperatorStats()
                .forEach(stats -> Assert.assertEquals(stats.getBlockedWall().toMillis(), 0L));

        // Adding the split unblocks the pipeline; the next getOutput() yields nothing and the
        // operator reports blocked again (the first operator's script blocks on firstBlockedFuture).
        pipelineOperator.addSplit(split);
        Assert.assertTrue(pipelineOperator.isBlocked().isDone());
        Assert.assertNull(pipelineOperator.getOutput());
        Assert.assertFalse(pipelineOperator.isBlocked().isDone());

        // Stay blocked long enough for a non-zero millisecond blocked wall time to be asserted below.
        Thread.sleep(100L);
        firstBlockedFuture.set(null);
        Assert.assertTrue(pipelineOperator.isBlocked().isDone());

        // Only the first operator (index 1) is expected to have accumulated blocked time.
        List<OperatorStats> operatorStats = pipelineOperator.getOperatorContext().getNestedOperatorStats();
        Assert.assertEquals(operatorStats.get(0).getBlockedWall().toMillis(), 0L);
        Assert.assertTrue(operatorStats.get(1).getBlockedWall().toMillis() > 0L);
        Assert.assertEquals(operatorStats.get(2).getBlockedWall().toMillis(), 0L);

        // TestWorkProcessorOperator bumps the counter on every getOperatorInfo() call.
        Assert.assertEquals(getTestingOperatorInfo(operatorStats.get(1)).count, 2);
        Assert.assertEquals(getTestingOperatorInfo(operatorStats.get(2)).count, 2);

        // The first produced page matches page5 by position count.
        Assert.assertEquals(pipelineOperator.getOutput().getPositionCount(), page5.getPositionCount());

        // No output while a yield is forced; after the reset there is still no output and the
        // operator reports blocked.
        driverContext.getYieldSignal().forceYieldForTesting();
        Assert.assertNull(pipelineOperator.getOutput());
        driverContext.getYieldSignal().resetYieldForTesting();
        Assert.assertNull(pipelineOperator.getOutput());
        Assert.assertFalse(pipelineOperator.isBlocked().isDone());

        // The source and first operators have been closed by now; the second is still open.
        Assert.assertTrue(sourceOperatorFactory.sourceOperator.closed);
        Assert.assertTrue(firstOperatorFactory.operator.closed);
        Assert.assertFalse(secondOperatorFactory.operator.closed);

        // These two checks deliberately reuse the earlier stats snapshot.
        // NOTE(review): presumably the snapshot holds the operators' own TestOperatorInfo
        // instances, so the counters reflect fetches made since it was taken - confirm.
        Assert.assertEquals(getTestingOperatorInfo(operatorStats.get(1)).count, 3);
        Assert.assertEquals(getTestingOperatorInfo(operatorStats.get(2)).count, 2);

        // A fresh snapshot fetches info again from the still-open second operator only.
        operatorStats = pipelineOperator.getOperatorContext().getNestedOperatorStats();
        Assert.assertEquals(getTestingOperatorInfo(operatorStats.get(1)).count, 3);
        Assert.assertEquals(getTestingOperatorInfo(operatorStats.get(2)).count, 3);

        // finish() closes the remaining operator, but the pipeline is not reported finished yet.
        pipelineOperator.finish();
        Assert.assertFalse(pipelineOperator.isFinished());
        Assert.assertTrue(secondOperatorFactory.operator.closed);

        // Releasing the second future lets the pipeline complete.
        secondBlockedFuture.set(null);
        Assert.assertTrue(pipelineOperator.isBlocked().isDone());
        Assert.assertNull(pipelineOperator.getOutput());
        Assert.assertTrue(pipelineOperator.isFinished());

        // Output of each stage must equal the input of the next one.
        operatorStats = pipelineOperator.getOperatorContext().getNestedOperatorStats();
        Assert.assertEquals(operatorStats.get(0).getOutputPositions(), 3L);
        Assert.assertEquals(operatorStats.get(1).getInputPositions(), 3L);
        Assert.assertEquals(operatorStats.get(0).getOutputDataSize().toBytes(), 27L);
        Assert.assertEquals(operatorStats.get(1).getInputDataSize().toBytes(), 27L);
        Assert.assertEquals(operatorStats.get(1).getOutputPositions(), 7L);
        Assert.assertEquals(operatorStats.get(2).getInputPositions(), 7L);
        Assert.assertEquals(operatorStats.get(1).getOutputDataSize().toBytes(), 63L);
        Assert.assertEquals(operatorStats.get(2).getInputDataSize().toBytes(), 63L);
        Assert.assertEquals(operatorStats.get(2).getOutputPositions(), 5L);
        Assert.assertEquals(operatorStats.get(2).getOutputDataSize().toBytes(), 45L);

        // Source stats must carry the fixed values returned by TestWorkProcessorSourceOperator.
        OperatorStats sourceOperatorStats = operatorStats.get(0);
        Assert.assertEquals(sourceOperatorStats.getPhysicalInputDataSize(), DataSize.ofBytes(1L));
        Assert.assertEquals(sourceOperatorStats.getPhysicalInputPositions(), 2L);
        Assert.assertEquals(sourceOperatorStats.getInternalNetworkInputDataSize(), DataSize.ofBytes(3L));
        Assert.assertEquals(sourceOperatorStats.getInternalNetworkInputPositions(), 4L);
        Assert.assertEquals(sourceOperatorStats.getInputDataSize(), DataSize.ofBytes(5L));
        Assert.assertEquals(sourceOperatorStats.getInputPositions(), 6L);
        Assert.assertEquals(sourceOperatorStats.getAddInputWall(), new Duration(7.0, TimeUnit.NANOSECONDS));

        // The pipeline operator's own stats must expose the same input figures as the source stage.
        OperatorStats pipelineOperatorStats = pipelineOperator.getOperatorContext().getOperatorStats();
        Assert.assertEquals(sourceOperatorStats.getPhysicalInputDataSize(), pipelineOperatorStats.getPhysicalInputDataSize());
        Assert.assertEquals(sourceOperatorStats.getPhysicalInputPositions(), pipelineOperatorStats.getPhysicalInputPositions());
        Assert.assertEquals(sourceOperatorStats.getInternalNetworkInputDataSize(), pipelineOperatorStats.getInternalNetworkInputDataSize());
        Assert.assertEquals(sourceOperatorStats.getInternalNetworkInputPositions(), pipelineOperatorStats.getInternalNetworkInputPositions());
        Assert.assertEquals(sourceOperatorStats.getInputDataSize(), pipelineOperatorStats.getInputDataSize());
        Assert.assertEquals(sourceOperatorStats.getInputPositions(), pipelineOperatorStats.getInputPositions());
        Assert.assertEquals(sourceOperatorStats.getAddInputWall(), pipelineOperatorStats.getAddInputWall());
    }

    /**
     * Returns the info object attached to the given stats, viewed as a {@link TestOperatorInfo}.
     *
     * @param operatorStats stats whose info is expected to be a {@code TestOperatorInfo}
     * @return the result of {@code operatorStats.getInfo()} cast to {@code TestOperatorInfo}
     * @throws ClassCastException if the info is of a different type
     */
    private TestOperatorInfo getTestingOperatorInfo(OperatorStats operatorStats) {
        OperatorInfo info = operatorStats.getInfo();
        return (TestOperatorInfo) info;
    }

    /**
     * Builds the split fed into the pipeline: catalog {@code "catalog_name"}, a local
     * {@link TestingSplit} and the task-wide lifespan.
     *
     * @return a newly created split
     */
    private Split createSplit() {
        CatalogName catalogName = new CatalogName("catalog_name");
        ConnectorSplit connectorSplit = TestingSplit.createLocalSplit();
        return new Split(catalogName, connectorSplit, Lifespan.taskWide());
    }

    /**
     * Builds a single-column BIGINT page via {@code addSequencePage(pageNumber, pageNumber)}.
     *
     * <p>NOTE(review): presumably this yields a page with {@code pageNumber} positions whose
     * sequence starts at {@code pageNumber} - confirm against {@code RowPagesBuilder}.
     *
     * @param pageNumber value passed as both arguments of {@code addSequencePage}
     * @return the only page produced by the builder
     */
    private Page createPage(int pageNumber) {
        List<Page> pages = RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT)
                .addSequencePage(pageNumber, pageNumber)
                .build();
        return Iterables.getOnlyElement(pages);
    }

    /**
     * Minimal {@link OperatorInfo} that only carries a mutable counter for the test to inspect.
     */
    private static class TestOperatorInfo
    implements OperatorInfo {
        /** Counter; starts at zero, the Java default for an {@code int} field. */
        int count;
    }

    private static class TestWorkProcessorOperator
    implements WorkProcessorOperator {
        final WorkProcessor<Page> pages;
        final TestOperatorInfo operatorInfo = new TestOperatorInfo();
        boolean closed;

        TestWorkProcessorOperator(WorkProcessor<Page> pages) {
            this.pages = pages;
        }

        public Optional<OperatorInfo> getOperatorInfo() {
            ++this.operatorInfo.count;
            return Optional.of(this.operatorInfo);
        }

        public WorkProcessor<Page> getOutputPages() {
            return this.pages;
        }

        public void close() {
            this.closed = true;
        }
    }

    private static class TestWorkProcessorOperatorFactory
    implements WorkProcessorOperatorFactory,
    OperatorFactory {
        final int operatorId;
        final WorkProcessor.Transformation<Page, Page> transformation;
        TestWorkProcessorOperator operator;

        TestWorkProcessorOperatorFactory(int operatorId, WorkProcessor.Transformation<Page, Page> transformation) {
            this.operatorId = operatorId;
            this.transformation = transformation;
        }

        public int getOperatorId() {
            return this.operatorId;
        }

        public PlanNodeId getPlanNodeId() {
            return new PlanNodeId("test-operator");
        }

        public String getOperatorType() {
            return TestWorkProcessorOperatorFactory.class.getSimpleName();
        }

        public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor<Page> sourcePages) {
            Assert.assertNull((Object)this.operator, (String)"source operator already created");
            this.operator = new TestWorkProcessorOperator((WorkProcessor<Page>)sourcePages.transform(this.transformation));
            return this.operator;
        }

        public Operator createOperator(DriverContext driverContext) {
            throw new UnsupportedOperationException();
        }

        public void noMoreOperators() {
            throw new UnsupportedOperationException();
        }

        public OperatorFactory duplicate() {
            throw new UnsupportedOperationException();
        }
    }

    private static class TestWorkProcessorSourceOperator
    implements WorkProcessorSourceOperator {
        final WorkProcessor<Page> pages;
        boolean closed;
        MemoryTrackingContext memoryTrackingContext;

        TestWorkProcessorSourceOperator(WorkProcessor<Page> pages, MemoryTrackingContext memoryTrackingContext) {
            this.pages = pages;
            this.memoryTrackingContext = memoryTrackingContext;
        }

        public Supplier<Optional<UpdatablePageSource>> getUpdatablePageSourceSupplier() {
            return Optional::empty;
        }

        public DataSize getPhysicalInputDataSize() {
            return DataSize.ofBytes((long)1L);
        }

        public long getPhysicalInputPositions() {
            return 2L;
        }

        public DataSize getInternalNetworkInputDataSize() {
            return DataSize.ofBytes((long)3L);
        }

        public long getInternalNetworkPositions() {
            return 4L;
        }

        public DataSize getInputDataSize() {
            return DataSize.ofBytes((long)5L);
        }

        public long getInputPositions() {
            return 6L;
        }

        public Duration getReadTime() {
            return new Duration(7.0, TimeUnit.NANOSECONDS);
        }

        public WorkProcessor<Page> getOutputPages() {
            return this.pages;
        }

        public void close() {
            this.closed = true;
        }
    }

    private static class TestWorkProcessorSourceOperatorFactory
    implements WorkProcessorSourceOperatorFactory,
    SourceOperatorFactory {
        final int operatorId;
        final PlanNodeId sourceId;
        final WorkProcessor.Transformation<Split, Page> transformation;
        TestWorkProcessorSourceOperator sourceOperator;

        TestWorkProcessorSourceOperatorFactory(int operatorId, PlanNodeId sourceId, WorkProcessor.Transformation<Split, Page> transformation) {
            this.operatorId = operatorId;
            this.sourceId = sourceId;
            this.transformation = transformation;
        }

        public int getOperatorId() {
            return this.operatorId;
        }

        public PlanNodeId getSourceId() {
            return this.sourceId;
        }

        public PlanNodeId getPlanNodeId() {
            return this.sourceId;
        }

        public String getOperatorType() {
            return TestWorkProcessorSourceOperatorFactory.class.getSimpleName();
        }

        public WorkProcessorSourceOperator create(Session session, MemoryTrackingContext memoryTrackingContext, DriverYieldSignal yieldSignal, WorkProcessor<Split> splits) {
            Assert.assertNull((Object)this.sourceOperator, (String)"source operator already created");
            this.sourceOperator = new TestWorkProcessorSourceOperator((WorkProcessor<Page>)splits.transform(this.transformation).yielding(() -> ((DriverYieldSignal)yieldSignal).isSet()), memoryTrackingContext);
            return this.sourceOperator;
        }

        public SourceOperator createOperator(DriverContext driverContext) {
            throw new UnsupportedOperationException();
        }

        public void noMoreOperators() {
            throw new UnsupportedOperationException();
        }
    }
}

