/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution;

import com.google.common.base.MoreObjects;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.airlift.concurrent.Threads;
import io.airlift.slice.SizeOf;
import io.trino.Session;
import io.trino.client.NodeVersion;
import io.trino.execution.MockRemoteTaskFactory;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.PartitionedSplitsInfo;
import io.trino.execution.RemoteTask;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.scheduler.NetworkLocation;
import io.trino.execution.scheduler.NetworkTopology;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.NodeSelector;
import io.trino.execution.scheduler.NodeSelectorFactory;
import io.trino.execution.scheduler.SplitPlacementResult;
import io.trino.execution.scheduler.TopologyAwareNodeSelectorConfig;
import io.trino.execution.scheduler.TopologyAwareNodeSelectorFactory;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Split;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingSession;
import io.trino.testing.assertions.TrinoExceptionAssert;
import io.trino.util.FinalizerService;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_METHOD)
@Execution(value=ExecutionMode.SAME_THREAD)
public class TestNodeScheduler {
    private FinalizerService finalizerService;
    private NodeTaskMap nodeTaskMap;
    private InMemoryNodeManager nodeManager;
    private NodeSchedulerConfig nodeSchedulerConfig;
    private NodeScheduler nodeScheduler;
    private NodeSelector nodeSelector;
    private Map<InternalNode, RemoteTask> taskMap;
    private ExecutorService remoteTaskExecutor;
    private ScheduledExecutorService remoteTaskScheduledExecutor;
    private Session session;

