/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.concurrent.Threads;
import io.airlift.tracing.Tracing;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.client.NodeVersion;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.MockRemoteTaskFactory;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.PartitionedSplitsInfo;
import io.trino.execution.RemoteTask;
import io.trino.execution.RemoteTaskFactory;
import io.trino.execution.SqlStage;
import io.trino.execution.StageId;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.TableInfo;
import io.trino.execution.scheduler.DynamicSplitPlacementPolicy;
import io.trino.execution.scheduler.MultiSourcePartitionedScheduler;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.NodeSelectorFactory;
import io.trino.execution.scheduler.PartitionedPipelinedOutputBufferManager;
import io.trino.execution.scheduler.PipelinedStageExecution;
import io.trino.execution.scheduler.ScheduleResult;
import io.trino.execution.scheduler.SplitPlacementPolicy;
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.StageScheduler;
import io.trino.execution.scheduler.TaskLifecycleListener;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.failuredetector.FailureDetector;
import io.trino.failuredetector.NoOpFailureDetector;
import io.trino.metadata.FunctionManager;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Metadata;
import io.trino.metadata.MetadataManager;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableHandle;
import io.trino.operator.RetryPolicy;
import io.trino.server.DynamicFilterService;
import io.trino.spi.QueryId;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.VarcharType;
import io.trino.split.ConnectorAwareSplitSource;
import io.trino.split.SplitSource;
import io.trino.sql.DynamicFilters;
import io.trino.sql.ir.Expression;
import io.trino.sql.planner.Partitioning;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PartitioningScheme;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.PlanNodeIdAllocator;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.JoinType;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingMetadata;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingSplit;
import io.trino.util.FinalizerService;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Execution(value=ExecutionMode.CONCURRENT)
public class TestMultiSourcePartitionedScheduler {
    // Plan node ids of the two table scans built by createFragment(); also used as keys of the split source maps.
    private static final PlanNodeId TABLE_SCAN_1_NODE_ID = new PlanNodeId("1");
    private static final PlanNodeId TABLE_SCAN_2_NODE_ID = new PlanNodeId("2");
    // Query id used to build the stage id and to register/create dynamic filters.
    private static final QueryId QUERY_ID = new QueryId("query");
    // Dynamic filter id referenced by both filter nodes and by the join node of the test fragment.
    private static final DynamicFilterId DYNAMIC_FILTER_ID = new DynamicFilterId("filter1");
    // Executors handed to the mock remote task factory and the stage; shut down in destroyExecutor().
    private final ExecutorService queryExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"stageExecutor-%s"));
    private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed((String)"stageScheduledExecutor-%s"));
    // Shared node manager; starts empty here and is populated with three nodes by the constructor.
    private final InMemoryNodeManager nodeManager = new InMemoryNodeManager(new InternalNode[0]);
    // Started in setUp(), destroyed in destroyExecutor(); passed to every NodeTaskMap the tests create.
    private final FinalizerService finalizerService = new FinalizerService();
    // Collaborators needed to construct a DynamicFilterService.
    private final Metadata metadata = MetadataManager.createTestMetadataManager();
    private final FunctionManager functionManager = FunctionManager.createTestingFunctionManager();
    private final TypeOperators typeOperators = new TypeOperators();
    // Session used when creating node selectors for the split placement policy.
    private final Session session = TestingSession.testSessionBuilder().build();
    // Supplies the id of the local exchange node in createFragment().
    private final PlanNodeIdAllocator planNodeIdAllocator = new PlanNodeIdAllocator();

    public TestMultiSourcePartitionedScheduler() {
        this.nodeManager.addNodes(new InternalNode[]{new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false), new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false), new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false)});
    }

    /**
     * Starts the finalizer service before the tests of this (per-class) test instance run.
     */
    @BeforeAll
    public void setUp() {
        finalizerService.start();
    }

    /**
     * Releases the shared infrastructure after all tests: both executors are shut down
     * immediately and the finalizer service is destroyed.
     */
    @AfterAll
    public void destroyExecutor() {
        queryExecutor.shutdownNow();
        scheduledExecutor.shutdownNow();
        finalizerService.destroy();
    }

    /**
     * Two fixed sources of 60 splits each are scheduled with a batch size of 7. Seventeen
     * {@code schedule()} calls are made; none of them may block, only the first may create
     * tasks, and only the last may report completion. Every task must end up with 40 splits.
     */
    @Test
    public void testScheduleSplitsBatchedNoBlocking() {
        PlanFragment fragment = createFragment();
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, taskMap);
        Map<PlanNodeId, ConnectorSplitSource> splitSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(60),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(60));
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, taskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(splitSources, placementPolicy, stage, 7);

        final int lastRound = 16;
        for (int round = 0; round <= lastRound; round++) {
            ScheduleResult result = scheduler.schedule();
            if (round < lastRound) {
                Assertions.assertThat(result.isFinished()).isFalse();
            } else {
                assertEffectivelyFinished(result, scheduler);
            }
            Assertions.assertThat(result.getBlocked().isDone()).isTrue();
            // Tasks are only created by the very first scheduling round.
            int expectedNewTasks = (round == 0) ? 3 : 0;
            Assertions.assertThat(result.getNewTasks().size()).isEqualTo(expectedNewTasks);
        }

        for (RemoteTask task : stage.getAllTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(40);
        }
        stage.abort();
    }

    /**
     * Combines a fixed source (10 splits) with a queued source that is empty at first. The
     * scheduler must report {@code WAITING_FOR_SOURCE} until the queued source delivers its
     * final two splits, after which scheduling finishes with 12 splits spread as 4 per task.
     */
    @Test
    public void testScheduleSplitsBatchedBlockingSplitSource() {
        PlanFragment fragment = createFragment();
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, taskMap);
        QueuedSplitSource queuedSource = new QueuedSplitSource(TestingSplit::createRemoteSplit);
        Map<PlanNodeId, ConnectorSplitSource> splitSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(10),
                TABLE_SCAN_2_NODE_ID, queuedSource);
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, taskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(splitSources, placementPolicy, stage, 5);

        // First round: not blocked, three tasks are created.
        ScheduleResult initialResult = scheduler.schedule();
        Assertions.assertThat(initialResult.isFinished()).isFalse();
        Assertions.assertThat(initialResult.getBlocked().isDone()).isTrue();
        Assertions.assertThat(initialResult.getNewTasks().size()).isEqualTo(3);

        // Second round: the queued source has nothing to offer yet, so the scheduler blocks on it.
        ScheduleResult waitingResult = scheduler.schedule();
        Assertions.assertThat(waitingResult.isFinished()).isFalse();
        Assertions.assertThat(waitingResult.getBlocked().isDone()).isFalse();
        Assertions.assertThat(waitingResult.getNewTasks().size()).isEqualTo(0);
        Assertions.assertThat(waitingResult.getBlockedReason()).isEqualTo(Optional.of(ScheduleResult.BlockedReason.WAITING_FOR_SOURCE));

        // Deliver the last two splits and close the queued source.
        queuedSource.addSplits(2, true);
        ScheduleResult finalResult = scheduler.schedule();
        Assertions.assertThat(finalResult.getBlocked().isDone()).isTrue();
        Assertions.assertThat(finalResult.getSplitsScheduled()).isEqualTo(2);
        Assertions.assertThat(finalResult.getNewTasks().size()).isEqualTo(0);
        Assertions.assertThat(finalResult.getBlockedReason()).isEqualTo(Optional.empty());
        Assertions.assertThat(finalResult.isFinished()).isTrue();
        assertPartitionedSplitCount(stage, 12);
        assertEffectivelyFinished(finalResult, scheduler);

        for (RemoteTask task : stage.getAllTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(4);
        }
        stage.abort();
    }

    /**
     * Offers 400 splits (two fixed sources of 200) with a batch size of 200 and checks that a
     * single {@code schedule()} call places only 300 of them on three new tasks and then
     * blocks with {@code SPLIT_QUEUES_FULL}.
     */
    @Test
    public void testScheduleSplitsTasksAreFull() {
        PlanFragment fragment = createFragment();
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, taskMap);
        Map<PlanNodeId, ConnectorSplitSource> splitSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(200),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(200));
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, taskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(splitSources, placementPolicy, stage, 200);

        ScheduleResult result = scheduler.schedule();

        Assertions.assertThat(result.getSplitsScheduled()).isEqualTo(300);
        Assertions.assertThat(result.isFinished()).isFalse();
        Assertions.assertThat(result.getBlocked().isDone()).isFalse();
        Assertions.assertThat(result.getNewTasks().size()).isEqualTo(3);
        Assertions.assertThat(result.getBlockedReason()).isEqualTo(Optional.of(ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL));
        // The splits reported as scheduled must all be accounted for on the stage's tasks.
        assertPartitionedSplitCount(stage, 300);
        stage.abort();
    }

    /**
     * Uses a private node manager and a {@link NodeTaskMap} shared by two stages.
     * <ol>
     * <li>The first stage receives 15 splits from its first source: three tasks with 5 splits each.</li>
     * <li>A fourth node is added and the second source delivers 3 splits: one new task holding those 3 splits.</li>
     * <li>A second stage with 2 x 10 fixed splits is scheduled in one call: four tasks with 5 splits each.</li>
     * </ol>
     */
    @Test
    public void testBalancedSplitAssignment() {
        InternalNode node1 = new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false);
        InternalNode node2 = new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false);
        InternalNode node3 = new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false);
        InMemoryNodeManager localNodeManager = new InMemoryNodeManager(new InternalNode[] {node1, node2, node3});
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);

        // --- first stage, first source ---
        PlanFragment firstFragment = createFragment();
        StageExecution firstStage = createStageExecution(firstFragment, taskMap);
        QueuedSplitSource firstSource = new QueuedSplitSource(TestingSplit::createRemoteSplit);
        QueuedSplitSource secondSource = new QueuedSplitSource(TestingSplit::createRemoteSplit);
        Map<PlanNodeId, ConnectorSplitSource> firstStageSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, firstSource,
                TABLE_SCAN_2_NODE_ID, secondSource);
        SplitPlacementPolicy firstPolicy = createSplitPlacementPolicies(session, firstStage, taskMap, localNodeManager);
        StageScheduler firstScheduler = prepareScheduler(firstStageSources, firstPolicy, firstStage, 15);

        firstSource.addSplits(15, true);
        ScheduleResult afterFirstSource = firstScheduler.schedule();
        Assertions.assertThat(afterFirstSource.getBlocked().isDone()).isFalse();
        Assertions.assertThat(afterFirstSource.getNewTasks().size()).isEqualTo(3);
        Assertions.assertThat(firstStage.getAllTasks().size()).isEqualTo(3);
        for (RemoteTask task : firstStage.getAllTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(5);
        }

        // --- first stage, second source, after a node joined ---
        InternalNode node4 = new InternalNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN, false);
        localNodeManager.addNodes(new InternalNode[] {node4});
        secondSource.addSplits(3, true);
        ScheduleResult afterSecondSource = firstScheduler.schedule();
        assertEffectivelyFinished(afterSecondSource, firstScheduler);
        Assertions.assertThat(afterSecondSource.getBlocked().isDone()).isTrue();
        Assertions.assertThat(afterSecondSource.isFinished()).isTrue();
        Assertions.assertThat(afterSecondSource.getNewTasks().size()).isEqualTo(1);
        Assertions.assertThat(firstStage.getAllTasks().size()).isEqualTo(4);
        int[] expectedSplitsPerTask = {5, 5, 5, 3};
        for (int taskIndex = 0; taskIndex < expectedSplitsPerTask.length; taskIndex++) {
            Assertions.assertThat(firstStage.getAllTasks().get(taskIndex).getPartitionedSplitsInfo().getCount()).isEqualTo(expectedSplitsPerTask[taskIndex]);
        }

        // --- second stage on the same node task map ---
        PlanFragment secondFragment = createFragment();
        StageExecution secondStage = createStageExecution(secondFragment, taskMap);
        Map<PlanNodeId, ConnectorSplitSource> secondStageSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(10),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(10));
        SplitPlacementPolicy secondPolicy = createSplitPlacementPolicies(session, secondStage, taskMap, localNodeManager);
        StageScheduler secondScheduler = prepareScheduler(secondStageSources, secondPolicy, secondStage, 10);

        ScheduleResult secondStageResult = secondScheduler.schedule();
        assertEffectivelyFinished(secondStageResult, secondScheduler);
        Assertions.assertThat(secondStageResult.getBlocked().isDone()).isTrue();
        Assertions.assertThat(secondStageResult.isFinished()).isTrue();
        Assertions.assertThat(secondStageResult.getNewTasks().size()).isEqualTo(4);
        Assertions.assertThat(secondStage.getAllTasks().size()).isEqualTo(4);
        for (RemoteTask task : secondStage.getAllTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(5);
        }

        firstStage.abort();
        secondStage.abort();
    }

    /**
     * Both sources are empty: a single {@code schedule()} call is expected to create two tasks
     * and to leave the scheduler effectively finished.
     */
    @Test
    public void testScheduleEmptySources() {
        PlanFragment fragment = createFragment();
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, taskMap);
        Map<PlanNodeId, ConnectorSplitSource> emptySources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(0),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(0));
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, taskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(emptySources, placementPolicy, stage, 15);

        ScheduleResult result = scheduler.schedule();

        Assertions.assertThat(result.getNewTasks().size()).isEqualTo(2);
        assertEffectivelyFinished(result, scheduler);
        stage.abort();
    }

    /**
     * Verifies that a dynamic filter is unblocked by {@code schedule()} when the scheduler is
     * told (via the {@code anySourceTaskBlocked} supplier, always {@code true} here) that a
     * source task is blocked, and that no splits are scheduled in that call.
     *
     * <p>The stage is aborted at the end, like in most of the other tests of this class, so
     * that the task created by {@code scheduler.start()} is not left running.
     */
    @Test
    public void testDynamicFiltersUnblockedOnBlockedBuildSource() {
        PlanFragment fragment = createFragment();
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, taskMap);

        DynamicFilterService dynamicFilterService = new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig());
        dynamicFilterService.registerQuery(
                QUERY_ID,
                SessionTestUtils.TEST_SESSION,
                ImmutableSet.of(DYNAMIC_FILTER_ID),
                ImmutableSet.of(DYNAMIC_FILTER_ID),
                ImmutableSet.of(DYNAMIC_FILTER_ID));

        // Neither queued source ever receives splits in this test.
        Map<PlanNodeId, ConnectorSplitSource> splitSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, new QueuedSplitSource(),
                TABLE_SCAN_2_NODE_ID, new QueuedSplitSource());
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, taskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(splitSources, placementPolicy, stage, dynamicFilterService, () -> true, 15);

        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol symbol = symbolAllocator.newSymbol("DF_SYMBOL1", BigintType.BIGINT);
        DynamicFilter dynamicFilter = dynamicFilterService.createDynamicFilter(
                QUERY_ID,
                ImmutableList.of(new DynamicFilters.Descriptor(DYNAMIC_FILTER_ID, symbol.toSymbolReference())),
                ImmutableMap.of(symbol, new TestingMetadata.TestingColumnHandle("probeColumnA")),
                symbolAllocator.getTypes());

        // Starting the scheduler creates exactly one task and moves the stage to SCHEDULING.
        Assertions.assertThat(stage.getState()).isEqualTo(StageExecution.State.PLANNED);
        scheduler.start();
        Assertions.assertThat(stage.getAllTasks().size()).isEqualTo(1);
        Assertions.assertThat(stage.getState()).isEqualTo(StageExecution.State.SCHEDULING);

        // The dynamic filter is blocked until schedule() runs.
        Assertions.assertThat(dynamicFilter.isBlocked().isDone()).isFalse();
        ScheduleResult result = scheduler.schedule();
        Assertions.assertThat(dynamicFilter.isBlocked().isDone()).isTrue();
        Assertions.assertThat(result.getSplitsScheduled()).isEqualTo(0);

        stage.abort();
    }

    /**
     * With the {@code anySourceTaskBlocked} supplier fixed to {@code true}, the first
     * {@code schedule()} call places 100 splits on each of three new tasks and blocks with
     * {@code SPLIT_QUEUES_FULL}. Adding a fourth node afterwards must not lead to a new task
     * or to further splits being scheduled.
     *
     * <p>The stage is aborted at the end, like in most of the other tests of this class, so
     * that the created tasks are not left running.
     */
    @Test
    public void testNoNewTaskScheduledWhenChildStageBufferIsOverUtilized() {
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        InternalNode node1 = new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false);
        InternalNode node2 = new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false);
        InternalNode node3 = new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false);
        InMemoryNodeManager localNodeManager = new InMemoryNodeManager(new InternalNode[] {node1, node2, node3});

        PlanFragment fragment = createFragment();
        StageExecution stage = createStageExecution(fragment, taskMap);
        Map<PlanNodeId, ConnectorSplitSource> splitSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(200),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(200));
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, taskMap, localNodeManager);
        DynamicFilterService dynamicFilterService = new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig());
        StageScheduler scheduler = prepareScheduler(splitSources, placementPolicy, stage, dynamicFilterService, () -> true, 200);

        // First round fills the split queues of the three initial tasks.
        ScheduleResult firstResult = scheduler.schedule();
        Assertions.assertThat(firstResult.getBlockedReason()).isEqualTo(Optional.of(ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL));
        Assertions.assertThat(firstResult.getNewTasks().size()).isEqualTo(3);
        Assertions.assertThat(firstResult.getSplitsScheduled()).isEqualTo(300);
        for (RemoteTask task : firstResult.getNewTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(100);
        }

        // A newly added node must not receive a task or any splits.
        localNodeManager.addNodes(new InternalNode[] {new InternalNode("other4", URI.create("http://127.0.0.4:14"), NodeVersion.UNKNOWN, false)});
        ScheduleResult secondResult = scheduler.schedule();
        Assertions.assertThat(secondResult.getBlockedReason()).isEqualTo(Optional.of(ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL));
        Assertions.assertThat(secondResult.getNewTasks().size()).isEqualTo(0);
        Assertions.assertThat(secondResult.getSplitsScheduled()).isEqualTo(0);

        stage.abort();
    }

    /**
     * Asserts that the partitioned split counts of all tasks of {@code stage} add up to
     * {@code expectedPartitionedSplitCount}.
     */
    private static void assertPartitionedSplitCount(StageExecution stage, int expectedPartitionedSplitCount) {
        int totalSplits = 0;
        for (RemoteTask task : stage.getAllTasks()) {
            totalSplits += task.getPartitionedSplitsInfo().getCount();
        }
        Assertions.assertThat(totalSplits).isEqualTo(expectedPartitionedSplitCount);
    }

    /**
     * Asserts that scheduling is complete either with {@code scheduleResult} itself or with
     * the very next {@code schedule()} call. In both cases {@code scheduleResult} must not be
     * blocked; when a follow-up call is needed it must finish without blocking, without new
     * tasks and without scheduling any splits.
     */
    private static void assertEffectivelyFinished(ScheduleResult scheduleResult, StageScheduler scheduler) {
        boolean alreadyFinished = scheduleResult.isFinished();
        Assertions.assertThat(scheduleResult.getBlocked().isDone()).isTrue();
        if (alreadyFinished) {
            return;
        }

        // Not finished yet: one more round must complete the scheduling as a no-op.
        ScheduleResult followUp = scheduler.schedule();
        Assertions.assertThat(followUp.isFinished()).isTrue();
        Assertions.assertThat(followUp.getBlocked().isDone()).isTrue();
        Assertions.assertThat(followUp.getNewTasks().size()).isEqualTo(0);
        Assertions.assertThat(followUp.getSplitsScheduled()).isEqualTo(0);
    }

    /**
     * Convenience overload: builds a scheduler with a fresh {@link DynamicFilterService} and
     * an {@code anySourceTaskBlocked} supplier that always returns {@code false}.
     */
    private StageScheduler prepareScheduler(Map<PlanNodeId, ConnectorSplitSource> splitSources, SplitPlacementPolicy splitPlacementPolicy, StageExecution stage, int splitBatchSize) {
        DynamicFilterService freshDynamicFilterService = new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig());
        BooleanSupplier neverBlocked = () -> false;
        return prepareScheduler(splitSources, splitPlacementPolicy, stage, freshDynamicFilterService, neverBlocked, splitBatchSize);
    }

    /**
     * Creates the {@link MultiSourcePartitionedScheduler} under test. Each connector split
     * source is wrapped in a {@link ConnectorAwareSplitSource} bound to the test catalog
     * handle, keyed by the same plan node id.
     */
    private StageScheduler prepareScheduler(Map<PlanNodeId, ConnectorSplitSource> splitSources, SplitPlacementPolicy splitPlacementPolicy, StageExecution stage, DynamicFilterService dynamicFilterService, BooleanSupplier anySourceTaskBlocked, int splitBatchSize) {
        Map<PlanNodeId, SplitSource> wrappedSources = splitSources.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> new ConnectorAwareSplitSource(TestingHandles.TEST_CATALOG_HANDLE, entry.getValue())));
        return new MultiSourcePartitionedScheduler(
                stage,
                wrappedSources,
                splitPlacementPolicy,
                splitBatchSize,
                dynamicFilterService,
                new TableExecuteContextManager(),
                anySourceTaskBlocked);
    }

    /**
     * Builds the test fragment with both table scans reading the shared test table handle.
     */
    private PlanFragment createFragment() {
        TableHandle testTable = TestingHandles.TEST_TABLE_HANDLE;
        return createFragment(testTable, testTable);
    }

    /**
     * Builds a source-distributed fragment whose root is an inner join between
     * <ul>
     * <li>a local repartitioning exchange over two dynamically filtered table scans
     * (plan node ids {@code TABLE_SCAN_1_NODE_ID} and {@code TABLE_SCAN_2_NODE_ID}), and</li>
     * <li>a replicated remote source producing {@code buildColumn}.</li>
     * </ul>
     * Both table scans are listed as the partitioned sources of the fragment.
     *
     * @param firstTableHandle table read by the first scan
     * @param secondTableHandle table read by the second scan
     */
    private PlanFragment createFragment(TableHandle firstTableHandle, TableHandle secondTableHandle) {
        Symbol scanSymbol = new Symbol("column");
        Symbol buildSymbol = new Symbol("buildColumn");

        // Left side of the join: two filtered scans merged by a local exchange.
        TableScanNode firstScan = TableScanNode.newInstance(
                TABLE_SCAN_1_NODE_ID,
                firstTableHandle,
                ImmutableList.of(scanSymbol),
                ImmutableMap.of(scanSymbol, new TestingMetadata.TestingColumnHandle("column")),
                false,
                Optional.empty());
        FilterNode firstFilter = new FilterNode(
                new PlanNodeId("filter_node_id"),
                firstScan,
                DynamicFilters.createDynamicFilterExpression(MetadataManager.createTestMetadataManager(), DYNAMIC_FILTER_ID, VarcharType.VARCHAR, scanSymbol.toSymbolReference()));
        TableScanNode secondScan = TableScanNode.newInstance(
                TABLE_SCAN_2_NODE_ID,
                secondTableHandle,
                ImmutableList.of(scanSymbol),
                ImmutableMap.of(scanSymbol, new TestingMetadata.TestingColumnHandle("column")),
                false,
                Optional.empty());
        FilterNode secondFilter = new FilterNode(
                new PlanNodeId("filter_node_id"),
                secondScan,
                DynamicFilters.createDynamicFilterExpression(MetadataManager.createTestMetadataManager(), DYNAMIC_FILTER_ID, VarcharType.VARCHAR, scanSymbol.toSymbolReference()));

        // Right side of the join: replicated remote source.
        RemoteSourceNode remoteSource = new RemoteSourceNode(
                new PlanNodeId("remote_id"),
                new PlanFragmentId("plan_fragment_id"),
                ImmutableList.of(buildSymbol),
                Optional.empty(),
                ExchangeNode.Type.REPLICATE,
                RetryPolicy.NONE);

        ExchangeNode localExchange = new ExchangeNode(
                planNodeIdAllocator.getNextId(),
                ExchangeNode.Type.REPARTITION,
                ExchangeNode.Scope.LOCAL,
                new PartitioningScheme(Partitioning.create(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION, ImmutableList.of()), firstScan.getOutputSymbols()),
                ImmutableList.of(firstFilter, secondFilter),
                ImmutableList.of(firstScan.getOutputSymbols(), secondScan.getOutputSymbols()),
                Optional.empty());

        JoinNode join = new JoinNode(
                new PlanNodeId("join_id"),
                JoinType.INNER,
                localExchange,
                remoteSource,
                ImmutableList.of(),
                firstScan.getOutputSymbols(),
                remoteSource.getOutputSymbols(),
                false,
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                ImmutableMap.of(DYNAMIC_FILTER_ID, buildSymbol),
                Optional.empty());

        return new PlanFragment(
                new PlanFragmentId("plan_id"),
                join,
                ImmutableMap.of(scanSymbol, VarcharType.VARCHAR),
                SystemPartitioningHandle.SOURCE_DISTRIBUTION,
                Optional.empty(),
                ImmutableList.of(TABLE_SCAN_1_NODE_ID, TABLE_SCAN_2_NODE_ID),
                new PartitioningScheme(Partitioning.create(SystemPartitioningHandle.SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(scanSymbol)),
                StatsAndCosts.empty(),
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty());
    }

    /**
     * Creates a split source holding {@code splitCount} identical testing splits, each
     * constructed as {@code new TestingSplit(true, ImmutableList.of())}.
     */
    private static ConnectorSplitSource createFixedSplitSource(int splitCount) {
        List<TestingSplit> splits = IntStream.range(0, splitCount)
                .mapToObj(index -> new TestingSplit(true, ImmutableList.of()))
                .toList();
        return new FixedSplitSource(splits);
    }

    /**
     * Convenience overload that creates the placement policy for the test catalog handle.
     */
    private SplitPlacementPolicy createSplitPlacementPolicies(Session session, StageExecution stage, NodeTaskMap nodeTaskMap, InternalNodeManager nodeManager) {
        CatalogHandle testCatalog = TestingHandles.TEST_CATALOG_HANDLE;
        return createSplitPlacementPolicies(session, stage, nodeTaskMap, nodeManager, testCatalog);
    }

    /**
     * Creates a {@link DynamicSplitPlacementPolicy} backed by a uniform node selector for
     * {@code catalog}. The node scheduler is configured without the coordinator, with at most
     * 100 splits per node, a minimum of 0 pending splits per task and the {@code STAGE}
     * balancing policy. The policy looks up the stage's current tasks on every request.
     */
    private SplitPlacementPolicy createSplitPlacementPolicies(Session session, StageExecution stage, NodeTaskMap nodeTaskMap, InternalNodeManager nodeManager, CatalogHandle catalog) {
        NodeSchedulerConfig schedulerConfig = new NodeSchedulerConfig()
                .setIncludeCoordinator(false)
                .setMaxSplitsPerNode(100)
                .setMinPendingSplitsPerTask(0)
                .setSplitsBalancingPolicy(NodeSchedulerConfig.SplitsBalancingPolicy.STAGE);
        UniformNodeSelectorFactory selectorFactory = new UniformNodeSelectorFactory(nodeManager, schedulerConfig, nodeTaskMap, new Duration(0.0, TimeUnit.SECONDS));
        NodeScheduler nodeScheduler = new NodeScheduler(selectorFactory);
        return new DynamicSplitPlacementPolicy(
                nodeScheduler.createNodeSelector(session, Optional.of(catalog)),
                stage::getAllTasks);
    }

    /**
     * Creates a pipelined stage execution (stage 0 of {@code QUERY_ID}) for {@code fragment},
     * using mock remote tasks. An output buffer manager is registered for the fragment itself
     * (partition count 1) and for every source fragment of its remote source nodes
     * (partition count 10).
     */
    private StageExecution createStageExecution(PlanFragment fragment, NodeTaskMap nodeTaskMap) {
        StageId stageId = new StageId(QUERY_ID, 0);
        TableInfo firstTableInfo = new TableInfo(Optional.of("test"), new QualifiedObjectName("test", "test", "test"), TupleDomain.all());
        TableInfo secondTableInfo = new TableInfo(Optional.of("test"), new QualifiedObjectName("test", "test", "test"), TupleDomain.all());
        SqlStage sqlStage = SqlStage.createSqlStage(
                stageId,
                fragment,
                ImmutableMap.of(TABLE_SCAN_1_NODE_ID, firstTableInfo, TABLE_SCAN_2_NODE_ID, secondTableInfo),
                new MockRemoteTaskFactory(queryExecutor, scheduledExecutor),
                SessionTestUtils.TEST_SESSION,
                true,
                nodeTaskMap,
                queryExecutor,
                Tracing.noopTracer(),
                Span.getInvalid(),
                new SplitSchedulerStats());

        ImmutableMap.Builder outputBuffers = ImmutableMap.builder();
        outputBuffers.put(fragment.getId(), new PartitionedPipelinedOutputBufferManager(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, 1));
        for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) {
            for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
                outputBuffers.put(sourceFragmentId, new PartitionedPipelinedOutputBufferManager(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, 10));
            }
        }

        return PipelinedStageExecution.createPipelinedStageExecution(
                sqlStage,
                outputBuffers.buildOrThrow(),
                TaskLifecycleListener.NO_OP,
                new NoOpFailureDetector(),
                queryExecutor,
                Optional.of(new int[] {0}),
                0);
    }

    /**
     * Test {@link ConnectorSplitSource} whose splits are supplied on demand through
     * {@link #addSplits(int, boolean)}.
     *
     * <p>While the queue is empty and the source is still open, {@link #getNextBatch(int)}
     * returns a future that only completes once splits are added (or the last splits are
     * announced). All mutable state is guarded by the instance monitor.
     */
    private static class QueuedSplitSource
    implements ConnectorSplitSource {
        private final Supplier<ConnectorSplit> splitFactory;
        private final LinkedBlockingQueue<ConnectorSplit> queue = new LinkedBlockingQueue<>();
        // Guarded by "this". Completed by addSplits(); replaced by a fresh future in getBatch()
        // whenever the queue has been drained while the source is still open.
        private CompletableFuture<?> notEmptyFuture = new CompletableFuture<>();
        // Guarded by "this".
        private boolean closed;

        /**
         * @param splitFactory produces each split enqueued by {@link #addSplits(int, boolean)}; must not be null
         */
        public QueuedSplitSource(Supplier<ConnectorSplit> splitFactory) {
            this.splitFactory = Objects.requireNonNull(splitFactory, "splitFactory is null");
        }

        /**
         * Creates a source that produces splits via {@code TestingSplit::createRemoteSplit}.
         */
        public QueuedSplitSource() {
            this(TestingSplit::createRemoteSplit);
        }

        /**
         * Enqueues {@code count} new splits and wakes up pending {@link #getNextBatch(int)} calls.
         * Has no effect once the source is closed.
         *
         * @param count number of splits to create and enqueue
         * @param lastSplits when true, the source is closed after enqueueing
         */
        synchronized void addSplits(int count, boolean lastSplits) {
            if (closed) {
                return;
            }
            for (int i = 0; i < count; i++) {
                queue.add(splitFactory.get());
            }
            if (lastSplits) {
                close();
            }
            notEmptyFuture.complete(null);
        }

        /**
         * Returns a future for the next batch of at most {@code maxSize} splits. The batch is
         * flagged as the last one when the source is closed and the queue is empty at the time
         * the batch is assembled.
         */
        public CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> getNextBatch(int maxSize) {
            // Read the current future under the same monitor that guards its replacement in
            // getBatch(), so that a reader on another thread never observes a stale reference.
            CompletableFuture<?> ready;
            synchronized (this) {
                ready = notEmptyFuture;
            }
            return ready
                    .thenApply(ignored -> getBatch(maxSize))
                    .thenApply(splits -> new ConnectorSplitSource.ConnectorSplitBatch(splits, isFinished()));
        }

        /**
         * Drains up to {@code maxSize} queued splits and re-arms the "not empty" future when the
         * queue ran dry while the source is still open.
         */
        private synchronized List<ConnectorSplit> getBatch(int maxSize) {
            List<ConnectorSplit> elements = new ArrayList<>(maxSize);
            queue.drainTo(elements, maxSize);
            if (queue.isEmpty() && !closed && notEmptyFuture.isDone()) {
                notEmptyFuture = new CompletableFuture<>();
            }
            return ImmutableList.copyOf(elements);
        }

        /**
         * @return true once the source has been closed and every queued split has been handed out
         */
        public synchronized boolean isFinished() {
            return closed && queue.isEmpty();
        }

        /**
         * Marks the source as closed; subsequent {@link #addSplits(int, boolean)} calls are ignored.
         */
        public synchronized void close() {
            closed = true;
        }
    }

    /**
     * {@link InMemoryNodeManager} variant whose active nodes per catalog are computed by a
     * caller-supplied function instead of the base implementation.
     */
    private static class InMemoryNodeManagerByCatalog
    extends InMemoryNodeManager {
        // Maps a catalog handle to the nodes reported as active for that catalog.
        private final Function<CatalogHandle, Set<InternalNode>> nodesByCatalogs;

        /**
         * @param nodes nodes passed to the base node manager
         * @param nodesByCatalogs lookup used to answer {@link #getActiveCatalogNodes(CatalogHandle)}
         */
        public InMemoryNodeManagerByCatalog(Set<InternalNode> nodes, Function<CatalogHandle, Set<InternalNode>> nodesByCatalogs) {
            super(nodes);
            this.nodesByCatalogs = nodesByCatalogs;
        }

        /**
         * Returns whatever the configured lookup function yields for {@code catalogHandle}.
         */
        public Set<InternalNode> getActiveCatalogNodes(CatalogHandle catalogHandle) {
            Set<InternalNode> activeNodes = nodesByCatalogs.apply(catalogHandle);
            return activeNodes;
        }
    }
}

