/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Ticker;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import io.airlift.concurrent.Threads;
import io.airlift.testing.TestingTicker;
import io.trino.Session;
import io.trino.client.NodeVersion;
import io.trino.execution.MockRemoteTaskFactory;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTask;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.scheduler.NodeMap;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.NodeSelector;
import io.trino.execution.scheduler.NodeSelectorFactory;
import io.trino.execution.scheduler.UniformNodeSelector;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Split;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingSplit;
import io.trino.util.FinalizerService;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

/**
 * Tests for split assignment by {@link UniformNodeSelector}.
 *
 * <p>Every test runs against a fresh fixture (see {@link #setUp()}): an
 * {@link InMemoryNodeManager} holding {@code node1} and {@code node2}, a
 * {@link NodeTaskMap}, and executors used by {@link MockRemoteTaskFactory}
 * to create mock remote tasks.
 */
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
public class TestUniformNodeSelector {
    private static final InternalNode node1 = new InternalNode("node1", URI.create("http://10.0.0.1:13"), NodeVersion.UNKNOWN, false);
    private static final InternalNode node2 = new InternalNode("node2", URI.create("http://10.0.0.1:12"), NodeVersion.UNKNOWN, false);

    // Insertion-ordered so that iteration over the splits is deterministic.
    private final Set<Split> splits = new LinkedHashSet<>();
    private FinalizerService finalizerService;
    private NodeTaskMap nodeTaskMap;
    private InMemoryNodeManager nodeManager;
    private NodeSchedulerConfig nodeSchedulerConfig;
    private NodeScheduler nodeScheduler;
    private NodeSelector nodeSelector;
    // The task created for each node during the first assignment round of a test.
    private Map<InternalNode, RemoteTask> taskMap;
    private ExecutorService remoteTaskExecutor;
    private ScheduledExecutorService remoteTaskScheduledExecutor;
    private Session session;