    @BeforeEach
    public void setUp() {
        this.session = TestingSession.testSessionBuilder().build();
        this.finalizerService = new FinalizerService();
        this.nodeTaskMap = new NodeTaskMap(this.finalizerService);
        this.nodeManager = new InMemoryNodeManager(new InternalNode[0]);
        this.nodeSchedulerConfig = new NodeSchedulerConfig().setMaxSplitsPerNode(20).setMinPendingSplitsPerTask(10).setMaxAdjustedPendingSplitsWeightPerTask(100).setIncludeCoordinator(false);
        this.nodeScheduler = new NodeScheduler((NodeSelectorFactory)new UniformNodeSelectorFactory((InternalNodeManager)this.nodeManager, this.nodeSchedulerConfig, this.nodeTaskMap));
        this.taskMap = new HashMap<InternalNode, RemoteTask>();
        this.nodeSelector = this.nodeScheduler.createNodeSelector(this.session, Optional.of(TestingHandles.TEST_CATALOG_HANDLE));
        this.remoteTaskExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"remoteTaskExecutor-%s"));
        this.remoteTaskScheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed((String)"remoteTaskScheduledExecutor-%s"));
        this.finalizerService.start();
    }

    private void setUpNodes() {
        this.nodeManager.addNodes(new InternalNode[]{new InternalNode("other1", URI.create("http://10.0.0.1:11"), NodeVersion.UNKNOWN, false), new InternalNode("other2", URI.create("http://10.0.0.1:12"), NodeVersion.UNKNOWN, false), new InternalNode("other3", URI.create("http://10.0.0.1:13"), NodeVersion.UNKNOWN, false)});
    }

    @AfterEach
    public void tearDown() {
        this.remoteTaskExecutor.shutdown();
        this.remoteTaskExecutor = null;
        this.remoteTaskScheduledExecutor.shutdown();
        this.remoteTaskScheduledExecutor = null;
        this.nodeSchedulerConfig = null;
        this.nodeScheduler = null;
        this.nodeSelector = null;
        this.finalizerService.destroy();
        this.finalizerService = null;
    }

    @Test
    public void testAssignmentWhenNoNodes() {
        TrinoExceptionAssert.assertTrinoExceptionThrownBy(() -> this.computeSingleAssignment(this.nodeSelector, new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()))).hasErrorCode(new ErrorCodeSupplier[]{StandardErrorCode.NO_NODES_AVAILABLE}).hasMessageMatching("No nodes available to run query");
    }

    @Test
    public void testScheduleLocal() {
        this.setUpNodes();
        Split split = new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitLocallyAccessible());
        Map.Entry assignment = (Map.Entry)Iterables.getOnlyElement((Iterable)this.computeSingleAssignment(this.nodeSelector, split).entries());
        Assertions.assertThat((Object)((InternalNode)assignment.getKey()).getHostAndPort()).isEqualTo(split.getAddresses().get(0));
        Assertions.assertThat((Object)((Split)assignment.getValue())).isEqualTo((Object)split);
    }

    @Test
    @Timeout(value=60L)
    public void testTopologyAwareScheduling() {
        int i;
        NodeTaskMap nodeTaskMap = new NodeTaskMap(this.finalizerService);
        InMemoryNodeManager nodeManager = new InMemoryNodeManager(new InternalNode[]{new InternalNode("node1", URI.create("http://host1.rack1:11"), NodeVersion.UNKNOWN, false), new InternalNode("node2", URI.create("http://host2.rack1:12"), NodeVersion.UNKNOWN, false), new InternalNode("node3", URI.create("http://host3.rack2:13"), NodeVersion.UNKNOWN, false)});
        HashMap<InternalNode, MockRemoteTaskFactory.MockRemoteTask> taskMap = new HashMap<InternalNode, MockRemoteTaskFactory.MockRemoteTask>();
        NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig().setMaxSplitsPerNode(25).setIncludeCoordinator(false).setMinPendingSplitsPerTask(20);
        TestNetworkTopology topology = new TestNetworkTopology();
        TopologyAwareNodeSelectorFactory nodeSelectorFactory = new TopologyAwareNodeSelectorFactory((NetworkTopology)topology, (InternalNodeManager)nodeManager, nodeSchedulerConfig, nodeTaskMap, TestNodeScheduler.getNetworkTopologyConfig());
        NodeScheduler nodeScheduler = new NodeScheduler((NodeSelectorFactory)nodeSelectorFactory);
        NodeSelector nodeSelector = nodeScheduler.createNodeSelector(this.session, Optional.of(TestingHandles.TEST_CATALOG_HANDLE));
        ImmutableSet.Builder nonRackLocalBuilder = ImmutableSet.builder();
        for (int i2 = 0; i2 < 108; ++i2) {
            nonRackLocalBuilder.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(HostAddress.fromParts((String)"data.other_rack", (int)1))));
        }
        ImmutableSet nonRackLocalSplits = nonRackLocalBuilder.build();
        Multimap assignments = nodeSelector.computeAssignments((Set)nonRackLocalSplits, (List)ImmutableList.copyOf(taskMap.values())).getAssignments();
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor, this.remoteTaskScheduledExecutor);
        int task = 0;
        for (InternalNode node : assignments.keySet()) {
            TaskId taskId = new TaskId(new StageId("test", 1), task, 0);
            ++task;
            MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, node, (List<Split>)ImmutableList.copyOf((Collection)assignments.get((Object)node)), nodeTaskMap.createPartitionedSplitCountTracker(node, taskId));
            remoteTask.startSplits(25);
            nodeTaskMap.addTask(node, (RemoteTask)remoteTask);
            taskMap.put(node, remoteTask);
        }
        nonRackLocalSplits = Sets.difference((Set)nonRackLocalSplits, new HashSet(assignments.values()));
        assignments = nodeSelector.computeAssignments((Set)nonRackLocalSplits, (List)ImmutableList.copyOf(taskMap.values())).getAssignments();
        for (InternalNode node : assignments.keySet()) {
            RemoteTask remoteTask = (RemoteTask)taskMap.get(node);
            remoteTask.addSplits((Multimap)ImmutableMultimap.builder().putAll((Object)new PlanNodeId("sourceId"), (Iterable)assignments.get((Object)node)).build());
        }
        nonRackLocalSplits = Sets.difference((Set)nonRackLocalSplits, new HashSet(assignments.values()));
        Assertions.assertThat((int)nonRackLocalSplits.size()).isEqualTo(3);
        ImmutableSet.Builder rackLocalSplits = ImmutableSet.builder();
        HostAddress dataHost1 = HostAddress.fromParts((String)"data.rack1", (int)1);
        HostAddress dataHost2 = HostAddress.fromParts((String)"data.rack2", (int)1);
        for (i = 0; i < 12; ++i) {
            rackLocalSplits.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(dataHost1)));
        }
        for (i = 0; i < 6; ++i) {
            rackLocalSplits.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(dataHost2)));
        }
        assignments = nodeSelector.computeAssignments((Set)rackLocalSplits.build(), (List)ImmutableList.copyOf(taskMap.values())).getAssignments();
        for (Object node : assignments.keySet()) {
            RemoteTask remoteTask = (RemoteTask)taskMap.get(node);
            remoteTask.addSplits((Multimap)ImmutableMultimap.builder().putAll((Object)new PlanNodeId("sourceId"), (Iterable)assignments.get(node)).build());
        }
        Sets.SetView unassigned = Sets.difference((Set)rackLocalSplits.build(), new HashSet(assignments.values()));
        assignments = nodeSelector.computeAssignments((Set)unassigned, (List)ImmutableList.copyOf(taskMap.values())).getAssignments();
        for (InternalNode node : assignments.keySet()) {
            RemoteTask remoteTask = (RemoteTask)taskMap.get(node);
            remoteTask.addSplits((Multimap)ImmutableMultimap.builder().putAll((Object)new PlanNodeId("sourceId"), (Iterable)assignments.get((Object)node)).build());
        }
        unassigned = Sets.difference((Set)unassigned, new HashSet(assignments.values()));
        Assertions.assertThat((int)unassigned.size()).isEqualTo(3);
        int rack1 = 0;
        int rack2 = 0;
        block15: for (Split split : unassigned) {
            String rack;
            switch (rack = (String)topology.locate((HostAddress)split.getAddresses().get(0)).getSegments().get(0)) {
                case "rack1": {
                    ++rack1;
                    continue block15;
                }
                case "rack2": {
                    ++rack2;
                    continue block15;
                }
            }
            throw new AssertionError((Object)("Unexpected rack: " + rack));
        }
        Assertions.assertThat((int)rack1).isEqualTo(2);
        Assertions.assertThat((int)rack2).isEqualTo(1);
        ImmutableSet.Builder localSplits = ImmutableSet.builder();
        localSplits.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(HostAddress.fromParts((String)"host1.rack1", (int)1))));
        localSplits.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(HostAddress.fromParts((String)"host2.rack1", (int)1))));
        localSplits.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(HostAddress.fromParts((String)"host3.rack2", (int)1))));
        assignments = nodeSelector.computeAssignments((Set)localSplits.build(), (List)ImmutableList.copyOf(taskMap.values())).getAssignments();
        Assertions.assertThat((int)assignments.size()).isEqualTo(3);
        Assertions.assertThat((int)assignments.keySet().size()).isEqualTo(3);
    }

    @Test
    public void testScheduleRemote() {
        this.setUpNodes();
        Multimap<InternalNode, Split> assignments = this.computeSingleAssignment(this.nodeSelector, new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        Assertions.assertThat((int)assignments.size()).isEqualTo(1);
    }

    @Test
    public void testBasicAssignment() {
        this.setUpNodes();
        Set activeCatalogNodes = (Set)this.nodeManager.getActiveCatalogNodes(TestingHandles.TEST_CATALOG_HANDLE).stream().filter(node -> !node.isCoordinator()).collect(ImmutableSet.toImmutableSet());
        HashSet<Split> splits = new HashSet<Split>();
        for (int i = 0; i < activeCatalogNodes.size(); ++i) {
            splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        }
        Multimap assignments = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(this.taskMap.values())).getAssignments();
        Assertions.assertThat((int)assignments.entries().size()).isEqualTo(assignments.size());
        for (InternalNode node2 : activeCatalogNodes) {
            Assertions.assertThat((Collection)assignments.keySet()).contains((Object[])new InternalNode[]{node2});
        }
    }

    @Test
    public void testMaxSplitsPerNode() {
        this.setUpNodes();
        InternalNode newNode = new InternalNode("other4", URI.create("http://10.0.0.1:14"), NodeVersion.UNKNOWN, false);
        this.nodeManager.addNodes(new InternalNode[]{newNode});
        ImmutableList.Builder initialSplits = ImmutableList.builder();
        for (int i = 0; i < 10; ++i) {
            initialSplits.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        }
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor, this.remoteTaskScheduledExecutor);
        TaskId taskId1 = new TaskId(new StageId("test", 1), 1, 0);
        MockRemoteTaskFactory.MockRemoteTask remoteTask1 = remoteTaskFactory.createTableScanTask(taskId1, newNode, (List<Split>)initialSplits.build(), this.nodeTaskMap.createPartitionedSplitCountTracker(newNode, taskId1));
        this.nodeTaskMap.addTask(newNode, (RemoteTask)remoteTask1);
        TaskId taskId2 = new TaskId(new StageId("test", 1), 2, 0);
        MockRemoteTaskFactory.MockRemoteTask remoteTask2 = remoteTaskFactory.createTableScanTask(taskId2, newNode, (List<Split>)initialSplits.build(), this.nodeTaskMap.createPartitionedSplitCountTracker(newNode, taskId2));
        this.nodeTaskMap.addTask(newNode, (RemoteTask)remoteTask2);
        HashSet<Split> splits = new HashSet<Split>();
        for (int i = 0; i < 5; ++i) {
            splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        }
        Multimap assignments = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(this.taskMap.values())).getAssignments();
        Assertions.assertThat((boolean)assignments.keySet().contains(newNode)).isFalse();
        remoteTask1.abort();
        remoteTask2.abort();
        Assertions.assertThat((Object)this.nodeTaskMap.getPartitionedSplitsOnNode(newNode)).isEqualTo((Object)PartitionedSplitsInfo.forZeroSplits());
    }

    @Test
    public void testBasicAssignmentMaxUnacknowledgedSplitsPerTask() {
        this.nodeSelector = this.nodeScheduler.createNodeSelector(TestNodeScheduler.sessionWithMaxUnacknowledgedSplitsPerTask(1), Optional.of(TestingHandles.TEST_CATALOG_HANDLE));
        this.setUpNodes();
        Set activeCatalogNodes = (Set)this.nodeManager.getActiveCatalogNodes(TestingHandles.TEST_CATALOG_HANDLE).stream().filter(node -> !node.isCoordinator()).collect(ImmutableSet.toImmutableSet());
        int splitCount = activeCatalogNodes.size() + 1;
        HashSet<Split> splits = new HashSet<Split>();
        for (int i = 0; i < splitCount; ++i) {
            splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        }
        Multimap assignments = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(this.taskMap.values())).getAssignments();
        Assertions.assertThat((int)assignments.entries().size()).isEqualTo(activeCatalogNodes.size());
        for (InternalNode node2 : activeCatalogNodes) {
            Assertions.assertThat((Collection)assignments.keySet()).contains((Object[])new InternalNode[]{node2});
        }
    }

    @Test
    public void testMaxSplitsPerNodePerTask() {
        this.setUpNodes();
        InternalNode newNode = new InternalNode("other4", URI.create("http://10.0.0.1:14"), NodeVersion.UNKNOWN, false);
        this.nodeManager.addNodes(new InternalNode[]{newNode});
        ImmutableList.Builder initialSplits = ImmutableList.builder();
        for (int i = 0; i < 20; ++i) {
            initialSplits.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        }
        ArrayList<MockRemoteTaskFactory.MockRemoteTask> tasks = new ArrayList<MockRemoteTaskFactory.MockRemoteTask>();
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor, this.remoteTaskScheduledExecutor);
        for (InternalNode node : this.nodeManager.getActiveCatalogNodes(TestingHandles.TEST_CATALOG_HANDLE)) {
            TaskId taskId = new TaskId(new StageId("test", 1), 1, 0);
            MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, node, (List<Split>)initialSplits.build(), this.nodeTaskMap.createPartitionedSplitCountTracker(node, taskId));
            this.nodeTaskMap.addTask(node, (RemoteTask)remoteTask);
            tasks.add(remoteTask);
        }
        TaskId taskId = new TaskId(new StageId("test", 1), 2, 0);
        MockRemoteTaskFactory.MockRemoteTask newRemoteTask = remoteTaskFactory.createTableScanTask(taskId, newNode, (List<Split>)initialSplits.build(), this.nodeTaskMap.createPartitionedSplitCountTracker(newNode, taskId));
        this.taskMap.put(newNode, newRemoteTask);
        this.nodeTaskMap.addTask(newNode, (RemoteTask)newRemoteTask);
        tasks.add(newRemoteTask);
        HashSet<Split> splits = new HashSet<Split>();
        for (int i = 0; i < 5; ++i) {
            splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        }
        Multimap assignments = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(this.taskMap.values())).getAssignments();
        Assertions.assertThat((int)assignments.keySet().size()).isEqualTo(3);
        Assertions.assertThat((boolean)assignments.keySet().contains(newNode)).isFalse();
        for (RemoteTask remoteTask : tasks) {
            remoteTask.abort();
        }
        Assertions.assertThat((Object)this.nodeTaskMap.getPartitionedSplitsOnNode(newNode)).isEqualTo((Object)PartitionedSplitsInfo.forZeroSplits());
    }

    @Test
    public void testTaskCompletion() throws Exception {
        this.setUpNodes();
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor, this.remoteTaskScheduledExecutor);
        InternalNode chosenNode = (InternalNode)Iterables.get((Iterable)this.nodeManager.getActiveCatalogNodes(TestingHandles.TEST_CATALOG_HANDLE), (int)0);
        TaskId taskId = new TaskId(new StageId("test", 1), 1, 0);
        MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, chosenNode, (List<Split>)ImmutableList.of((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote())), this.nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId));
        this.nodeTaskMap.addTask(chosenNode, (RemoteTask)remoteTask);
        Assertions.assertThat((int)this.nodeTaskMap.getPartitionedSplitsOnNode(chosenNode).getCount()).isEqualTo(1);
        remoteTask.abort();
        TimeUnit.MILLISECONDS.sleep(100L);
        Assertions.assertThat((Object)this.nodeTaskMap.getPartitionedSplitsOnNode(chosenNode)).isEqualTo((Object)PartitionedSplitsInfo.forZeroSplits());
        remoteTask.abort();
        Assertions.assertThat((Object)this.nodeTaskMap.getPartitionedSplitsOnNode(chosenNode)).isEqualTo((Object)PartitionedSplitsInfo.forZeroSplits());
    }

    @Test
    public void testSplitCount() {
        this.setUpNodes();
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor, this.remoteTaskScheduledExecutor);
        InternalNode chosenNode = (InternalNode)Iterables.get((Iterable)this.nodeManager.getActiveCatalogNodes(TestingHandles.TEST_CATALOG_HANDLE), (int)0);
        TaskId taskId1 = new TaskId(new StageId("test", 1), 1, 0);
        MockRemoteTaskFactory.MockRemoteTask remoteTask1 = remoteTaskFactory.createTableScanTask(taskId1, chosenNode, (List<Split>)ImmutableList.of((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()), (Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote())), this.nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId1));
        TaskId taskId2 = new TaskId(new StageId("test", 1), 2, 0);
        MockRemoteTaskFactory.MockRemoteTask remoteTask2 = remoteTaskFactory.createTableScanTask(taskId2, chosenNode, (List<Split>)ImmutableList.of((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote())), this.nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId2));
        this.nodeTaskMap.addTask(chosenNode, (RemoteTask)remoteTask1);
        this.nodeTaskMap.addTask(chosenNode, (RemoteTask)remoteTask2);
        Assertions.assertThat((int)this.nodeTaskMap.getPartitionedSplitsOnNode(chosenNode).getCount()).isEqualTo(3);
        remoteTask1.abort();
        Assertions.assertThat((int)this.nodeTaskMap.getPartitionedSplitsOnNode(chosenNode).getCount()).isEqualTo(1);
        remoteTask2.abort();
        Assertions.assertThat((Object)this.nodeTaskMap.getPartitionedSplitsOnNode(chosenNode)).isEqualTo((Object)PartitionedSplitsInfo.forZeroSplits());
    }

    @Test
    public void testOptimizedLocalScheduling() {
        InternalNode node1 = new InternalNode("node1", URI.create("http://10.0.0.1:11"), NodeVersion.UNKNOWN, false);
        this.nodeManager.addNodes(new InternalNode[]{node1});
        InternalNode node2 = new InternalNode("node2", URI.create("http://10.0.0.1:12"), NodeVersion.UNKNOWN, false);
        this.nodeManager.addNodes(new InternalNode[]{node2});
        LinkedHashSet<Split> splits = new LinkedHashSet<Split>();
        for (int i = 0; i < 20; ++i) {
            splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitLocal()));
        }
        Multimap assignments1 = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(this.taskMap.values())).getAssignments();
        Assertions.assertThat((int)assignments1.size()).isEqualTo(20);
        Assertions.assertThat((int)assignments1.keySet().size()).isEqualTo(1);
        Assertions.assertThat((Collection)assignments1.keySet()).contains((Object[])new InternalNode[]{node1});
        for (int i = 0; i < 19; ++i) {
            splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(HostAddress.fromString((String)"10.0.0.1:12"))));
        }
        Multimap assignments2 = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(this.taskMap.values())).getAssignments();
        Assertions.assertThat((int)assignments2.size()).isEqualTo(39);
        Assertions.assertThat((int)assignments2.keySet().size()).isEqualTo(2);
        Assertions.assertThat((Collection)assignments2.keySet()).contains((Object[])new InternalNode[]{node1});
        Assertions.assertThat((Collection)assignments2.keySet()).contains((Object[])new InternalNode[]{node2});
        long node1Splits = assignments2.values().stream().map(Split::getConnectorSplit).filter(TestSplitLocal.class::isInstance).count();
        Assertions.assertThat((long)node1Splits).isEqualTo(20L);
        long node2Splits = assignments2.values().stream().map(Split::getConnectorSplit).filter(TestSplitRemote.class::isInstance).count();
        Assertions.assertThat((long)node2Splits).isEqualTo(19L);
        splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitLocal()));
        splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(HostAddress.fromString((String)"10.0.0.1:12"))));
        Multimap assignments3 = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(this.taskMap.values())).getAssignments();
        Assertions.assertThat((int)assignments3.size()).isEqualTo(40);
        Assertions.assertThat((int)assignments3.keySet().size()).isEqualTo(2);
        Assertions.assertThat((Collection)assignments3.keySet()).contains((Object[])new InternalNode[]{node1});
        Assertions.assertThat((Collection)assignments3.keySet()).contains((Object[])new InternalNode[]{node2});
        node1Splits = assignments3.values().stream().map(Split::getConnectorSplit).filter(TestSplitLocal.class::isInstance).count();
        Assertions.assertThat((long)node1Splits).isEqualTo(20L);
        node2Splits = assignments3.values().stream().map(Split::getConnectorSplit).filter(TestSplitRemote.class::isInstance).count();
        Assertions.assertThat((long)node2Splits).isEqualTo(20L);
    }

    @Test
    public void testMaxUnacknowledgedSplitsPerTask() {
        int maxUnacknowledgedSplitsPerTask = 5;
        this.nodeSelector = this.nodeScheduler.createNodeSelector(TestNodeScheduler.sessionWithMaxUnacknowledgedSplitsPerTask(maxUnacknowledgedSplitsPerTask), Optional.of(TestingHandles.TEST_CATALOG_HANDLE));
        this.setUpNodes();
        ImmutableList.Builder initialSplits = ImmutableList.builder();
        for (int i = 0; i < maxUnacknowledgedSplitsPerTask; ++i) {
            initialSplits.add((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        }
        List nodes = (List)this.nodeManager.getActiveCatalogNodes(TestingHandles.TEST_CATALOG_HANDLE).stream().filter(node -> !node.isCoordinator()).collect(ImmutableList.toImmutableList());
        ArrayList<MockRemoteTaskFactory.MockRemoteTask> tasks = new ArrayList<MockRemoteTaskFactory.MockRemoteTask>();
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor, this.remoteTaskScheduledExecutor);
        int counter = 1;
        for (InternalNode node2 : nodes) {
            TaskId taskId = new TaskId(new StageId("test", 1), counter, 0);
            ++counter;
            MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, node2, (List<Split>)initialSplits.build(), this.nodeTaskMap.createPartitionedSplitCountTracker(node2, taskId));
            this.nodeTaskMap.addTask(node2, (RemoteTask)remoteTask);
            remoteTask.setMaxUnacknowledgedSplits(maxUnacknowledgedSplitsPerTask);
            remoteTask.setUnacknowledgedSplits(maxUnacknowledgedSplitsPerTask);
            tasks.add(remoteTask);
        }
        HashSet<Split> splits = new HashSet<Split>();
        for (int i = 0; i < nodes.size(); ++i) {
            splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote()));
        }
        SplitPlacementResult splitPlacements = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(tasks));
        Assertions.assertThat((int)splitPlacements.getAssignments().size()).isEqualTo(0);
        MockRemoteTaskFactory.MockRemoteTask taskOne = (MockRemoteTaskFactory.MockRemoteTask)tasks.get(0);
        taskOne.finishSplits(1);
        taskOne.setUnacknowledgedSplits(taskOne.getUnacknowledgedPartitionedSplitCount() - 1);
        Assertions.assertThat((boolean)splitPlacements.getBlocked().isDone()).isTrue();
        splitPlacements = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(tasks));
        Assertions.assertThat((int)splitPlacements.getAssignments().size()).isEqualTo(1);
        Assertions.assertThat((Collection)splitPlacements.getAssignments().keySet()).contains((Object[])new InternalNode[]{(InternalNode)nodes.get(0)});
        taskOne.clearSplits();
        tasks.forEach(task -> task.setUnacknowledgedSplits(maxUnacknowledgedSplitsPerTask - 1));
        splitPlacements = this.nodeSelector.computeAssignments(splits, (List)ImmutableList.copyOf(tasks));
        Assertions.assertThat((int)splitPlacements.getAssignments().size()).isEqualTo(nodes.size());
        Assertions.assertThat((boolean)splitPlacements.getAssignments().keySet().containsAll(nodes)).isTrue();
    }

    @Test
    @Timeout(value=60L)
    public void testTopologyAwareFailover() {
        this.nodeManager = new InMemoryNodeManager(new InternalNode[]{new InternalNode("node1", URI.create("http://host1.rack1:11"), NodeVersion.UNKNOWN, false), new InternalNode("node2", URI.create("http://host2.rack1:12"), NodeVersion.UNKNOWN, false), new InternalNode("node3", URI.create("http://host3.rack2:13"), NodeVersion.UNKNOWN, false)});
        TopologyAwareNodeSelectorFactory nodeSelectorFactory = new TopologyAwareNodeSelectorFactory((NetworkTopology)new TestNetworkTopology(), (InternalNodeManager)this.nodeManager, this.nodeSchedulerConfig, this.nodeTaskMap, TestNodeScheduler.getNetworkTopologyConfig());
        NodeScheduler nodeScheduler = new NodeScheduler((NodeSelectorFactory)nodeSelectorFactory);
        NodeSelector nodeSelector = nodeScheduler.createNodeSelector(this.session, Optional.of(TestingHandles.TEST_CATALOG_HANDLE));
        Split rigidSplit = new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitLocal(HostAddress.fromString((String)"host99.rack1:11")));
        Assertions.assertThatThrownBy(() -> this.computeSingleAssignment(nodeSelector, rigidSplit)).hasMessageContaining("No nodes available");
        Split flexibleSplit = new Split(TestingHandles.TEST_CATALOG_HANDLE, (ConnectorSplit)new TestSplitRemote(HostAddress.fromString((String)"host99.rack1:11")));
        org.assertj.guava.api.Assertions.assertThat(this.computeSingleAssignment(nodeSelector, flexibleSplit)).containsValues((Object[])new Split[]{flexibleSplit});
    }

    private Multimap<InternalNode, Split> computeSingleAssignment(NodeSelector nodeSelector, Split split) {
        return nodeSelector.computeAssignments((Set)ImmutableSet.of((Object)split), (List)ImmutableList.copyOf(this.taskMap.values())).getAssignments();
    }

    private static Session sessionWithMaxUnacknowledgedSplitsPerTask(int maxUnacknowledgedSplitsPerTask) {
        return TestingSession.testSessionBuilder().setSystemProperty("max_unacknowledged_splits_per_task", Integer.toString(maxUnacknowledgedSplitsPerTask)).build();
    }

    private static TopologyAwareNodeSelectorConfig getNetworkTopologyConfig() {
        return new TopologyAwareNodeSelectorConfig().setLocationSegmentNames((List)ImmutableList.of((Object)"rack", (Object)"machine"));
    }

    private static class TestSplitLocallyAccessible
    implements ConnectorSplit {
        private TestSplitLocallyAccessible() {
        }

        public boolean isRemotelyAccessible() {
            return false;
        }

        public List<HostAddress> getAddresses() {
            return ImmutableList.of((Object)HostAddress.fromString((String)"10.0.0.1:11"));
        }

        public Map<String, String> getSplitInfo() {
            return ImmutableMap.of((Object)"address", (Object)"10.0.0.1:11");
        }

        public long getRetainedSizeInBytes() {
            return 0L;
        }
    }

    private static class TestNetworkTopology
    implements NetworkTopology {
        private TestNetworkTopology() {
        }

        public NetworkLocation locate(HostAddress address) {
            ArrayList parts = new ArrayList(ImmutableList.copyOf((Iterable)Splitter.on((String)".").split((CharSequence)address.getHostText())));
            Collections.reverse(parts);
            return new NetworkLocation(parts);
        }
    }

    private static class TestSplitRemote
    implements ConnectorSplit {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(TestSplitRemote.class);
        private final List<HostAddress> hosts;
        private final SplitWeight splitWeight;

        TestSplitRemote() {
            this(HostAddress.fromString((String)String.format("10.%s.%s.%s:%s", ThreadLocalRandom.current().nextInt(0, 255), ThreadLocalRandom.current().nextInt(0, 255), ThreadLocalRandom.current().nextInt(0, 255), ThreadLocalRandom.current().nextInt(15, 5000))));
        }

        TestSplitRemote(HostAddress host) {
            this(host, SplitWeight.standard());
        }

        TestSplitRemote(HostAddress host, SplitWeight splitWeight) {
            this.hosts = ImmutableList.of((Object)Objects.requireNonNull(host, "host is null"));
            this.splitWeight = Objects.requireNonNull(splitWeight, "splitWeight is null");
        }

        public List<HostAddress> getAddresses() {
            return this.hosts;
        }

        public Map<String, String> getSplitInfo() {
            return ImmutableMap.of((Object)"hosts", (Object)this.hosts.stream().map(HostAddress::toString).collect(Collectors.joining(",")), (Object)"splitWeight", (Object)String.valueOf(this.splitWeight));
        }

        public SplitWeight getSplitWeight() {
            return this.splitWeight;
        }

        public long getRetainedSizeInBytes() {
            return (long)INSTANCE_SIZE + SizeOf.estimatedSizeOf(this.hosts, HostAddress::getRetainedSizeInBytes) + this.splitWeight.getRetainedSizeInBytes();
        }
    }

    private static class TestSplitLocal
    implements ConnectorSplit {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(TestSplitLocal.class);
        private final HostAddress address;
        private final SplitWeight splitWeight;

        private TestSplitLocal() {
            this(HostAddress.fromString((String)"10.0.0.1:11"));
        }

        private TestSplitLocal(HostAddress address) {
            this(address, SplitWeight.standard());
        }

        private TestSplitLocal(HostAddress address, SplitWeight splitWeight) {
            this.address = Objects.requireNonNull(address, "address is null");
            this.splitWeight = Objects.requireNonNull(splitWeight, "splitWeight is null");
        }

        public boolean isRemotelyAccessible() {
            return false;
        }

        public List<HostAddress> getAddresses() {
            return ImmutableList.of((Object)this.address);
        }

        public Map<String, String> getSplitInfo() {
            return ImmutableMap.of((Object)"address", (Object)this.address.toString());
        }

        public SplitWeight getSplitWeight() {
            return this.splitWeight;
        }

        public long getRetainedSizeInBytes() {
            return (long)INSTANCE_SIZE + this.address.getRetainedSizeInBytes() + this.splitWeight.getRetainedSizeInBytes();
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("address", (Object)this.address).toString();
        }
    }
}

