/*
 * Decompiled with CFR 0.152.
 */
package io.trino.memory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.Threads;
import io.airlift.stats.GcMonitor;
import io.airlift.stats.TestingGcMonitor;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.TestingPagesSerdeFactory;
import io.trino.memory.MemoryPool;
import io.trino.memory.MemoryPoolListener;
import io.trino.memory.QueryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.Driver;
import io.trino.operator.DriverContext;
import io.trino.operator.Operator;
import io.trino.operator.OperatorContext;
import io.trino.operator.TableScanOperator;
import io.trino.operator.TaskContext;
import io.trino.spi.Page;
import io.trino.spi.QueryId;
import io.trino.spiller.SpillSpaceTracker;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.PageConsumerOperator;
import io.trino.testing.TestingTaskContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
class TestMemoryPools {
    private static final DataSize TEN_MEGABYTES = DataSize.of((long)10L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
    private static final DataSize TEN_MEGABYTES_WITHOUT_TWO_BYTES = DataSize.ofBytes((long)(TEN_MEGABYTES.toBytes() - 2L));
    private static final DataSize ONE_BYTE = DataSize.ofBytes((long)1L);
    private final TaskId fakeTaskId = new TaskId(new StageId(new QueryId("fake"), 0), 0, 0);
    private final ExecutorService executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"local-query-runner-executor-%s"));
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed((String)"local-query-runner-scheduler-%s"));

    TestMemoryPools() {
    }

    @AfterAll
    void tearDown() {
        this.executor.shutdownNow();
        this.scheduler.shutdownNow();
    }

    private RevocableMemoryDriver createRevocableMemoryDriver(MemoryPool userPool, DataSize reservedPerPage, long numberOfPages) {
        QueryContext queryContext = new QueryContext(new QueryId("query"), TEN_MEGABYTES, userPool, (GcMonitor)new TestingGcMonitor(), (Executor)this.executor, this.scheduler, this.scheduler, TEN_MEGABYTES, new SpillSpaceTracker(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.GIGABYTE)));
        TaskContext taskContext = TestingTaskContext.createTaskContext((QueryContext)queryContext, (Executor)this.executor, (Session)SessionTestUtils.TEST_SESSION);
        DriverContext driverContext = taskContext.addPipelineContext(0, false, false, false).addDriverContext();
        OperatorContext revokableOperatorContext = driverContext.addOperatorContext(Integer.MAX_VALUE, new PlanNodeId("revokable_operator"), TableScanOperator.class.getSimpleName());
        PageConsumerOperator.PageConsumerOutputFactory outputFactory = new PageConsumerOperator.PageConsumerOutputFactory(types -> page -> {});
        Operator outputOperator = outputFactory.createOutputOperator(2, new PlanNodeId("output"), (List)ImmutableList.of(), Function.identity(), (PagesSerdeFactory)new TestingPagesSerdeFactory()).createOperator(driverContext);
        RevocableMemoryOperator revocableMemoryOperator = new RevocableMemoryOperator(revokableOperatorContext, reservedPerPage, numberOfPages);
        Driver driver = Driver.createDriver((DriverContext)driverContext, (Operator)revocableMemoryOperator, (Operator[])new Operator[]{outputOperator});
        return new RevocableMemoryDriver(driver, revocableMemoryOperator);
    }

    @Test
    void testNotifyListenerOnMemoryReserved() {
        MemoryPool userPool = new MemoryPool(TEN_MEGABYTES);
        AtomicReference notifiedPool = new AtomicReference();
        AtomicLong notifiedBytes = new AtomicLong();
        userPool.addListener(MemoryPoolListener.onMemoryReserved(pool -> {
            notifiedPool.set(pool);
            notifiedBytes.set(pool.getReservedBytes());
        }));
        userPool.reserve(this.fakeTaskId, "test", 3L);
        Assertions.assertThat((Object)((MemoryPool)notifiedPool.get())).isEqualTo((Object)userPool);
        Assertions.assertThat((long)notifiedBytes.get()).isEqualTo(3L);
    }

    @Test
    void testMemoryFutureCancellation() {
        MemoryPool userPool = new MemoryPool(DataSize.of((long)100L, (DataSize.Unit)DataSize.Unit.BYTE));
        TaskId reserveTaskId = new TaskId(new StageId(new QueryId("reserve"), 0), 0, 0);
        Assertions.assertThat((Future)userPool.reserve(reserveTaskId, "reserve", 10L)).isDone();
        ListenableFuture future = userPool.reserve(this.fakeTaskId, "test", 95L);
        Assertions.assertThat((boolean)future.isDone()).isFalse();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> future.cancel(true)).isInstanceOf(UnsupportedOperationException.class)).hasMessage("cancellation is not supported");
        Assertions.assertThat((boolean)future.isDone()).isFalse();
        userPool.free(reserveTaskId, "reserve", 10L);
        Assertions.assertThat((boolean)future.isDone()).isTrue();
    }

    @Test
    void testBlockingOnRevocableMemoryFreeUser() {
        MemoryPool userPool = new MemoryPool(TEN_MEGABYTES);
        RevocableMemoryDriver revocableMemoryDriver = this.createRevocableMemoryDriver(userPool, ONE_BYTE, 10L);
        Assertions.assertThat((boolean)userPool.tryReserve(this.fakeTaskId, "test", TEN_MEGABYTES_WITHOUT_TWO_BYTES.toBytes())).isTrue();
        Assertions.assertThat((long)TestMemoryPools.runDriverUntilBlocked(revocableMemoryDriver.driver(), TestMemoryPools.waitingForRevocableMemory())).isEqualTo(2L);
        ((AbstractBooleanAssert)Assertions.assertThat((userPool.getFreeBytes() <= 0L ? 1 : 0) != 0).describedAs(String.format("Expected empty pool but got [%d]", userPool.getFreeBytes()), new Object[0])).isTrue();
        userPool.free(this.fakeTaskId, "test", 5L);
        Assertions.assertThat((long)TestMemoryPools.runDriverUntilBlocked(revocableMemoryDriver.driver(), TestMemoryPools.waitingForRevocableMemory())).isEqualTo(5L);
        ((AbstractBooleanAssert)Assertions.assertThat((userPool.getFreeBytes() <= 0L ? 1 : 0) != 0).describedAs(String.format("Expected empty pool but got [%d]", userPool.getFreeBytes()), new Object[0])).isTrue();
        userPool.free(this.fakeTaskId, "test", 3L);
        TestMemoryPools.assertDriverProgress(revocableMemoryDriver.driver(), TestMemoryPools.waitingForRevocableMemory());
        Assertions.assertThat((long)userPool.getFreeBytes()).isEqualTo(10L);
    }

    @Test
    void testBlockingOnRevocableMemoryFreeViaRevoke() {
        MemoryPool userPool = new MemoryPool(TEN_MEGABYTES);
        RevocableMemoryDriver revocableMemoryDriver = this.createRevocableMemoryDriver(userPool, ONE_BYTE, 5L);
        Assertions.assertThat((boolean)userPool.tryReserve(this.fakeTaskId, "test", TEN_MEGABYTES_WITHOUT_TWO_BYTES.toBytes())).isTrue();
        Assertions.assertThat((long)TestMemoryPools.runDriverUntilBlocked(revocableMemoryDriver.driver(), TestMemoryPools.waitingForRevocableMemory())).isEqualTo(2L);
        revocableMemoryDriver.operator().getOperatorContext().requestMemoryRevoking();
        Assertions.assertThat((long)TestMemoryPools.runDriverUntilBlocked(revocableMemoryDriver.driver(), TestMemoryPools.waitingForRevocableMemory())).isEqualTo(2L);
        revocableMemoryDriver.operator().getOperatorContext().requestMemoryRevoking();
        TestMemoryPools.assertDriverProgress(revocableMemoryDriver.driver(), TestMemoryPools.waitingForRevocableMemory());
        Assertions.assertThat((long)userPool.getFreeBytes()).isEqualTo(2L);
    }

    @Test
    void testTaggedAllocations() {
        TaskId testTask = new TaskId(new StageId(new QueryId("test_query"), 0), 0, 0);
        MemoryPool testPool = new MemoryPool(DataSize.ofBytes((long)1000L));
        testPool.reserve(testTask, "test_tag", 10L);
        Map allocations = (Map)testPool.getTaggedMemoryAllocations().get(new QueryId("test_query"));
        Assertions.assertThat((Map)allocations).isEqualTo((Object)ImmutableMap.of((Object)"test_tag", (Object)10L));
        testPool.free(testTask, "test_tag", 5L);
        Assertions.assertThat((Map)allocations).isEqualTo((Object)ImmutableMap.of((Object)"test_tag", (Object)5L));
        testPool.reserve(testTask, "test_tag2", 20L);
        Assertions.assertThat((Map)allocations).isEqualTo((Object)ImmutableMap.of((Object)"test_tag", (Object)5L, (Object)"test_tag2", (Object)20L));
        testPool.free(testTask, "test_tag", 5L);
        Assertions.assertThat((Map)allocations).isEqualTo((Object)ImmutableMap.of((Object)"test_tag2", (Object)20L));
        testPool.free(testTask, "test_tag2", 20L);
        Assertions.assertThat((int)testPool.getTaggedMemoryAllocations().size()).isEqualTo(0);
    }

    @Test
    void testPerTaskAllocations() {
        QueryId query1 = new QueryId("test_query1");
        TaskId q1task1 = new TaskId(new StageId(query1, 0), 0, 0);
        TaskId q1task2 = new TaskId(new StageId(query1, 0), 1, 0);
        QueryId query2 = new QueryId("test_query2");
        TaskId q2task1 = new TaskId(new StageId(query2, 0), 0, 0);
        MemoryPool testPool = new MemoryPool(DataSize.ofBytes((long)1000L));
        testPool.reserve(q1task1, "tag", 10L);
        Assertions.assertThat(testPool.getQueryMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query1)).isEqualTo(10L);
        Assertions.assertThat(testPool.getTaskMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task1)).isEqualTo(10L);
        testPool.reserve(q1task2, "tag", 7L);
        Assertions.assertThat(testPool.getQueryMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query1)).isEqualTo(17L);
        Assertions.assertThat(testPool.getTaskMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task1)).isEqualTo(10L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task2)).isEqualTo(7L);
        testPool.reserve(q2task1, "tag", 9L);
        Assertions.assertThat(testPool.getQueryMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query1)).isEqualTo(17L);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task1)).isEqualTo(10L);
        Assertions.assertThat(testPool.getTaskMemoryReservations().keySet()).hasSize(3);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q2task1)).isEqualTo(9L);
        testPool.reserve(q1task1, "tag", 3L);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query1)).isEqualTo(20L);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskMemoryReservations().keySet()).hasSize(3);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task1)).isEqualTo(13L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q2task1)).isEqualTo(9L);
        testPool.free(q1task1, "tag", 5L);
        Assertions.assertThat(testPool.getQueryMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query1)).isEqualTo(15L);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskMemoryReservations().keySet()).hasSize(3);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task1)).isEqualTo(8L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q2task1)).isEqualTo(9L);
        Assertions.assertThatThrownBy(() -> testPool.free(q1task1, "tag", 9L)).hasMessage("tried to free more memory than is reserved by task");
        Assertions.assertThat(testPool.getQueryMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query1)).isEqualTo(15L);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskMemoryReservations().keySet()).hasSize(3);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task1)).isEqualTo(8L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q2task1)).isEqualTo(9L);
        testPool.free(q1task1, "tag", 8L);
        Assertions.assertThat(testPool.getQueryMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query1)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task1)).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q2task1)).isEqualTo(9L);
        testPool.free(q1task2, "tag", 7L);
        Assertions.assertThat(testPool.getQueryMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query1)).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getQueryMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task1)).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q1task2)).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getTaskMemoryReservation(q2task1)).isEqualTo(9L);
    }

    @Test
    void testGlobalAllocations() {
        MemoryPool testPool = new MemoryPool(DataSize.ofBytes((long)1000L));
        Assertions.assertThat((boolean)testPool.tryReserveConnectorMemory(999L)).isTrue();
        Assertions.assertThat((boolean)testPool.tryReserveConnectorMemory(2L)).isFalse();
        Assertions.assertThat((long)testPool.getReservedBytes()).isEqualTo(999L);
        Assertions.assertThat((long)testPool.getConnectorsReservedBytes()).isEqualTo(999L);
        Assertions.assertThat((long)testPool.getReservedRevocableBytes()).isEqualTo(0L);
        Assertions.assertThat((Map)testPool.getTaskMemoryReservations()).isEmpty();
        Assertions.assertThat((Map)testPool.getQueryMemoryReservations()).isEmpty();
        Assertions.assertThat((Map)testPool.getTaggedMemoryAllocations()).isEmpty();
        testPool.freeConnectorMemory(999L);
        Assertions.assertThat((long)testPool.getReservedBytes()).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getConnectorsReservedBytes()).isEqualTo(0L);
    }

    @Test
    void testGlobalRevocableAllocations() {
        MemoryPool testPool = new MemoryPool(DataSize.ofBytes((long)1000L));
        Assertions.assertThat((boolean)testPool.tryReserveRevocable(999L)).isTrue();
        Assertions.assertThat((boolean)testPool.tryReserveRevocable(2L)).isFalse();
        Assertions.assertThat((long)testPool.getReservedBytes()).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getReservedRevocableBytes()).isEqualTo(999L);
        Assertions.assertThat((Map)testPool.getTaskMemoryReservations()).isEmpty();
        Assertions.assertThat((Map)testPool.getQueryMemoryReservations()).isEmpty();
        Assertions.assertThat((Map)testPool.getTaggedMemoryAllocations()).isEmpty();
        QueryId query = new QueryId("test_query1");
        TaskId task = new TaskId(new StageId(query, 0), 0, 0);
        ListenableFuture memoryFuture = testPool.reserve(task, "tag", 2L);
        Assertions.assertThat((Future)memoryFuture).isNotDone();
        testPool.freeRevocable(999L);
        Assertions.assertThat((Future)memoryFuture).isDone();
        Assertions.assertThat((long)testPool.getReservedBytes()).isEqualTo(2L);
        Assertions.assertThat((long)testPool.getReservedRevocableBytes()).isEqualTo(0L);
    }

    @Test
    void testPerTaskRevocableAllocations() {
        QueryId query1 = new QueryId("test_query1");
        TaskId q1task1 = new TaskId(new StageId(query1, 0), 0, 0);
        TaskId q1task2 = new TaskId(new StageId(query1, 0), 1, 0);
        QueryId query2 = new QueryId("test_query2");
        TaskId q2task1 = new TaskId(new StageId(query2, 0), 0, 0);
        MemoryPool testPool = new MemoryPool(DataSize.ofBytes((long)1000L));
        testPool.reserveRevocable(q1task1, 10L);
        Assertions.assertThat(testPool.getQueryRevocableMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query1)).isEqualTo(10L);
        Assertions.assertThat(testPool.getTaskRevocableMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task1)).isEqualTo(10L);
        testPool.reserveRevocable(q1task2, 7L);
        Assertions.assertThat(testPool.getQueryRevocableMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query1)).isEqualTo(17L);
        Assertions.assertThat(testPool.getTaskRevocableMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task1)).isEqualTo(10L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task2)).isEqualTo(7L);
        testPool.reserveRevocable(q2task1, 9L);
        Assertions.assertThat(testPool.getQueryRevocableMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query1)).isEqualTo(17L);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task1)).isEqualTo(10L);
        Assertions.assertThat(testPool.getTaskRevocableMemoryReservations().keySet()).hasSize(3);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q2task1)).isEqualTo(9L);
        testPool.reserveRevocable(q1task1, 3L);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query1)).isEqualTo(20L);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskRevocableMemoryReservations().keySet()).hasSize(3);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task1)).isEqualTo(13L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q2task1)).isEqualTo(9L);
        testPool.freeRevocable(q1task1, 5L);
        Assertions.assertThat(testPool.getQueryRevocableMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query1)).isEqualTo(15L);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskRevocableMemoryReservations().keySet()).hasSize(3);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task1)).isEqualTo(8L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q2task1)).isEqualTo(9L);
        Assertions.assertThatThrownBy(() -> testPool.freeRevocable(q1task1, 9L)).hasMessage("tried to free more revocable memory than is reserved by task");
        Assertions.assertThat(testPool.getQueryRevocableMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query1)).isEqualTo(15L);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskRevocableMemoryReservations().keySet()).hasSize(3);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task1)).isEqualTo(8L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q2task1)).isEqualTo(9L);
        testPool.freeRevocable(q1task1, 8L);
        Assertions.assertThat(testPool.getQueryRevocableMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query1)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskRevocableMemoryReservations().keySet()).hasSize(2);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task1)).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task2)).isEqualTo(7L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q2task1)).isEqualTo(9L);
        testPool.freeRevocable(q1task2, 7L);
        Assertions.assertThat(testPool.getQueryRevocableMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query1)).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getQueryRevocableMemoryReservation(query2)).isEqualTo(9L);
        Assertions.assertThat(testPool.getTaskRevocableMemoryReservations().keySet()).hasSize(1);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task1)).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q1task2)).isEqualTo(0L);
        Assertions.assertThat((long)testPool.getTaskRevocableMemoryReservation(q2task1)).isEqualTo(9L);
    }

    private static long runDriverUntilBlocked(Driver driver, Predicate<OperatorContext> reason) {
        long iterationsCount = 0L;
        while (!TestMemoryPools.isBlockedFor(driver, reason)) {
            driver.processForNumberOfIterations(1);
            ++iterationsCount;
        }
        Assertions.assertThat((boolean)driver.isFinished()).isFalse();
        return iterationsCount;
    }

    private static void assertDriverProgress(Driver driver, Predicate<OperatorContext> reason) {
        do {
            Assertions.assertThat((boolean)TestMemoryPools.isBlockedFor(driver, reason)).isFalse();
            Assertions.assertThat((Future)driver.processUntilBlocked()).isDone();
        } while (!driver.isFinished());
    }

    private static Predicate<OperatorContext> waitingForRevocableMemory() {
        return operatorContext -> !operatorContext.isWaitingForRevocableMemory().isDone() && !operatorContext.isMemoryRevokingRequested();
    }

    private static boolean isBlockedFor(Driver driver, Predicate<OperatorContext> reason) {
        for (OperatorContext operatorContext : driver.getDriverContext().getOperatorContexts()) {
            if (!reason.test(operatorContext)) continue;
            return true;
        }
        return false;
    }

    private static class RevocableMemoryOperator
    implements Operator {
        private final DataSize reservedPerPage;
        private final long numberOfPages;
        private final OperatorContext operatorContext;
        private long producedPagesCount;
        private final LocalMemoryContext revocableMemoryContext;

        public RevocableMemoryOperator(OperatorContext operatorContext, DataSize reservedPerPage, long numberOfPages) {
            this.operatorContext = operatorContext;
            this.reservedPerPage = reservedPerPage;
            this.numberOfPages = numberOfPages;
            this.revocableMemoryContext = operatorContext.localRevocableMemoryContext();
        }

        public ListenableFuture<Void> startMemoryRevoke() {
            return Futures.immediateVoidFuture();
        }

        public void finishMemoryRevoke() {
            this.revocableMemoryContext.setBytes(0L);
        }

        public OperatorContext getOperatorContext() {
            return this.operatorContext;
        }

        public void finish() {
            this.revocableMemoryContext.setBytes(0L);
        }

        public boolean isFinished() {
            return this.producedPagesCount >= this.numberOfPages;
        }

        public boolean needsInput() {
            return false;
        }

        public void addInput(Page page) {
            throw new UnsupportedOperationException();
        }

        public Page getOutput() {
            this.revocableMemoryContext.setBytes(this.revocableMemoryContext.getBytes() + this.reservedPerPage.toBytes());
            ++this.producedPagesCount;
            if (this.producedPagesCount == this.numberOfPages) {
                this.finish();
            }
            return new Page(10);
        }
    }

    private record RevocableMemoryDriver(Driver driver, RevocableMemoryOperator operator) {
    }
}