    /**
     * Builds the per-test fixture: registers both nodes, creates the default
     * selector through {@link NodeScheduler}, and starts the executors and the
     * finalizer service.
     */
    @BeforeEach
    public void setUp() {
        session = TestingSession.testSessionBuilder().build();
        finalizerService = new FinalizerService();
        nodeTaskMap = new NodeTaskMap(finalizerService);
        nodeManager = new InMemoryNodeManager(new InternalNode[0]);
        nodeManager.addNodes(new InternalNode[] {node1});
        nodeManager.addNodes(new InternalNode[] {node2});
        nodeSchedulerConfig = new NodeSchedulerConfig()
                .setMaxSplitsPerNode(20)
                .setMinPendingSplitsPerTask(10)
                .setMaxAdjustedPendingSplitsWeightPerTask(100)
                .setIncludeCoordinator(false);
        nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, nodeSchedulerConfig, nodeTaskMap));
        taskMap = new HashMap<>();
        nodeSelector = nodeScheduler.createNodeSelector(session, Optional.of(TestingHandles.TEST_CATALOG_HANDLE));
        remoteTaskExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("remoteTaskExecutor-%s"));
        remoteTaskScheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed("remoteTaskScheduledExecutor-%s"));
        finalizerService.start();
    }

    /**
     * Releases the per-test fixture.
     */
    @AfterEach
    public void tearDown() {
        // shutdownNow() rather than shutdown(): work that mock tasks left queued or
        // delayed on these executors must not keep running once the test is over.
        remoteTaskExecutor.shutdownNow();
        remoteTaskExecutor = null;
        remoteTaskScheduledExecutor.shutdownNow();
        remoteTaskScheduledExecutor = null;
        nodeSchedulerConfig = null;
        nodeScheduler = null;
        nodeSelector = null;
        finalizerService.destroy();
        finalizerService = null;
    }

    /**
     * Expects the adjusted per-task pending-split weight reported by the
     * {@link UniformNodeSelector.QueueSizeAdjuster} for {@code node1} to be 20 after
     * two assignment rounds, to stay at 20 after the ticker advances 999 ms, and to
     * drop to 13 once the ticker has advanced a full 1000 ms.
     */
    @Test
    public void testQueueSizeAdjustmentScaleDown() {
        TestingTicker ticker = new TestingTicker();
        UniformNodeSelector.QueueSizeAdjuster queueSizeAdjuster = new UniformNodeSelector.QueueSizeAdjuster(10L, 100L, ticker);
        nodeSelector = new UniformNodeSelector(
                nodeManager,
                nodeTaskMap,
                false,
                () -> createNodeMap(TestingHandles.TEST_CATALOG_HANDLE),
                10,
                100L,
                10L,
                500,
                NodeSchedulerConfig.SplitsBalancingPolicy.STAGE,
                false,
                queueSizeAdjuster);
        addRemoteSplits(20);

        // Round 1: create one task per assigned node and mark all of its splits as started.
        Multimap<InternalNode, Split> assignments1 = computeAssignments(splits);
        Assertions.assertThat(assignments1.size()).isEqualTo(2);
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor, remoteTaskScheduledExecutor);
        int task = 0;
        for (InternalNode node : assignments1.keySet()) {
            launchTask(remoteTaskFactory, task++, node, assignments1.get(node), true);
        }
        Set<Split> unassignedSplits = Sets.difference(splits, new HashSet<>(assignments1.values()));
        Assertions.assertThat(unassignedSplits).hasSize(18);

        // Round 2: the newly assigned splits are added to the tasks but NOT started.
        Multimap<InternalNode, Split> assignments2 = computeAssignments(unassignedSplits);
        Assertions.assertThat(assignments2.size()).isEqualTo(2);
        for (InternalNode node : assignments2.keySet()) {
            taskMap.get(node).addSplits(forSourceId(assignments2.get(node)));
        }
        long maxPendingSplitsWeightPerTaskBeforeScaleDown = queueSizeAdjuster.getAdjustedMaxPendingSplitsWeightPerTask(node1.getNodeIdentifier());
        Assertions.assertThat(maxPendingSplitsWeightPerTaskBeforeScaleDown).isEqualTo(20L);

        // 999 ms elapsed: nothing new is assigned and the adjusted value is unchanged.
        ticker.increment(999L, TimeUnit.MILLISECONDS);
        Multimap<InternalNode, Split> assignments3 = computeAssignments(unassignedSplits);
        Assertions.assertThat(assignments3.size()).isEqualTo(0);
        Assertions.assertThat(queueSizeAdjuster.getAdjustedMaxPendingSplitsWeightPerTask(node1.getNodeIdentifier()))
                .isEqualTo(maxPendingSplitsWeightPerTaskBeforeScaleDown);

        // 1000 ms elapsed in total: still nothing assigned, but the adjusted value has shrunk.
        ticker.increment(1L, TimeUnit.MILLISECONDS);
        Multimap<InternalNode, Split> assignments4 = computeAssignments(unassignedSplits);
        Assertions.assertThat(assignments4.size()).isEqualTo(0);
        long maxPendingSplitsWeightPerTaskAfterScaleDown = queueSizeAdjuster.getAdjustedMaxPendingSplitsWeightPerTask(node1.getNodeIdentifier());
        Assertions.assertThat(maxPendingSplitsWeightPerTaskAfterScaleDown).isEqualTo(13L);
    }

    /**
     * Runs four assignment rounds over 180 splits using the default selector. The
     * tasks on both nodes start their splits after rounds 1 and 2 but not after
     * round 3; the test expects 140, 100, 20 and again 20 splits to remain
     * unassigned after the respective rounds.
     */
    @Test
    public void testQueueSizeAdjustmentAllNodes() {
        addRemoteSplits(180);

        Multimap<InternalNode, Split> assignments1 = computeAssignments(splits);
        Assertions.assertThat(assignments1.size()).isEqualTo(40);
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor, remoteTaskScheduledExecutor);
        int task = 0;
        for (InternalNode node : assignments1.keySet()) {
            launchTask(remoteTaskFactory, task++, node, assignments1.get(node), true);
        }
        Set<Split> unassignedSplits = Sets.difference(splits, new HashSet<>(assignments1.values()));
        Assertions.assertThat(unassignedSplits).hasSize(140);

        Multimap<InternalNode, Split> assignments2 = computeAssignments(unassignedSplits);
        for (InternalNode node : assignments2.keySet()) {
            MockRemoteTaskFactory.MockRemoteTask remoteTask = (MockRemoteTaskFactory.MockRemoteTask) taskMap.get(node);
            remoteTask.addSplits(forSourceId(assignments2.get(node)));
            remoteTask.startSplits(remoteTask.getPartitionedSplitsInfo().getCount());
        }
        unassignedSplits = Sets.difference(unassignedSplits, new HashSet<>(assignments2.values()));
        Assertions.assertThat(unassignedSplits).hasSize(100);

        // Round 3 splits are only queued on the tasks; they are not started.
        Multimap<InternalNode, Split> assignments3 = computeAssignments(unassignedSplits);
        for (InternalNode node : assignments3.keySet()) {
            taskMap.get(node).addSplits(forSourceId(assignments3.get(node)));
        }
        unassignedSplits = Sets.difference(unassignedSplits, new HashSet<>(assignments3.values()));
        Assertions.assertThat(unassignedSplits).hasSize(20);

        // Round 4 is expected to assign nothing.
        Multimap<InternalNode, Split> assignments4 = computeAssignments(unassignedSplits);
        unassignedSplits = Sets.difference(unassignedSplits, new HashSet<>(assignments4.values()));
        Assertions.assertThat(unassignedSplits).hasSize(20);
    }

    /**
     * Same setup as {@link #testQueueSizeAdjustmentAllNodes()}, but only the task on
     * {@code node1} ever starts its splits. The test expects rounds 2 and 3 to assign
     * 20 and 40 splits to {@code node1} and none to {@code node2}.
     */
    @Test
    public void testQueueSizeAdjustmentOneOfAll() {
        addRemoteSplits(180);

        Multimap<InternalNode, Split> assignments1 = computeAssignments(splits);
        Assertions.assertThat(assignments1.size()).isEqualTo(40);
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor, remoteTaskScheduledExecutor);
        int task = 0;
        for (InternalNode node : assignments1.keySet()) {
            launchTask(remoteTaskFactory, task++, node, assignments1.get(node), node.equals(node1));
        }
        Set<Split> unassignedSplits = Sets.difference(splits, new HashSet<>(assignments1.values()));
        Assertions.assertThat(unassignedSplits).hasSize(140);

        Multimap<InternalNode, Split> assignments2 = computeAssignments(unassignedSplits);
        addSplitsStartingOnlyOnNode1(assignments2);
        unassignedSplits = Sets.difference(unassignedSplits, new HashSet<>(assignments2.values()));
        Assertions.assertThat(unassignedSplits).hasSize(120);
        Assertions.assertThat(assignments2.get(node1)).hasSize(20);
        Assertions.assertThat(assignments2.containsKey(node2)).isFalse();

        Multimap<InternalNode, Split> assignments3 = computeAssignments(unassignedSplits);
        addSplitsStartingOnlyOnNode1(assignments3);
        unassignedSplits = Sets.difference(unassignedSplits, new HashSet<>(assignments3.values()));
        Assertions.assertThat(unassignedSplits).hasSize(80);
        Assertions.assertThat(assignments3.get(node1)).hasSize(40);
        // Check the third round itself; re-checking assignments2 here would verify nothing new.
        Assertions.assertThat(assignments3.containsKey(node2)).isFalse();
    }

    /**
     * With both splits addressed to {@code node1}: while {@code node1} is registered
     * both are assigned to it; after it is removed, assigning the split created with
     * {@code false} fails with "No nodes available to run query", whereas the split
     * created with {@code true} is assigned to {@code node2}.
     */
    @Test
    public void testFailover() {
        nodeSelector = new UniformNodeSelector(
                nodeManager,
                nodeTaskMap,
                false,
                () -> createNodeMap(TestingHandles.TEST_CATALOG_HANDLE),
                10,
                2000L,
                1000L,
                2000,
                NodeSchedulerConfig.SplitsBalancingPolicy.STAGE,
                true,
                new UniformNodeSelector.QueueSizeAdjuster(1000L, 10000L, new TestingTicker()));

        // First TestingSplit argument: presumably "remotely accessible" — TODO confirm against TestingSplit.
        Split rigidSplit = new Split(TestingHandles.TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(node1.getHostAndPort())));
        splits.add(rigidSplit);
        Split flexibleSplit = new Split(TestingHandles.TEST_CATALOG_HANDLE, new TestingSplit(true, ImmutableList.of(node1.getHostAndPort())));
        splits.add(flexibleSplit);

        Multimap<InternalNode, Split> assignmentsNode1Alive = computeAssignments(splits);
        Multimap<InternalNode, Split> expected = ArrayListMultimap.create();
        expected.putAll(node1, splits);
        org.assertj.guava.api.Assertions.assertThat(assignmentsNode1Alive).hasSameEntriesAs(expected);

        nodeManager.removeNode(node1);

        // The full split set still contains the rigid split, so assignment must fail.
        Assertions.assertThatThrownBy(() -> nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())))
                .hasMessage("No nodes available to run query");

        Multimap<InternalNode, Split> assignmentsNode1Dead = computeAssignments(ImmutableSet.of(flexibleSplit));
        expected = ArrayListMultimap.create();
        expected.put(node2, flexibleSplit);
        org.assertj.guava.api.Assertions.assertThat(assignmentsNode1Dead).hasSameEntriesAs(expected);
    }

    /**
     * Adds {@code count} newly created remote splits to {@link #splits}.
     */
    private void addRemoteSplits(int count) {
        for (int i = 0; i < count; i++) {
            splits.add(new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit()));
        }
    }

    /**
     * Asks the current selector to assign {@code candidateSplits}, passing the tasks
     * currently held in {@link #taskMap} as the existing tasks.
     */
    private Multimap<InternalNode, Split> computeAssignments(Set<Split> candidateSplits) {
        return nodeSelector.computeAssignments(candidateSplits, ImmutableList.copyOf(taskMap.values())).getAssignments();
    }

    /**
     * Creates a mock table-scan task for {@code node}, optionally marks all of its
     * splits as started, and registers it in {@link #nodeTaskMap} and {@link #taskMap}.
     */
    private void launchTask(MockRemoteTaskFactory remoteTaskFactory, int taskNumber, InternalNode node, Collection<Split> initialSplits, boolean startSplits) {
        TaskId taskId = new TaskId(new StageId("test", 1), taskNumber, 0);
        MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(
                taskId,
                node,
                ImmutableList.copyOf(initialSplits),
                nodeTaskMap.createPartitionedSplitCountTracker(node, taskId));
        if (startSplits) {
            remoteTask.startSplits(remoteTask.getPartitionedSplitsInfo().getCount());
        }
        nodeTaskMap.addTask(node, remoteTask);
        taskMap.put(node, remoteTask);
    }

    /**
     * Hands each node's newly assigned splits to its existing task; only the task on
     * {@code node1} then marks its splits as started.
     */
    private void addSplitsStartingOnlyOnNode1(Multimap<InternalNode, Split> assignments) {
        for (InternalNode node : assignments.keySet()) {
            MockRemoteTaskFactory.MockRemoteTask remoteTask = (MockRemoteTaskFactory.MockRemoteTask) taskMap.get(node);
            remoteTask.addSplits(forSourceId(assignments.get(node)));
            if (node.equals(node1)) {
                remoteTask.startSplits(remoteTask.getPartitionedSplitsInfo().getCount());
            }
        }
    }

    /**
     * Wraps {@code assigned} in a multimap keyed by the plan node id {@code "sourceId"}.
     */
    private static Multimap<PlanNodeId, Split> forSourceId(Collection<Split> assigned) {
        return ImmutableMultimap.<PlanNodeId, Split>builder()
                .putAll(new PlanNodeId("sourceId"), assigned)
                .build();
    }

    /**
     * Builds a {@link NodeMap} from the nodes the node manager currently reports as
     * active for {@code catalogHandle}, indexed by host-and-port and by internal
     * address. The third index passed to {@link NodeMap} is left empty.
     */
    private NodeMap createNodeMap(CatalogHandle catalogHandle) {
        Set<InternalNode> nodes = nodeManager.getActiveCatalogNodes(catalogHandle);
        Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
                .map(InternalNode::getNodeIdentifier)
                .collect(ImmutableSet.toImmutableSet());
        ImmutableSetMultimap.Builder<HostAddress, InternalNode> byHostAndPort = ImmutableSetMultimap.builder();
        ImmutableSetMultimap.Builder<InetAddress, InternalNode> byHost = ImmutableSetMultimap.builder();
        for (InternalNode node : nodes) {
            try {
                byHostAndPort.put(node.getHostAndPort(), node);
                byHost.put(node.getInternalAddress(), node);
            }
            catch (UnknownHostException ignored) {
                // Deliberately best-effort: a node whose internal address cannot be
                // resolved is simply left out of the by-host index. The fixture nodes
                // use IP-literal URIs, so this is not expected to trigger here.
            }
        }
        return new NodeMap(byHostAndPort.build(), byHost.build(), ImmutableSetMultimap.of(), coordinatorNodeIds);
    }
}

