/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.testing.TestingTicker;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.client.NodeVersion;
import io.trino.connector.CatalogHandle;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTaskFactory;
import io.trino.execution.SqlStage;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.TaskState;
import io.trino.execution.TestingRemoteTaskFactory;
import io.trino.execution.scheduler.ConstantPartitionMemoryEstimator;
import io.trino.execution.scheduler.FaultTolerantStageScheduler;
import io.trino.execution.scheduler.FixedCountNodeAllocatorService;
import io.trino.execution.scheduler.NodeAllocator;
import io.trino.execution.scheduler.NodeRequirements;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSelectorFactory;
import io.trino.execution.scheduler.PartitionMemoryEstimator;
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.TaskDescriptorStorage;
import io.trino.execution.scheduler.TaskExecutionStats;
import io.trino.execution.scheduler.TaskLifecycleListener;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.execution.scheduler.TestingExchange;
import io.trino.execution.scheduler.TestingNodeSelectorFactory;
import io.trino.execution.scheduler.TestingTaskLifecycleListener;
import io.trino.execution.scheduler.TestingTaskSourceFactory;
import io.trino.failuredetector.FailureDetector;
import io.trino.failuredetector.NoOpFailureDetector;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.operator.RetryPolicy;
import io.trino.server.DynamicFilterService;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.VarcharType;
import io.trino.sql.PlannerContext;
import io.trino.sql.planner.Partitioning;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PartitioningScheme;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.TestingPlannerContext;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingMetadata;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingSplit;
import io.trino.util.FinalizerService;
import java.net.URI;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestFaultTolerantStageScheduler {
    // Query-scoped identifiers shared by the tests: the query id, a session bound to that
    // query id, and the id of stage 0 of that query.
    private static final QueryId QUERY_ID = new QueryId("query");
    private static final Session SESSION = TestingSession.testSessionBuilder().setQueryId(QUERY_ID).build();
    private static final StageId STAGE_ID = new StageId(QUERY_ID, 0);
    // FRAGMENT_ID is the key the tests use to look up tasks recorded by the lifecycle listener;
    // SOURCE_FRAGMENT_ID_1/2 are the keys of the source-exchange map handed to the scheduler.
    private static final PlanFragmentId FRAGMENT_ID = new PlanFragmentId("0");
    private static final PlanFragmentId SOURCE_FRAGMENT_ID_1 = new PlanFragmentId("1");
    private static final PlanFragmentId SOURCE_FRAGMENT_ID_2 = new PlanFragmentId("2");
    // NOTE(review): presumably the id of the table scan node in the test plan fragment — confirm at its use site.
    private static final PlanNodeId TABLE_SCAN_NODE_ID = new PlanNodeId("table_scan_id");
    // Three test nodes with distinct ids and distinct local ports (8080, 8081, 8082).
    private static final InternalNode NODE_1 = new InternalNode("node-1", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, false);
    private static final InternalNode NODE_2 = new InternalNode("node-2", URI.create("local://127.0.0.1:8081"), NodeVersion.UNKNOWN, false);
    private static final InternalNode NODE_3 = new InternalNode("node-3", URI.create("local://127.0.0.1:8082"), NodeVersion.UNKNOWN, false);
    private static final PlannerContext PLANNER_CONTEXT = TestingPlannerContext.plannerContextBuilder().build();
    // Created once in beforeClass(); finalizerService and nodeTaskMap are cleared in afterClass().
    private FinalizerService finalizerService;
    private NodeTaskMap nodeTaskMap;
    // Replaced by setupNodeAllocatorService() and stopped/cleared after every test method.
    private FixedCountNodeAllocatorService nodeAllocatorService;
    // Testing ticker created in beforeClass() and handed to the future completor.
    private TestingTicker ticker;
    private TestFutureCompletor futureCompletor;

    /**
     * Creates the class-wide fixtures: a started {@link FinalizerService}, the
     * {@link NodeTaskMap} built on top of it, a {@link TestingTicker}, and the
     * future completor that is driven by that ticker.
     */
    @BeforeClass
    public void beforeClass() {
        finalizerService = new FinalizerService();
        finalizerService.start();
        nodeTaskMap = new NodeTaskMap(finalizerService);

        ticker = new TestingTicker();
        futureCompletor = new TestFutureCompletor(ticker);
    }

    /**
     * Drops the node task map and, if a finalizer service was created, destroys it
     * and clears the reference.
     */
    @AfterClass(alwaysRun=true)
    public void afterClass() {
        nodeTaskMap = null;
        if (finalizerService == null) {
            // Nothing was started (or it was already torn down).
            return;
        }
        finalizerService.destroy();
        finalizerService = null;
    }

    /**
     * Stops any previously installed allocator service and installs a fresh
     * {@link FixedCountNodeAllocatorService} backed by the given node supplier.
     *
     * @param nodeSupplier supplier handed to the {@link TestingNodeSelectorFactory}
     */
    private void setupNodeAllocatorService(TestingNodeSelectorFactory.TestingNodeSupplier nodeSupplier) {
        // Make sure a service left over from an earlier call is stopped first.
        shutdownNodeAllocatorService();
        NodeSelectorFactory selectorFactory = new TestingNodeSelectorFactory(NODE_1, nodeSupplier);
        NodeScheduler nodeScheduler = new NodeScheduler(selectorFactory);
        nodeAllocatorService = new FixedCountNodeAllocatorService(nodeScheduler);
    }

    /**
     * Stops the current node allocator service, if there is one, and clears the field.
     * Runs after every test method and is also called before a new service is installed.
     */
    @AfterMethod(alwaysRun=true)
    public void shutdownNodeAllocatorService() {
        if (nodeAllocatorService == null) {
            return;
        }
        nodeAllocatorService.stop();
        nodeAllocatorService = null;
    }

    /**
     * Drives a stage through scheduling, one task failure followed by a retry, and completion.
     *
     * <p>At every step the test checks the scheduler's blocked future, the set of remote tasks
     * created so far, and the sink handles reported as finished on the sink exchange.
     * Task ids are written below as {@code (x, y)} after the arguments passed to {@code getTaskId};
     * NOTE(review): {@code y} presumably is the attempt number — confirm against {@code getTaskId}.
     */
    @Test
    public void testHappyPath() throws Exception {
        TestingRemoteTaskFactory remoteTaskFactory = new TestingRemoteTaskFactory();
        TestingTaskSourceFactory taskSourceFactory = createTaskSourceFactory(5, 2);
        TestingNodeSelectorFactory.TestingNodeSupplier nodeSupplier = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(
                NODE_1, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_2, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_3, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE)));
        setupNodeAllocatorService(nodeSupplier);
        TestingExchange sinkExchange = new TestingExchange(false);
        TestingExchange sourceExchange1 = new TestingExchange(false);
        TestingExchange sourceExchange2 = new TestingExchange(false);
        try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
            // NOTE(review): the two trailing ints are presumably the retry budget and the
            // waiting-for-node limit — confirm against createFaultTolerantTaskScheduler.
            FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
                    remoteTaskFactory,
                    taskSourceFactory,
                    nodeAllocator,
                    TaskLifecycleListener.NO_OP,
                    Optional.of(sinkExchange),
                    ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
                    2,
                    1);

            // A fresh scheduler is not blocked.
            // NOTE(review): raw type kept because the signatures of isBlocked() and the
            // assert helpers are not visible here; parameterize once confirmed.
            ListenableFuture blocked = scheduler.isBlocked();
            assertUnblocked(blocked);

            // Neither source exchange has source handles yet, so scheduling leaves it blocked.
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            assertBlocked(blocked);

            // Handles for only one of the two source exchanges are not enough to unblock.
            sourceExchange1.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            assertBlocked(blocked);
            Assert.assertFalse(scheduler.isBlocked().isDone());

            // Once the second exchange has handles too, both the old and a new future are done.
            sourceExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            assertUnblocked(blocked);
            assertUnblocked(scheduler.isBlocked());

            // First real scheduling round: tasks (0, 0), (1, 0) and (2, 0) are created.
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            assertBlocked(blocked);
            Assert.assertFalse(sinkExchange.isNoMoreSinks());
            Map<TaskId, TestingRemoteTaskFactory.TestingRemoteTask> tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).hasSize(3);
            Assertions.assertThat(tasks).containsKey(getTaskId(0, 0));
            Assertions.assertThat(tasks).containsKey(getTaskId(1, 0));
            Assertions.assertThat(tasks).containsKey(getTaskId(2, 0));

            // Failing task (0, 0) unblocks the scheduler.
            TestingRemoteTaskFactory.TestingRemoteTask task = tasks.get(getTaskId(0, 0));
            task.fail(new RuntimeException("some failure"));
            assertUnblocked(blocked);
            assertUnblocked(scheduler.isBlocked());

            // After moving time forward, the next round creates task (3, 0).
            moveTime(10, TimeUnit.SECONDS);
            scheduler.schedule();
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).hasSize(4);
            Assertions.assertThat(tasks).containsKey(getTaskId(3, 0));
            blocked = scheduler.isBlocked();
            assertBlocked(blocked);

            // Finishing task (1, 0) unblocks the scheduler and reports sink handle 1 as finished.
            Assertions.assertThat(tasks).containsKey(getTaskId(1, 0));
            tasks.get(getTaskId(1, 0)).finish();
            assertUnblocked(blocked);
            assertUnblocked(scheduler.isBlocked());
            Assertions.assertThat(sinkExchange.getFinishedSinkHandles()).contains(new TestingExchange.TestingExchangeSinkHandle(1));

            // The next round creates task (0, 1).
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            assertBlocked(blocked);
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).hasSize(5);
            Assertions.assertThat(tasks).containsKey(getTaskId(0, 1));

            // Finishing task (3, 0) adds sink handle 3 to the finished set.
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).containsKey(getTaskId(3, 0));
            tasks.get(getTaskId(3, 0)).finish();
            Assertions.assertThat(sinkExchange.getFinishedSinkHandles()).contains(
                    new TestingExchange.TestingExchangeSinkHandle(1),
                    new TestingExchange.TestingExchangeSinkHandle(3));
            assertUnblocked(blocked);

            // The next round creates the last task, (4, 0); the stage is not finished yet.
            scheduler.schedule();
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).hasSize(6);
            Assertions.assertThat(tasks).containsKey(getTaskId(4, 0));
            Assert.assertFalse(scheduler.isFinished());
            blocked = scheduler.isBlocked();
            assertBlocked(blocked);

            // Finish the remaining tasks; all five sink handles are finished and the stage is done.
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).containsKey(getTaskId(4, 0));
            tasks.get(getTaskId(0, 1)).finish();
            tasks.get(getTaskId(2, 0)).finish();
            tasks.get(getTaskId(4, 0)).finish();
            assertUnblocked(blocked);
            assertUnblocked(scheduler.isBlocked());
            Assertions.assertThat(sinkExchange.getFinishedSinkHandles()).contains(
                    new TestingExchange.TestingExchangeSinkHandle(0),
                    new TestingExchange.TestingExchangeSinkHandle(1),
                    new TestingExchange.TestingExchangeSinkHandle(2),
                    new TestingExchange.TestingExchangeSinkHandle(3),
                    new TestingExchange.TestingExchangeSinkHandle(4));
            Assert.assertTrue(scheduler.isFinished());
        }
    }

    /**
     * Schedules six splits whose host lists name NODE_1 (splits 0, 1, 2 and 4), NODE_2 (split 3)
     * and NODE_3 (split 5), and checks which tasks get created as earlier tasks finish.
     *
     * <p>Task ids are written below as {@code (x, y)} after the arguments passed to {@code getTaskId}.
     * NOTE(review): the scheduler is created with a trailing argument of 3, presumably the number of
     * tasks allowed to wait for a node before the scheduler blocks — confirm against
     * {@code createFaultTolerantTaskScheduler}.
     */
    @Test
    public void testTasksWaitingForNodes() throws Exception {
        TestingRemoteTaskFactory remoteTaskFactory = new TestingRemoteTaskFactory();
        List<Split> splits = ImmutableList.of(
                new Split(TestingHandles.TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(NODE_1.getHostAndPort()))),
                new Split(TestingHandles.TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(NODE_1.getHostAndPort()))),
                new Split(TestingHandles.TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(NODE_1.getHostAndPort()))),
                new Split(TestingHandles.TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(NODE_2.getHostAndPort()))),
                new Split(TestingHandles.TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(NODE_1.getHostAndPort()))),
                new Split(TestingHandles.TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(NODE_3.getHostAndPort()))));
        TestingTaskSourceFactory taskSourceFactory = new TestingTaskSourceFactory(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), splits, 2);
        TestingNodeSelectorFactory.TestingNodeSupplier nodeSupplier = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(
                NODE_1, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_2, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_3, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE)));
        setupNodeAllocatorService(nodeSupplier);
        TestingExchange sinkExchange = new TestingExchange(false);
        TestingExchange sourceExchange1 = new TestingExchange(false);
        TestingExchange sourceExchange2 = new TestingExchange(false);
        try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
            FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
                    remoteTaskFactory,
                    taskSourceFactory,
                    nodeAllocator,
                    TaskLifecycleListener.NO_OP,
                    Optional.of(sinkExchange),
                    ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
                    2,
                    3);
            sourceExchange1.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            sourceExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));

            // First round: only tasks (0, 0) and (3, 0) are created and the scheduler blocks.
            scheduler.schedule();
            assertBlocked(scheduler.isBlocked());
            Map<TaskId, TestingRemoteTaskFactory.TestingRemoteTask> tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).hasSize(2);
            Assertions.assertThat(tasks).containsKey(getTaskId(0, 0));
            Assertions.assertThat(tasks).containsKey(getTaskId(3, 0));

            // Finishing task (3, 0) does not lead to any new task.
            tasks.get(getTaskId(3, 0)).finish();
            scheduler.schedule();
            assertBlocked(scheduler.isBlocked());
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).hasSize(2);
            Assertions.assertThat(tasks).containsKey(getTaskId(0, 0));
            Assertions.assertThat(tasks).containsKey(getTaskId(3, 0));

            // Finishing task (0, 0) lets tasks (1, 0) and (5, 0) be created.
            tasks.get(getTaskId(0, 0)).finish();
            scheduler.schedule();
            assertBlocked(scheduler.isBlocked());
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).hasSize(4);
            Assertions.assertThat(tasks).containsKey(getTaskId(0, 0));
            Assertions.assertThat(tasks).containsKey(getTaskId(1, 0));
            Assertions.assertThat(tasks).containsKey(getTaskId(3, 0));
            Assertions.assertThat(tasks).containsKey(getTaskId(5, 0));

            // Finishing (1, 0) is followed by (2, 0); finishing (2, 0) is followed by (4, 0).
            tasks.get(getTaskId(1, 0)).finish();
            scheduler.schedule();
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).containsKey(getTaskId(2, 0));
            tasks.get(getTaskId(2, 0)).finish();
            scheduler.schedule();
            tasks = remoteTaskFactory.getTasks();
            Assertions.assertThat(tasks).containsKey(getTaskId(4, 0));

            // Finish everything that is left; the stage must end up unblocked and finished.
            tasks.get(getTaskId(4, 0)).finish();
            // NOTE(review): task (3, 0) was already finished above; the second call is kept as-is.
            tasks.get(getTaskId(3, 0)).finish();
            tasks.get(getTaskId(5, 0)).finish();
            scheduler.schedule();
            assertUnblocked(scheduler.isBlocked());
            Assert.assertTrue(scheduler.isFinished());
        }
    }

    /**
     * Checks that the {@link TestingTaskLifecycleListener} passed to the scheduler records the
     * initially created tasks and, after a task failure and another scheduling round, the
     * additional task created for the failed one.
     *
     * <p>Task ids are written below as {@code (x, y)} after the arguments passed to {@code getTaskId}.
     */
    @Test
    public void testTaskLifecycleListener() throws Exception {
        TestingRemoteTaskFactory remoteTaskFactory = new TestingRemoteTaskFactory();
        TestingTaskSourceFactory taskSourceFactory = createTaskSourceFactory(2, 1);
        TestingNodeSelectorFactory.TestingNodeSupplier nodeSupplier = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(
                NODE_1, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_2, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE)));
        setupNodeAllocatorService(nodeSupplier);
        TestingTaskLifecycleListener taskLifecycleListener = new TestingTaskLifecycleListener();
        TestingExchange sourceExchange1 = new TestingExchange(false);
        TestingExchange sourceExchange2 = new TestingExchange(false);
        try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
            FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
                    remoteTaskFactory,
                    taskSourceFactory,
                    nodeAllocator,
                    taskLifecycleListener,
                    Optional.empty(),
                    ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
                    2,
                    1);
            sourceExchange1.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            sourceExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            assertUnblocked(scheduler.isBlocked());

            // First round: the listener has seen tasks (0, 0) and (1, 0) for FRAGMENT_ID.
            scheduler.schedule();
            assertBlocked(scheduler.isBlocked());
            Assertions.assertThat(taskLifecycleListener.getTasks().get(FRAGMENT_ID)).contains(getTaskId(0, 0), getTaskId(1, 0));

            // Fail task (0, 0); the scheduler becomes unblocked.
            remoteTaskFactory.getTasks().get(getTaskId(0, 0)).fail(new RuntimeException("some exception"));
            assertUnblocked(scheduler.isBlocked());

            // After moving time forward and scheduling again, the listener also has task (0, 1).
            moveTime(10, TimeUnit.SECONDS);
            scheduler.schedule();
            assertBlocked(scheduler.isBlocked());
            Assertions.assertThat(taskLifecycleListener.getTasks().get(FRAGMENT_ID)).contains(getTaskId(0, 0), getTaskId(1, 0), getTaskId(0, 1));
        }
    }

    /**
     * Checks what happens when a task fails in a scheduler created with {@code 0} as the first
     * trailing int argument: the blocked future completes, two node leases requested beforehand
     * have their node futures completed, {@code schedule()} throws with the task's failure message,
     * and the scheduler is not reported as finished.
     *
     * <p>NOTE(review): the {@code 0} presumably means "no retries" — confirm against
     * {@code createFaultTolerantTaskScheduler}.
     */
    @Test
    public void testTaskFailure() throws Exception {
        TestingRemoteTaskFactory remoteTaskFactory = new TestingRemoteTaskFactory();
        TestingTaskSourceFactory taskSourceFactory = createTaskSourceFactory(3, 1);
        TestingNodeSelectorFactory.TestingNodeSupplier nodeSupplier = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(
                NODE_1, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_2, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE)));
        setupNodeAllocatorService(nodeSupplier);
        TestingExchange sourceExchange1 = new TestingExchange(false);
        TestingExchange sourceExchange2 = new TestingExchange(false);
        try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
            FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
                    remoteTaskFactory,
                    taskSourceFactory,
                    nodeAllocator,
                    TaskLifecycleListener.NO_OP,
                    Optional.empty(),
                    ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
                    0,
                    1);
            sourceExchange1.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            sourceExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            assertUnblocked(scheduler.isBlocked());
            scheduler.schedule();
            // NOTE(review): raw type kept because the signatures of isBlocked() and the
            // assert helpers are not visible here; parameterize once confirmed.
            ListenableFuture blocked = scheduler.isBlocked();
            assertBlocked(blocked);

            // Request two more leases directly from the allocator while the scheduler is blocked.
            NodeAllocator.NodeLease acquireNode1 = nodeAllocator.acquire(new NodeRequirements(
                    Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                    ImmutableSet.of(),
                    DataSize.of(4L, DataSize.Unit.GIGABYTE)));
            NodeAllocator.NodeLease acquireNode2 = nodeAllocator.acquire(new NodeRequirements(
                    Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                    ImmutableSet.of(),
                    DataSize.of(4L, DataSize.Unit.GIGABYTE)));

            // Fail task (0, 0): the scheduler unblocks and both lease futures complete.
            remoteTaskFactory.getTasks().get(getTaskId(0, 0)).fail(new RuntimeException("some failure"));
            assertUnblocked(blocked);
            assertUnblocked(acquireNode1.getNode());
            assertUnblocked(acquireNode2.getNode());

            // The failure surfaces from schedule(); the scheduler stays unblocked and unfinished.
            Assertions.assertThatThrownBy(scheduler::schedule).hasMessageContaining("some failure");
            assertUnblocked(scheduler.isBlocked());
            Assert.assertFalse(scheduler.isFinished());
        }
    }

    /**
     * Checks that {@code reportTaskFailure} puts the named remote task into
     * {@link TaskState#FAILED}, unblocks the scheduler, and that after another scheduling round
     * a new task (0, 1) exists; finishing it together with task (1, 0) finishes the stage.
     *
     * <p>Task ids are written as {@code (x, y)} after the arguments passed to {@code getTaskId}.
     */
    @Test
    public void testReportTaskFailure() throws Exception {
        TestingRemoteTaskFactory remoteTaskFactory = new TestingRemoteTaskFactory();
        TestingTaskSourceFactory taskSourceFactory = createTaskSourceFactory(2, 1);
        TestingNodeSelectorFactory.TestingNodeSupplier nodeSupplier = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(
                NODE_1, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_2, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE)));
        setupNodeAllocatorService(nodeSupplier);
        TestingExchange sourceExchange1 = new TestingExchange(false);
        TestingExchange sourceExchange2 = new TestingExchange(false);
        try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
            FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
                    remoteTaskFactory,
                    taskSourceFactory,
                    nodeAllocator,
                    TaskLifecycleListener.NO_OP,
                    Optional.empty(),
                    ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2),
                    1,
                    1);
            sourceExchange1.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            sourceExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            assertUnblocked(scheduler.isBlocked());
            scheduler.schedule();
            // NOTE(review): raw type kept because the signatures of isBlocked() and the
            // assert helpers are not visible here; parameterize once confirmed.
            ListenableFuture blocked = scheduler.isBlocked();
            assertBlocked(blocked);

            // Report a failure for task (0, 0) through the scheduler rather than the task itself.
            scheduler.reportTaskFailure(getTaskId(0, 0), new RuntimeException("some failure"));
            Assert.assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED);
            assertUnblocked(blocked);

            // After moving time forward, the next round creates task (0, 1).
            moveTime(10, TimeUnit.SECONDS);
            scheduler.schedule();
            Assertions.assertThat(remoteTaskFactory.getTasks()).containsKey(getTaskId(0, 1));

            // Finishing both outstanding tasks completes the stage.
            remoteTaskFactory.getTasks().get(getTaskId(0, 1)).finish();
            remoteTaskFactory.getTasks().get(getTaskId(1, 0)).finish();
            assertUnblocked(scheduler.isBlocked());
            Assert.assertTrue(scheduler.isFinished());
        }
    }

    @Test
    public void testRetryDelay() throws Exception {
        TestingRemoteTaskFactory remoteTaskFactory = new TestingRemoteTaskFactory();
        TestingTaskSourceFactory taskSourceFactory = TestFaultTolerantStageScheduler.createTaskSourceFactory(3, 1);
        TestingNodeSelectorFactory.TestingNodeSupplier nodeSupplier = TestingNodeSelectorFactory.TestingNodeSupplier.create((Map<InternalNode, List<CatalogHandle>>)ImmutableMap.of((Object)NODE_1, (Object)ImmutableList.of((Object)TestingHandles.TEST_CATALOG_HANDLE), (Object)NODE_2, (Object)ImmutableList.of((Object)TestingHandles.TEST_CATALOG_HANDLE), (Object)NODE_3, (Object)ImmutableList.of((Object)TestingHandles.TEST_CATALOG_HANDLE)));
        this.setupNodeAllocatorService(nodeSupplier);
        TestingExchange sourceExchange1 = new TestingExchange(false);
        TestingExchange sourceExchange2 = new TestingExchange(false);
        Session session = TestingSession.testSessionBuilder().setQueryId(QUERY_ID).setSystemProperty("retry_initial_delay", "1s").setSystemProperty("retry_max_delay", "3s").setSystemProperty("retry_delay_scale_factor", "2.0").build();
        try (NodeAllocator nodeAllocator = this.nodeAllocatorService.getNodeAllocator(session, 1);){
            FaultTolerantStageScheduler scheduler = this.createFaultTolerantTaskScheduler(session, remoteTaskFactory, taskSourceFactory, nodeAllocator, TaskLifecycleListener.NO_OP, Optional.empty(), (Map<PlanFragmentId, Exchange>)ImmutableMap.of((Object)SOURCE_FRAGMENT_ID_1, (Object)sourceExchange1, (Object)SOURCE_FRAGMENT_ID_2, (Object)sourceExchange2), 6, 1);
            sourceExchange1.setSourceHandles((List<ExchangeSourceHandle>)ImmutableList.of((Object)new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            sourceExchange2.setSourceHandles((List<ExchangeSourceHandle>)ImmutableList.of((Object)new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            TestFaultTolerantStageScheduler.assertUnblocked(scheduler.isBlocked());
            scheduler.schedule();
            ListenableFuture blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(3);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            scheduler.reportTaskFailure(TestFaultTolerantStageScheduler.getTaskId(0, 0), (Throwable)new RuntimeException("some failure"));
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(3);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            this.moveTime(900, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(3);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            this.moveTime(500, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(4);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            scheduler.reportTaskFailure(TestFaultTolerantStageScheduler.getTaskId(0, 1), (Throwable)new RuntimeException("some other failure"));
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(4);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            this.moveTime(1900, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(4);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            this.moveTime(200, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(5);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            scheduler.reportTaskFailure(TestFaultTolerantStageScheduler.getTaskId(0, 2), (Throwable)new RuntimeException("some other failure"));
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(5);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            this.moveTime(2900, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(5);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            this.moveTime(200, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(6);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            scheduler.reportTaskFailure(TestFaultTolerantStageScheduler.getTaskId(1, 0), (Throwable)new RuntimeException("some other failure"));
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(6);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            this.moveTime(2400, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(6);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).finish();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(6);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            this.moveTime(700, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(7);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 1)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).finish();
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(7);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 1)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            scheduler.reportTaskFailure(TestFaultTolerantStageScheduler.getTaskId(1, 1), (Throwable)new RuntimeException("some other failure"));
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(7);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            this.moveTime(900, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(7);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            this.moveTime(200, TimeUnit.MILLISECONDS);
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(8);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 2)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            scheduler.reportTaskFailure(TestFaultTolerantStageScheduler.getTaskId(1, 2), (Throwable)new TrinoException((ErrorCodeSupplier)StandardErrorCode.CLUSTER_OUT_OF_MEMORY, "oom"));
            TestFaultTolerantStageScheduler.assertUnblocked(blocked);
            scheduler.schedule();
            blocked = scheduler.isBlocked();
            TestFaultTolerantStageScheduler.assertBlocked(blocked);
            Assertions.assertThat(remoteTaskFactory.getTasks()).hasSize(9);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(0, 3)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 0)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 1)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 2)).getTaskStatus().getState(), (Object)TaskState.FAILED);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(1, 3)).getTaskStatus().getState(), (Object)TaskState.RUNNING);
            Assert.assertEquals((Object)remoteTaskFactory.getTasks().get(TestFaultTolerantStageScheduler.getTaskId(2, 0)).getTaskStatus().getState(), (Object)TaskState.FINISHED);
        }
    }

    /**
     * Runs the cancellation scenario twice: first terminating the scheduler through
     * {@code abort()}, then through {@code cancel()}.
     */
    @Test
    public void testCancellation() throws Exception {
        // Same order as before: the abort variant runs first, the cancel variant second.
        for (boolean abort : new boolean[] {true, false}) {
            this.testCancellation(abort);
        }
    }

    /**
     * Schedules a stage until the scheduler reports itself blocked, requests two additional
     * node leases from the same allocator, terminates the scheduler and verifies that the
     * previously blocked future and both lease futures are done afterwards.
     *
     * @param abort {@code true} to terminate through {@code abort()}, {@code false} to
     *              terminate through {@code cancel()}
     */
    private void testCancellation(boolean abort) throws Exception {
        TestingRemoteTaskFactory taskFactory = new TestingRemoteTaskFactory();
        TestingTaskSourceFactory sourceFactory = createTaskSourceFactory(3, 1);
        Map<InternalNode, List<CatalogHandle>> catalogsByNode = ImmutableMap.of(
                NODE_1, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_2, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE));
        TestingNodeSelectorFactory.TestingNodeSupplier nodes = TestingNodeSelectorFactory.TestingNodeSupplier.create(catalogsByNode);
        setupNodeAllocatorService(nodes);
        TestingExchange firstSource = new TestingExchange(false);
        TestingExchange secondSource = new TestingExchange(false);
        try (NodeAllocator allocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
            Map<PlanFragmentId, Exchange> sourceExchanges = ImmutableMap.of(
                    SOURCE_FRAGMENT_ID_1, firstSource,
                    SOURCE_FRAGMENT_ID_2, secondSource);
            // retryAttempts = 0, maxTasksWaitingForNodePerStage = 1
            FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
                    taskFactory,
                    sourceFactory,
                    allocator,
                    TaskLifecycleListener.NO_OP,
                    Optional.empty(),
                    sourceExchanges,
                    0,
                    1);
            firstSource.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            secondSource.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            assertUnblocked(scheduler.isBlocked());

            scheduler.schedule();
            // Capture the future now so it can be re-checked after termination.
            ListenableFuture<?> blockedBeforeTermination = scheduler.isBlocked();
            assertBlocked(blockedBeforeTermination);

            // Two extra lease requests issued directly against the allocator; their futures are
            // expected to be done once the scheduler has been terminated.
            NodeAllocator.NodeLease firstLease = allocator.acquire(new NodeRequirements(
                    Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                    ImmutableSet.of(),
                    DataSize.of(4L, DataSize.Unit.GIGABYTE)));
            NodeAllocator.NodeLease secondLease = allocator.acquire(new NodeRequirements(
                    Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                    ImmutableSet.of(),
                    DataSize.of(4L, DataSize.Unit.GIGABYTE)));

            if (!abort) {
                scheduler.cancel();
            } else {
                scheduler.abort();
            }

            assertUnblocked(blockedBeforeTermination);
            assertUnblocked(firstLease.getNode());
            assertUnblocked(secondLease.getNode());

            // Scheduling after termination must leave the scheduler unblocked but not finished.
            scheduler.schedule();
            assertUnblocked(scheduler.isBlocked());
            Assert.assertFalse(scheduler.isFinished());
        }
    }

    /**
     * Verifies scheduling when the task source receives its splits asynchronously: the
     * scheduler stays blocked until the splits future is completed, then creates two tasks
     * that each carry two splits, and reports itself finished once both tasks finish.
     */
    @Test
    public void testAsyncTaskSource() throws Exception {
        TestingRemoteTaskFactory taskFactory = new TestingRemoteTaskFactory();
        // Completed later in the test to simulate splits arriving asynchronously.
        SettableFuture<List<Split>> pendingSplits = SettableFuture.create();
        TestingTaskSourceFactory sourceFactory = new TestingTaskSourceFactory(
                Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                pendingSplits,
                1);
        Map<InternalNode, List<CatalogHandle>> catalogsByNode = ImmutableMap.of(
                NODE_1, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE),
                NODE_2, ImmutableList.of(TestingHandles.TEST_CATALOG_HANDLE));
        TestingNodeSelectorFactory.TestingNodeSupplier nodes = TestingNodeSelectorFactory.TestingNodeSupplier.create(catalogsByNode);
        setupNodeAllocatorService(nodes);
        TestingExchange firstSource = new TestingExchange(false);
        TestingExchange secondSource = new TestingExchange(false);
        try (NodeAllocator allocator = nodeAllocatorService.getNodeAllocator(SESSION, 1)) {
            Map<PlanFragmentId, Exchange> sourceExchanges = ImmutableMap.of(
                    SOURCE_FRAGMENT_ID_1, firstSource,
                    SOURCE_FRAGMENT_ID_2, secondSource);
            // retryAttempts = 2, maxTasksWaitingForNodePerStage = 1
            FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
                    taskFactory,
                    sourceFactory,
                    allocator,
                    TaskLifecycleListener.NO_OP,
                    Optional.empty(),
                    sourceExchanges,
                    2,
                    1);
            firstSource.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            secondSource.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
            assertUnblocked(scheduler.isBlocked());

            // Splits are not available yet, so the scheduler is expected to block.
            scheduler.schedule();
            assertBlocked(scheduler.isBlocked());

            pendingSplits.set(createSplits(2));
            assertUnblocked(scheduler.isBlocked());

            scheduler.schedule();
            Assertions.assertThat(taskFactory.getTasks()).hasSize(2);
            taskFactory.getTasks().values().forEach(remoteTask -> {
                Assertions.assertThat((Iterable)remoteTask.getSplits().values()).hasSize(2);
                remoteTask.finish();
            });
            Assertions.assertThat(scheduler.isFinished()).isTrue();
        }
    }

    /**
     * Convenience overload that creates the scheduler for the shared {@code SESSION};
     * all other arguments are forwarded unchanged to the session-aware overload.
     */
    private FaultTolerantStageScheduler createFaultTolerantTaskScheduler(RemoteTaskFactory remoteTaskFactory, TaskSourceFactory taskSourceFactory, NodeAllocator nodeAllocator, TaskLifecycleListener taskLifecycleListener, Optional<Exchange> sinkExchange, Map<PlanFragmentId, Exchange> sourceExchanges, int retryAttempts, int maxTasksWaitingForNodePerStage) {
        FaultTolerantStageScheduler scheduler = this.createFaultTolerantTaskScheduler(
                SESSION,
                remoteTaskFactory,
                taskSourceFactory,
                nodeAllocator,
                taskLifecycleListener,
                sinkExchange,
                sourceExchanges,
                retryAttempts,
                maxTasksWaitingForNodePerStage);
        return scheduler;
    }

    /**
     * Builds a {@link FaultTolerantStageScheduler} wired with test doubles: a fresh 10 MB
     * {@link TaskDescriptorStorage}, a no-op failure detector, a constant memory estimator,
     * and this test's ticker and future completor.
     *
     * @param session session handed to the scheduler
     * @param retryAttempts passed to the scheduler both as a plain value and wrapped in a new {@link AtomicInteger}
     * @param maxTasksWaitingForNodePerStage forwarded to the scheduler unchanged
     * @return the newly constructed scheduler
     */
    private FaultTolerantStageScheduler createFaultTolerantTaskScheduler(Session session, RemoteTaskFactory remoteTaskFactory, TaskSourceFactory taskSourceFactory, NodeAllocator nodeAllocator, TaskLifecycleListener taskLifecycleListener, Optional<Exchange> sinkExchange, Map<PlanFragmentId, Exchange> sourceExchanges, int retryAttempts, int maxTasksWaitingForNodePerStage) {
        TaskDescriptorStorage descriptorStorage = new TaskDescriptorStorage(DataSize.of(10L, DataSize.Unit.MEGABYTE));
        // NOTE(review): the storage is initialized with the shared SESSION's query id, not the
        // `session` argument — presumably both always carry the same query id; confirm against callers.
        descriptorStorage.initialize(SESSION.getQueryId());

        // Collaborators are created in the same order the original constructor call evaluated them.
        SqlStage stage = createSqlStage(remoteTaskFactory);
        FailureDetector failureDetector = new NoOpFailureDetector();
        PartitionMemoryEstimator memoryEstimator = new ConstantPartitionMemoryEstimator();
        TaskExecutionStats executionStats = new TaskExecutionStats();
        AtomicInteger atomicRetryAttempts = new AtomicInteger(retryAttempts);
        DynamicFilterService dynamicFilterService = new DynamicFilterService(
                PLANNER_CONTEXT.getMetadata(),
                PLANNER_CONTEXT.getFunctionManager(),
                PLANNER_CONTEXT.getTypeOperators(),
                new DynamicFilterConfig());

        return new FaultTolerantStageScheduler(
                session,
                stage,
                failureDetector,
                taskSourceFactory,
                nodeAllocator,
                descriptorStorage,
                memoryEstimator,
                executionStats,
                taskLifecycleListener,
                futureCompletor,
                ticker,
                sinkExchange,
                Optional.empty(),
                sourceExchanges,
                Optional.empty(),
                Optional.empty(),
                atomicRetryAttempts,
                retryAttempts,
                maxTasksWaitingForNodePerStage,
                dynamicFilterService);
    }

    /**
     * Creates the {@link SqlStage} under test for {@code STAGE_ID}, using the plan fragment
     * from {@code createPlanFragment()}, the shared {@code SESSION}, this test's node task map
     * and a direct (same-thread) executor.
     *
     * @param remoteTaskFactory factory the stage uses to create remote tasks
     */
    private SqlStage createSqlStage(RemoteTaskFactory remoteTaskFactory) {
        return SqlStage.createSqlStage(
                STAGE_ID,
                createPlanFragment(),
                ImmutableMap.of(),
                remoteTaskFactory,
                SESSION,
                false,
                nodeTaskMap,
                MoreExecutors.directExecutor(),
                new SplitSchedulerStats());
    }

    /**
     * Builds the plan fragment used by every scheduler in this test: an inner, replicated
     * join of a table scan (producing {@code probe_column}) with a remote source that reads
     * {@code build_column} from both source fragments. The fragment itself uses
     * {@code SOURCE_DISTRIBUTION}; its output partitioning scheme uses {@code SINGLE_DISTRIBUTION}.
     */
    private PlanFragment createPlanFragment() {
        Symbol probeSymbol = new Symbol("probe_column");
        Symbol buildSymbol = new Symbol("build_column");

        TableScanNode scan = new TableScanNode(
                TABLE_SCAN_NODE_ID,
                TestingHandles.TEST_TABLE_HANDLE,
                ImmutableList.of(probeSymbol),
                ImmutableMap.of(probeSymbol, new TestingMetadata.TestingColumnHandle("column")),
                TupleDomain.none(),
                Optional.empty(),
                false,
                Optional.empty());

        RemoteSourceNode remote = new RemoteSourceNode(
                new PlanNodeId("remote_source_id"),
                ImmutableList.of(SOURCE_FRAGMENT_ID_1, SOURCE_FRAGMENT_ID_2),
                ImmutableList.of(buildSymbol),
                Optional.empty(),
                ExchangeNode.Type.REPLICATE,
                RetryPolicy.TASK);

        JoinNode join = new JoinNode(
                new PlanNodeId("join_id"),
                JoinNode.Type.INNER,
                scan,
                remote,
                ImmutableList.of(),
                scan.getOutputSymbols(),
                remote.getOutputSymbols(),
                false,
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.of(JoinNode.DistributionType.REPLICATED),
                Optional.empty(),
                ImmutableMap.of(),
                Optional.empty());

        return new PlanFragment(
                FRAGMENT_ID,
                join,
                // Both symbols are typed as VARCHAR.
                ImmutableMap.of(probeSymbol, VarcharType.VARCHAR, buildSymbol, VarcharType.VARCHAR),
                SystemPartitioningHandle.SOURCE_DISTRIBUTION,
                ImmutableList.of(TABLE_SCAN_NODE_ID),
                new PartitioningScheme(
                        Partitioning.create(SystemPartitioningHandle.SINGLE_DISTRIBUTION, ImmutableList.of()),
                        ImmutableList.of(probeSymbol, buildSymbol)),
                StatsAndCosts.empty(),
                ImmutableList.of(),
                Optional.empty());
    }

    /**
     * Creates a {@link TestingTaskSourceFactory} for the test catalog handle.
     *
     * @param splitCount number of splits to generate via {@link #createSplits(int)}
     * @param taskPerBatch value forwarded unchanged as the factory's third constructor argument
     * @return the configured testing task source factory
     */
    private static TestingTaskSourceFactory createTaskSourceFactory(int splitCount, int taskPerBatch) {
        Optional<CatalogHandle> catalogHandle = Optional.of(TestingHandles.TEST_CATALOG_HANDLE);
        List<Split> splits = createSplits(splitCount);
        return new TestingTaskSourceFactory(catalogHandle, splits, taskPerBatch);
    }

    /**
     * Produces an immutable list holding {@code count} references to one remote test split.
     *
     * <p>A single {@link Split} is created and then repeated: the list is built by cycling over
     * that one element and cutting the cycle off after {@code count} items.
     *
     * @param count number of list entries to produce
     * @return an immutable list of size {@code count}
     */
    private static List<Split> createSplits(int count) {
        Split split = new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit());
        Iterable<Split> endlessSplits = Iterables.cycle(split);
        Iterable<Split> firstSplits = Iterables.limit(endlessSplits, count);
        return ImmutableList.copyOf(firstSplits);
    }

    /**
     * Builds the {@link TaskId} within {@code STAGE_ID} for the given partition and attempt.
     *
     * @param partitionId partition component of the task id
     * @param attemptId attempt component of the task id
     * @return a new task id for the test stage
     */
    private static TaskId getTaskId(int partitionId, int attemptId) {
        TaskId taskId = new TaskId(STAGE_ID, partitionId, attemptId);
        return taskId;
    }

    /**
     * Asserts that the given future has not completed yet.
     *
     * @param blocked future expected to still be pending
     */
    private static void assertBlocked(ListenableFuture<?> blocked) {
        boolean done = blocked.isDone();
        Assert.assertFalse(done);
    }

    /**
     * Asserts that the given future has already completed.
     *
     * @param blocked future expected to be done
     */
    private static void assertUnblocked(ListenableFuture<?> blocked) {
        boolean done = blocked.isDone();
        Assert.assertTrue(done);
    }

    /**
     * Advances the test ticker and then triggers the future completor.
     *
     * <p>The ticker is incremented first so that the subsequent {@code trigger()} call runs
     * against the advanced time.
     *
     * @param delta amount of time to advance, expressed in {@code unit}
     * @param unit time unit of {@code delta}
     */
    private void moveTime(int delta, TimeUnit unit) {
        long amount = delta;
        ticker.increment(amount, unit);
        futureCompletor.trigger();
    }

    /**
     * Test implementation of {@link FaultTolerantStageScheduler.DelayedFutureCompletor} driven by
     * an injected {@link Ticker}.
     *
     * <p>Nothing is completed in the background: {@link #completeFuture} only records the future
     * together with the elapsed time at which it becomes due, and {@link #trigger()} completes the
     * recorded futures that are due at the moment it is called.
     */
    private static class TestFutureCompletor
    implements FaultTolerantStageScheduler.DelayedFutureCompletor {
        /** Measures elapsed time on the injected ticker; started at construction. */
        private final Stopwatch stopwatch;
        /** Futures registered via {@link #completeFuture} that have not been completed yet. */
        private final Set<Entry> entries = Sets.newConcurrentHashSet();

        /**
         * @param ticker time source used for the internal stopwatch
         */
        private TestFutureCompletor(Ticker ticker) {
            stopwatch = Stopwatch.createStarted(ticker);
        }

        /**
         * Registers {@code future} to become due once {@code delay} has passed on the stopwatch,
         * counted from the current elapsed time. The future is not completed here.
         *
         * @param future future to complete later from {@link #trigger()}
         * @param delay how far after the current elapsed time the future becomes due
         */
        public void completeFuture(SettableFuture<Void> future, Duration delay) {
            Duration completionTime = stopwatch.elapsed().plus(delay);
            entries.add(new Entry(future, completionTime));
        }

        /**
         * Completes (with {@code null}) and forgets every registered future whose completion time
         * is at or before the stopwatch's current elapsed time. Entries that are not yet due stay
         * registered.
         */
        public void trigger() {
            Duration now = stopwatch.elapsed();
            for (Iterator<Entry> pending = entries.iterator(); pending.hasNext(); ) {
                Entry candidate = pending.next();
                boolean due = candidate.completionTime.compareTo(now) <= 0;
                if (due) {
                    // Complete first, then drop the entry through the iterator.
                    candidate.future.set(null);
                    pending.remove();
                }
            }
        }

        /** Pairs a pending future with the elapsed time at which it becomes due. */
        private static class Entry {
            private final SettableFuture<Void> future;
            private final Duration completionTime;

            /**
             * @param future future to complete once due
             * @param completionTime elapsed stopwatch time at which {@code future} becomes due
             */
            public Entry(SettableFuture<Void> future, Duration completionTime) {
                this.future = future;
                this.completionTime = completionTime;
            }
        }
    }
}

