/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.Threads;
import io.airlift.units.Duration;
import io.trino.RowPagesBuilder;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.execution.ScheduledSplit;
import io.trino.execution.SplitAssignment;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.metadata.Split;
import io.trino.metadata.TableHandle;
import io.trino.operator.Driver;
import io.trino.operator.DriverContext;
import io.trino.operator.Operator;
import io.trino.operator.OperatorContext;
import io.trino.operator.TableScanOperator;
import io.trino.operator.ValuesOperator;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import io.trino.split.PageSourceProvider;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.MaterializedResult;
import io.trino.testing.PageConsumerOperator;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingTaskContext;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;

@TestInstance(value=TestInstance.Lifecycle.PER_METHOD)
public class TestDriver {
    // Cached thread pool created in setUp() and shut down in tearDown(); tests use it to run blocking driver calls in the background.
    private ExecutorService executor;
    // Two-thread scheduled pool created in setUp() and shut down in tearDown(); handed to the task context and used by testConcurrentClose.
    private ScheduledExecutorService scheduledExecutor;
    // Driver context rebuilt in setUp() before every test; all operator contexts and drivers in the tests are created from it.
    private DriverContext driverContext;

    /**
     * Builds fresh executors and a new driver context before each test.
     *
     * <p>Thread names are prefixed with the simple name of the test class.
     */
    @BeforeEach
    public void setUp() {
        String threadNamePrefix = getClass().getSimpleName();
        executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed(threadNamePrefix + "-%s"));
        scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed(threadNamePrefix + "-scheduledExecutor-%s"));
        driverContext = TestingTaskContext.createTaskContext(executor, scheduledExecutor, SessionTestUtils.TEST_SESSION)
                .addPipelineContext(0, true, true, false)
                .addDriverContext();
    }

    /**
     * Shuts down both executors created by {@link #setUp()} after each test.
     */
    @AfterEach
    public void tearDown() {
        // shutdownNow() is called on the worker pool first, then on the scheduled pool.
        executor.shutdownNow();
        scheduledExecutor.shutdownNow();
    }

    /**
     * Runs a driver made of a one-page values source and a consuming sink for up to one second and
     * asserts that the returned future is done and that the driver, sink and source all report finished.
     */
    @Test
    public void testNormalFinish() {
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        ValuesOperator source = new ValuesOperator(
                driverContext.addOperatorContext(0, new PlanNodeId("test"), "values"),
                RowPagesBuilder.rowPagesBuilder(types).addSequencePage(10, 20, 30, 40).build());
        PageConsumerOperator sink = createSinkOperator(types);
        Driver driver = Driver.createDriver(driverContext, source, sink);

        // Before any processing the driver is bound to our context and not finished.
        Assertions.assertThat(driver.getDriverContext()).isSameAs(driverContext);
        Assertions.assertThat(driver.isFinished()).isFalse();

        ListenableFuture<?> blocked = driver.processForDuration(new Duration(1, TimeUnit.SECONDS));

        Assertions.assertThat(blocked.isDone()).isTrue();
        Assertions.assertThat(driver.isFinished()).isTrue();
        Assertions.assertThat(sink.isFinished()).isTrue();
        Assertions.assertThat(source.isFinished()).isTrue();
    }

    /**
     * Submits a very short {@code processForDuration} call and a {@code close} call for the same driver
     * to the scheduled executor, then polls until the driver context reports terminating-or-done.
     *
     * <p>Repeated 1000 times with a 10 second timeout per repetition.
     */
    @RepeatedTest(1000)
    @Timeout(10)
    public void testConcurrentClose() {
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        OperatorContext operatorContext = driverContext.addOperatorContext(0, new PlanNodeId("test"), "values");
        ValuesOperator source = new ValuesOperator(
                operatorContext,
                RowPagesBuilder.rowPagesBuilder(types).addSequencePage(10, 20, 30, 40).build());
        PageConsumerOperator sink = createSinkOperator(types);
        Driver driver = Driver.createDriver(driverContext, source, sink);

        // Both tasks go to the two-thread scheduled pool so processing and close may overlap.
        scheduledExecutor.submit(() -> driver.processForDuration(new Duration(1, TimeUnit.NANOSECONDS)));
        scheduledExecutor.submit(() -> driver.close());

        // Poll in 1 ms steps; the @Timeout above bounds this loop.
        while (!driverContext.isTerminatingOrDone()) {
            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Closes a driver before it has processed anything and asserts that the driver reports finished,
     * that neither the source nor the sink reports finished, and that the sink reports closed.
     */
    @Test
    public void testAbruptFinish() {
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        ValuesOperator source = new ValuesOperator(
                driverContext.addOperatorContext(0, new PlanNodeId("test"), "values"),
                RowPagesBuilder.rowPagesBuilder(types).addSequencePage(10, 20, 30, 40).build());
        PageConsumerOperator sink = createSinkOperator(types);
        Driver driver = Driver.createDriver(driverContext, source, sink);

        Assertions.assertThat(driver.getDriverContext()).isSameAs(driverContext);
        Assertions.assertThat(driver.isFinished()).isFalse();

        driver.close();

        Assertions.assertThat(driver.isFinished()).isTrue();
        // The operators were closed without being run to completion, so they do not report finished.
        Assertions.assertThat(source.isFinished()).isFalse();
        Assertions.assertThat(sink.isFinished()).isFalse();
        Assertions.assertThat(sink.isClosed()).isTrue();
    }

    /**
     * Drives a table scan that starts without a split: the first processing call must return a
     * not-done future, and after a final split assignment is delivered the driver must run to
     * completion with driver, sink and source all finished.
     */
    @Test
    public void testAddSourceFinish() {
        PlanNodeId sourceId = new PlanNodeId("source");
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        TableScanOperator source = new TableScanOperator(
                driverContext.addOperatorContext(99, new PlanNodeId("test"), "values"),
                sourceId,
                (session, split, table, columns, dynamicFilter) -> lambda$testAddSourceFinish$1(types, session, split, table, columns, dynamicFilter),
                TestingHandles.TEST_TABLE_HANDLE,
                ImmutableList.of(),
                DynamicFilter.EMPTY);
        PageConsumerOperator sink = createSinkOperator(types);
        Driver driver = Driver.createDriver(driverContext, source, sink);

        Assertions.assertThat(driver.getDriverContext()).isSameAs(driverContext);
        Assertions.assertThat(driver.isFinished()).isFalse();
        // No split has been assigned yet, so the returned future is expected to be pending.
        Assertions.assertThat(driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone()).isFalse();
        Assertions.assertThat(driver.isFinished()).isFalse();

        // Deliver a single split; the trailing 'true' is the third SplitAssignment constructor argument.
        driver.updateSplitAssignment(new SplitAssignment(sourceId, ImmutableSet.of(new ScheduledSplit(0, sourceId, newMockSplit())), true));

        Assertions.assertThat(driver.isFinished()).isFalse();
        Assertions.assertThat(driver.processForDuration(new Duration(1, TimeUnit.SECONDS)).isDone()).isTrue();
        Assertions.assertThat(driver.isFinished()).isTrue();
        Assertions.assertThat(sink.isFinished()).isTrue();
        Assertions.assertThat(source.isFinished()).isTrue();
    }

    /**
     * Closes the driver while a background thread is parked inside the broken operator during
     * processing, then asserts that the background call fails with an {@link ExecutionException}
     * caused by a "Driver was interrupted" {@link TrinoException} and that the destroyed future is done.
     */
    @Test
    public void testBrokenOperatorCloseWhileProcessing() {
        OperatorContext brokenContext = driverContext.addOperatorContext(0, new PlanNodeId("test"), "source");
        BrokenOperator brokenOperator = new BrokenOperator(brokenContext, false);
        PageConsumerOperator sink = createSinkOperator(ImmutableList.of());
        Driver driver = Driver.createDriver(driverContext, brokenOperator, sink);
        Assertions.assertThat(driver.getDriverContext()).isSameAs(driverContext);

        // Start processing in the background and wait until the operator has parked that thread.
        Future<Boolean> driverProcessFor = executor.submit(() -> driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone());
        brokenOperator.waitForLocked();

        driver.close();
        Assertions.assertThat(driver.isFinished()).isTrue();

        Assertions.assertThatThrownBy(() -> driverProcessFor.get(1, TimeUnit.SECONDS))
                .isInstanceOf(ExecutionException.class)
                .hasCause(new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Driver was interrupted"));
        Assertions.assertThat(driver.getDestroyedFuture().isDone()).isTrue();
    }

    /**
     * Calls {@code processForDuration} while a background thread is parked inside the broken
     * operator's {@code close()}: the processing call must return a done future and the driver must
     * report finished, but the destroyed future may only complete once the operator is unlocked.
     *
     * @throws Exception if waiting for the background close fails
     */
    @Test
    public void testBrokenOperatorProcessWhileClosing() throws Exception {
        OperatorContext brokenContext = driverContext.addOperatorContext(0, new PlanNodeId("test"), "source");
        // 'true' makes the operator park the calling thread in close().
        BrokenOperator brokenOperator = new BrokenOperator(brokenContext, true);
        PageConsumerOperator sink = createSinkOperator(ImmutableList.of());
        Driver driver = Driver.createDriver(driverContext, brokenOperator, sink);
        Assertions.assertThat(driver.getDriverContext()).isSameAs(driverContext);

        Future<Boolean> driverClose = executor.submit(() -> {
            driver.close();
            return true;
        });
        brokenOperator.waitForLocked();

        Assertions.assertThat(driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone()).isTrue();
        Assertions.assertThat(driver.isFinished()).isTrue();
        Assertions.assertThat(driver.getDestroyedFuture().isDone()).isFalse();

        // Release the parked close() call and check that destruction then completes.
        brokenOperator.unlock();
        Assertions.assertThat(driverClose.get()).isTrue();
        Assertions.assertThat(driver.getDestroyedFuture().isDone()).isTrue();
    }

    /**
     * Uses a table scan whose {@code isBlocked()} reserves revocable memory, requests memory revoking
     * and returns a future that never completes, and asserts that the future returned by
     * {@code processForDuration} is nevertheless done.
     */
    @Test
    public void testMemoryRevocationRace() {
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        AlwaysBlockedMemoryRevokingTableScanOperator source = new AlwaysBlockedMemoryRevokingTableScanOperator(
                driverContext.addOperatorContext(99, new PlanNodeId("test"), "scan"),
                new PlanNodeId("source"),
                (session, split, table, columns, dynamicFilter) -> lambda$testMemoryRevocationRace$5(types, session, split, table, columns, dynamicFilter),
                TestingHandles.TEST_TABLE_HANDLE,
                ImmutableList.of());
        Driver driver = Driver.createDriver(driverContext, source, createSinkOperator(types));

        Assertions.assertThat(driver.processForDuration(new Duration(100, TimeUnit.MILLISECONDS)).isDone()).isTrue();
    }

    /**
     * Pairs an always-blocked table scan with a sink whose finished state is controlled by the test,
     * and asserts that the driver's blocked future is pending until the sink is marked finished and
     * done immediately afterwards.
     */
    @Test
    public void testUnblocksOnFinish() {
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        AlwaysBlockedTableScanOperator source = new AlwaysBlockedTableScanOperator(
                driverContext.addOperatorContext(99, new PlanNodeId("test"), "scan"),
                new PlanNodeId("source"),
                (session, split, table, columns, dynamicFilter) -> lambda$testUnblocksOnFinish$6(types, session, split, table, columns, dynamicFilter),
                TestingHandles.TEST_TABLE_HANDLE,
                ImmutableList.of());
        MaterializedResult.Builder resultBuilder = MaterializedResult.resultBuilder(driverContext.getSession(), types);
        BlockedSinkOperator sink = new BlockedSinkOperator(
                driverContext.addOperatorContext(1, new PlanNodeId("test"), "sink"),
                page -> resultBuilder.page(page),
                Function.identity());
        Driver driver = Driver.createDriver(driverContext, source, sink);

        ListenableFuture<?> blocked = driver.processForDuration(new Duration(100, TimeUnit.MILLISECONDS));
        Assertions.assertThat(blocked.isDone()).isFalse();

        // Completing the sink's finished future is expected to release the driver's blocked future.
        sink.setFinished();
        Assertions.assertThat(blocked.isDone()).isTrue();
    }

    /**
     * Sets a 70 ms blocked timeout on the driver context, runs a four-operator driver whose second
     * operator never unblocks, and asserts that the driver's blocked future is pending at first, done
     * after a 100 ms sleep, and that the operator's own future was neither completed nor cancelled.
     *
     * @throws InterruptedException if the sleep is interrupted
     */
    @Test
    public void testUnblocksOnTimeout() throws InterruptedException {
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        driverContext.setBlockedTimeout(new Duration(70, TimeUnit.MILLISECONDS));
        PageConsumerOperator operator1 = createSinkOperator(types, 1, "test1");
        BlockedOperator operator2 = createBlockedOperator(types, 2, "test2");
        PageConsumerOperator operator3 = createSinkOperator(types, 3, "test3");
        PageConsumerOperator operator4 = createSinkOperator(types, 4, "test3");
        Driver driver = Driver.createDriver(driverContext, operator1, operator2, operator3, operator4);

        ListenableFuture<?> blocked = driver.processForDuration(new Duration(200, TimeUnit.MILLISECONDS));
        Assertions.assertThat(blocked.isDone()).isFalse();

        // Sleep longer than the 70 ms blocked timeout configured above.
        Thread.sleep(100L);

        Assertions.assertThat(blocked.isDone()).isTrue();
        // The blocked operator's own future must be untouched.
        Assertions.assertThat(operator2.isCancelled()).isFalse();
        Assertions.assertThat(operator2.isDone()).isFalse();
    }

    /**
     * Sets a 100 ms blocked timeout, runs a four-operator driver whose second operator is blocked,
     * and asserts that completing that operator's future makes the driver's blocked future done
     * without the operator's future being cancelled.
     */
    @Test
    public void testUnblocksWhenBlockedOperatorIsUnblockedAndTimeoutIsSet() {
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        driverContext.setBlockedTimeout(new Duration(100, TimeUnit.MILLISECONDS));
        PageConsumerOperator operator1 = createSinkOperator(types, 1, "test1");
        BlockedOperator operator2 = createBlockedOperator(types, 2, "test2");
        PageConsumerOperator operator3 = createSinkOperator(types, 3, "test3");
        PageConsumerOperator operator4 = createSinkOperator(types, 4, "test3");
        Driver driver = Driver.createDriver(driverContext, operator1, operator2, operator3, operator4);

        ListenableFuture<?> blocked = driver.processForDuration(new Duration(200, TimeUnit.MILLISECONDS));
        Assertions.assertThat(blocked.isDone()).isFalse();

        // Unblock the operator directly instead of waiting for the timeout.
        operator2.setDone();

        Assertions.assertThat(operator2.isDone()).isTrue();
        Assertions.assertThat(blocked.isDone()).isTrue();
        Assertions.assertThat(operator2.isCancelled()).isFalse();
    }

    /**
     * Parks a background {@code processForDuration} call inside the broken operator, then checks from
     * the test thread that further processing calls and a split assignment return without finishing
     * or destroying the driver, and that closing the driver fails the background call with an
     * {@link ExecutionException} caused by a "Driver was interrupted" {@link TrinoException}.
     */
    @Test
    public void testBrokenOperatorAddSource() {
        PlanNodeId sourceId = new PlanNodeId("source");
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        // The source always reports NOT_BLOCKED from isBlocked().
        NotBlockedTableScanOperator source = new NotBlockedTableScanOperator(
                driverContext.addOperatorContext(99, new PlanNodeId("test"), "values"),
                sourceId,
                (session, split, table, columns, dynamicFilter) -> lambda$testBrokenOperatorAddSource$7(types, session, split, table, columns, dynamicFilter),
                TestingHandles.TEST_TABLE_HANDLE,
                ImmutableList.of());
        BrokenOperator brokenOperator = new BrokenOperator(driverContext.addOperatorContext(0, new PlanNodeId("test"), "source"));
        Driver driver = Driver.createDriver(driverContext, source, brokenOperator);

        // Park a background thread inside the broken operator.
        Future<Boolean> driverProcessFor = executor.submit(() -> driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone());
        brokenOperator.waitForLocked();

        Assertions.assertThat(driver.getDriverContext()).isSameAs(driverContext);
        Assertions.assertThat(driver.isFinished()).isFalse();
        // NOTE(review): presumably returns a done future because the background call still owns the driver - confirm against Driver.
        Assertions.assertThat(driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone()).isTrue();
        Assertions.assertThat(driver.isFinished()).isFalse();

        driver.updateSplitAssignment(new SplitAssignment(sourceId, ImmutableSet.of(new ScheduledSplit(0, sourceId, newMockSplit())), true));

        Assertions.assertThat(driver.getDestroyedFuture().isDone()).isFalse();
        Assertions.assertThat(driver.processForDuration(new Duration(1, TimeUnit.SECONDS)).isDone()).isTrue();
        Assertions.assertThat(driver.isFinished()).isFalse();

        driver.close();
        Assertions.assertThat(driver.isFinished()).isTrue();

        Assertions.assertThatThrownBy(() -> driverProcessFor.get(1, TimeUnit.SECONDS))
                .isInstanceOf(ExecutionException.class)
                .hasCause(new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Driver was interrupted"));
        Assertions.assertThat(driver.getDestroyedFuture().isDone()).isTrue();
    }

    /**
     * Wraps a new {@link MockSplit} in a {@link Split} bound to the testing catalog handle.
     */
    private static Split newMockSplit() {
        ConnectorSplit connectorSplit = new MockSplit();
        return new Split(TestingHandles.TEST_CATALOG_HANDLE, connectorSplit);
    }

    /**
     * Creates a sink operator for the given types using operator id 1 and plan node id "test".
     */
    private PageConsumerOperator createSinkOperator(List<Type> types) {
        int defaultOperatorId = 1;
        String defaultPlanNodeId = "test";
        return createSinkOperator(types, defaultOperatorId, defaultPlanNodeId);
    }

    /**
     * Creates a {@link PageConsumerOperator} registered on the driver context under the given
     * operator id and plan node id; consumed pages are passed to a {@link MaterializedResult}
     * builder created for {@code types}.
     */
    private PageConsumerOperator createSinkOperator(List<Type> types, int operatorId, String planNodeId) {
        MaterializedResult.Builder materialized = MaterializedResult.resultBuilder(driverContext.getSession(), types);
        OperatorContext sinkContext = driverContext.addOperatorContext(operatorId, new PlanNodeId(planNodeId), "sink");
        return new PageConsumerOperator(sinkContext, page -> materialized.page(page), Function.identity());
    }

    /**
     * Creates a {@link BlockedOperator} registered on the driver context under the given operator id
     * and plan node id; consumed pages are passed to a {@link MaterializedResult} builder created
     * for {@code types}.
     */
    private BlockedOperator createBlockedOperator(List<Type> types, int operatorId, String planNodeId) {
        MaterializedResult.Builder materialized = MaterializedResult.resultBuilder(driverContext.getSession(), types);
        OperatorContext blockedContext = driverContext.addOperatorContext(operatorId, new PlanNodeId(planNodeId), "sink");
        return new BlockedOperator(blockedContext, page -> materialized.page(page), Function.identity());
    }

    /**
     * Page source factory used by {@code testBrokenOperatorAddSource}; only {@code types} is read.
     */
    private static ConnectorPageSource lambda$testBrokenOperatorAddSource$7(List<Type> types, Session session, Split split, TableHandle table, List<ColumnHandle> columns, DynamicFilter dynamicFilter) {
        return newSequencePageSource(types);
    }

    /**
     * Page source factory used by {@code testUnblocksOnFinish}; only {@code types} is read.
     */
    private static ConnectorPageSource lambda$testUnblocksOnFinish$6(List<Type> types, Session session, Split split, TableHandle table, List<ColumnHandle> columns, DynamicFilter dynamicFilter) {
        return newSequencePageSource(types);
    }

    /**
     * Page source factory used by {@code testMemoryRevocationRace}; only {@code types} is read.
     */
    private static ConnectorPageSource lambda$testMemoryRevocationRace$5(List<Type> types, Session session, Split split, TableHandle table, List<ColumnHandle> columns, DynamicFilter dynamicFilter) {
        return newSequencePageSource(types);
    }

    /**
     * Page source factory used by {@code testAddSourceFinish}; only {@code types} is read.
     */
    private static ConnectorPageSource lambda$testAddSourceFinish$1(List<Type> types, Session session, Split split, TableHandle table, List<ColumnHandle> columns, DynamicFilter dynamicFilter) {
        return newSequencePageSource(types);
    }

    /**
     * Builds a {@link FixedPageSource} over the pages produced by a row-pages builder for
     * {@code types} with a single {@code addSequencePage(10, 20, 30, 40)} call.
     */
    private static ConnectorPageSource newSequencePageSource(List<Type> types) {
        return new FixedPageSource(RowPagesBuilder.rowPagesBuilder(types).addSequencePage(10, 20, 30, 40).build());
    }

    private static class BrokenOperator
    implements Operator {
        private final OperatorContext operatorContext;
        private final ReentrantLock lock = new ReentrantLock();
        private final CountDownLatch lockedLatch = new CountDownLatch(1);
        private final CountDownLatch unlockLatch = new CountDownLatch(1);
        private final boolean lockForClose;

        private BrokenOperator(OperatorContext operatorContext) {
            this(operatorContext, false);
        }

        private BrokenOperator(OperatorContext operatorContext, boolean lockForClose) {
            this.operatorContext = operatorContext;
            this.lockForClose = lockForClose;
        }

        public OperatorContext getOperatorContext() {
            return this.operatorContext;
        }

        public void unlock() {
            this.unlockLatch.countDown();
        }

        private void waitForLocked() {
            try {
                Assertions.assertThat((boolean)this.lockedLatch.await(10L, TimeUnit.SECONDS)).isTrue();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted", e);
            }
        }

        private void waitForUnlock() {
            try {
                Assertions.assertThat((boolean)this.lock.tryLock(1L, TimeUnit.SECONDS)).isTrue();
                try {
                    this.lockedLatch.countDown();
                    Assertions.assertThat((boolean)this.unlockLatch.await(5L, TimeUnit.SECONDS)).isTrue();
                }
                finally {
                    this.lock.unlock();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted", e);
            }
        }

        public void finish() {
            this.waitForUnlock();
        }

        public boolean isFinished() {
            this.waitForUnlock();
            return true;
        }

        public ListenableFuture<Void> isBlocked() {
            this.waitForUnlock();
            return NOT_BLOCKED;
        }

        public boolean needsInput() {
            this.waitForUnlock();
            return false;
        }

        public void addInput(Page page) {
            this.waitForUnlock();
        }

        public Page getOutput() {
            this.waitForUnlock();
            return null;
        }

        public void close() {
            if (this.lockForClose) {
                this.waitForUnlock();
            }
        }
    }

    private static class AlwaysBlockedMemoryRevokingTableScanOperator
    extends TableScanOperator {
        public AlwaysBlockedMemoryRevokingTableScanOperator(OperatorContext operatorContext, PlanNodeId planNodeId, PageSourceProvider pageSourceProvider, TableHandle table, Iterable<ColumnHandle> columns) {
            super(operatorContext, planNodeId, pageSourceProvider, table, columns, DynamicFilter.EMPTY);
        }

        public ListenableFuture<Void> isBlocked() {
            LocalMemoryContext revocableMemoryContext = this.getOperatorContext().localRevocableMemoryContext();
            revocableMemoryContext.setBytes(100L);
            this.getOperatorContext().requestMemoryRevoking();
            return SettableFuture.create();
        }
    }

    private static class AlwaysBlockedTableScanOperator
    extends TableScanOperator {
        public AlwaysBlockedTableScanOperator(OperatorContext operatorContext, PlanNodeId planNodeId, PageSourceProvider pageSourceProvider, TableHandle table, Iterable<ColumnHandle> columns) {
            super(operatorContext, planNodeId, pageSourceProvider, table, columns, DynamicFilter.EMPTY);
        }

        public ListenableFuture<Void> isBlocked() {
            return SettableFuture.create();
        }
    }

    private static class BlockedSinkOperator
    extends PageConsumerOperator {
        private final SettableFuture<Void> finished = SettableFuture.create();

        public BlockedSinkOperator(OperatorContext operatorContext, Consumer<Page> pageConsumer, Function<Page, Page> pagePreprocessor) {
            super(operatorContext, pageConsumer, pagePreprocessor);
            operatorContext.setFinishedFuture(this.finished);
        }

        public boolean isFinished() {
            return this.finished.isDone();
        }

        void setFinished() {
            this.finished.set(null);
        }
    }

    private static class BlockedOperator
    extends PageConsumerOperator {
        private final SettableFuture<Void> blocked = SettableFuture.create();

        public BlockedOperator(OperatorContext operatorContext, Consumer<Page> pageConsumer, Function<Page, Page> pagePreprocessor) {
            super(operatorContext, pageConsumer, pagePreprocessor);
        }

        public ListenableFuture<Void> isBlocked() {
            return this.blocked;
        }

        private void setDone() {
            this.blocked.set(null);
        }

        private boolean isDone() {
            return this.blocked.isDone();
        }

        private boolean isCancelled() {
            return this.blocked.isCancelled();
        }
    }

    private static class NotBlockedTableScanOperator
    extends TableScanOperator {
        public NotBlockedTableScanOperator(OperatorContext operatorContext, PlanNodeId planNodeId, PageSourceProvider pageSourceProvider, TableHandle table, Iterable<ColumnHandle> columns) {
            super(operatorContext, planNodeId, pageSourceProvider, table, columns, DynamicFilter.EMPTY);
        }

        public ListenableFuture<Void> isBlocked() {
            return NOT_BLOCKED;
        }
    }

    /**
     * Minimal {@link ConnectorSplit} used by {@code newMockSplit()}: it carries no info object and
     * reports a retained size of zero bytes.
     */
    private static class MockSplit
    implements ConnectorSplit {
        private MockSplit() {
            // nothing to initialise
        }

        /** @return always {@code null} */
        public Object getInfo() {
            return null;
        }

        /** @return always {@code 0} */
        public long getRetainedSizeInBytes() {
            return 0;
        }
    }
}

