/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.Threads;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.trino.client.NodeVersion;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTask;
import io.trino.execution.StageId;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.TestingRemoteTaskFactory;
import io.trino.execution.buffer.OutputBufferStatus;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.ScaledWriterScheduler;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.TaskLifecycleListener;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Split;
import io.trino.metadata.TableHandle;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.Partitioning;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PartitioningScheme;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingMetadata;
import io.trino.testing.TestingSession;
import io.trino.util.FinalizerService;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestScaledWriterScheduler {
    private static final PlanNodeId TABLE_SCAN_NODE_ID = new PlanNodeId("plan_id");
    private static final InternalNode NODE_1 = new InternalNode("node-1", URI.create("http://127.0.0.1:11"), new NodeVersion("version-1"), false);
    private static final InternalNode NODE_2 = new InternalNode("node-2", URI.create("http://127.0.0.1:12"), new NodeVersion("version-1"), false);
    private static final InternalNode NODE_3 = new InternalNode("node-3", URI.create("http://127.0.0.1:13"), new NodeVersion("version-1"), false);

    @Test
    public void testGetNewTaskCountWithUnderutilizedTasksWithoutSkewness() {
        TaskStatus taskStatus1 = TestScaledWriterScheduler.buildTaskStatus(true, 12345L);
        TaskStatus taskStatus2 = TestScaledWriterScheduler.buildTaskStatus(false, 12345L);
        TaskStatus taskStatus3 = TestScaledWriterScheduler.buildTaskStatus(false, 12345L);
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaleWriterSchedulerWithInitialTasks(taskStatus1, taskStatus2, taskStatus3);
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)0);
    }

    @Test
    public void testGetNewTaskCountWithOverutilizedTasksWithoutSkewness() {
        TaskStatus taskStatus1 = TestScaledWriterScheduler.buildTaskStatus(true, 12345L);
        TaskStatus taskStatus2 = TestScaledWriterScheduler.buildTaskStatus(true, 12345L);
        TaskStatus taskStatus3 = TestScaledWriterScheduler.buildTaskStatus(false, 12345L);
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaleWriterSchedulerWithInitialTasks(taskStatus1, taskStatus2, taskStatus3);
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)1);
    }

    @Test
    public void testGetNewTaskCountWithOverutilizedSkewedTaskAndUnderutilizedNonSkewedTasks() {
        TaskStatus taskStatus1 = TestScaledWriterScheduler.buildTaskStatus(true, 1234567L);
        TaskStatus taskStatus2 = TestScaledWriterScheduler.buildTaskStatus(false, 12345L);
        TaskStatus taskStatus3 = TestScaledWriterScheduler.buildTaskStatus(false, 123456L);
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaleWriterSchedulerWithInitialTasks(taskStatus1, taskStatus2, taskStatus3);
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)1);
    }

    @Test
    public void testGetNewTaskCountWithUnderutilizedSkewedTaskAndOverutilizedNonSkewedTasks() {
        TaskStatus taskStatus1 = TestScaledWriterScheduler.buildTaskStatus(true, 12345L);
        TaskStatus taskStatus2 = TestScaledWriterScheduler.buildTaskStatus(true, 123456L);
        TaskStatus taskStatus3 = TestScaledWriterScheduler.buildTaskStatus(false, 1234567L);
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaleWriterSchedulerWithInitialTasks(taskStatus1, taskStatus2, taskStatus3);
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)1);
    }

    @Test
    public void testGetNewTaskCountWhenWrittenBytesIsGreaterThanMinWrittenBytesForScaleUp() {
        TaskStatus taskStatus1 = TestScaledWriterScheduler.buildTaskStatus(1, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        TaskStatus taskStatus2 = TestScaledWriterScheduler.buildTaskStatus(1, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        TaskStatus taskStatus3 = TestScaledWriterScheduler.buildTaskStatus(2, DataSize.of((long)64L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaleWriterSchedulerWithInitialTasks(taskStatus1, taskStatus2, taskStatus3);
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)1);
    }

    @Test
    public void testGetNewTaskCountWhenWrittenBytesIsLessThanMinWrittenBytesForScaleUp() {
        TaskStatus taskStatus1 = TestScaledWriterScheduler.buildTaskStatus(1, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        TaskStatus taskStatus2 = TestScaledWriterScheduler.buildTaskStatus(1, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        TaskStatus taskStatus3 = TestScaledWriterScheduler.buildTaskStatus(2, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaleWriterSchedulerWithInitialTasks(taskStatus1, taskStatus2, taskStatus3);
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)0);
    }

    @Test
    public void testGetNewTaskCountWhenExistingWriterTaskMaxWriterCountIsEmpty() {
        TaskStatus taskStatus1 = TestScaledWriterScheduler.buildTaskStatus(1, DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        TaskStatus taskStatus2 = TestScaledWriterScheduler.buildTaskStatus(2, DataSize.of((long)100L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        TaskStatus taskStatus3 = TestScaledWriterScheduler.buildTaskStatus(true, 12345L, Optional.empty(), DataSize.of((long)0L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaleWriterSchedulerWithInitialTasks(taskStatus1, taskStatus2, taskStatus3);
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)0);
    }

    @Test
    public void testNewTaskCountWhenNodesUpperLimitIsNotExceeded() {
        TaskStatus taskStatus = TestScaledWriterScheduler.buildTaskStatus(true, 123456L);
        AtomicReference<ImmutableList> taskStatusProvider = new AtomicReference<ImmutableList>(ImmutableList.of((Object)taskStatus));
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaledWriterScheduler(taskStatusProvider, 2);
        scaledWriterScheduler.schedule();
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)1);
    }

    @Test
    public void testNewTaskCountWhenNodesUpperLimitIsExceeded() {
        TaskStatus taskStatus = TestScaledWriterScheduler.buildTaskStatus(true, 123456L);
        AtomicReference<ImmutableList> taskStatusProvider = new AtomicReference<ImmutableList>(ImmutableList.of((Object)taskStatus));
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaledWriterScheduler(taskStatusProvider, 1);
        scaledWriterScheduler.schedule();
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)0);
    }

    private ScaledWriterScheduler buildScaleWriterSchedulerWithInitialTasks(TaskStatus taskStatus1, TaskStatus taskStatus2, TaskStatus taskStatus3) {
        AtomicReference<ImmutableList> taskStatusProvider = new AtomicReference<ImmutableList>(ImmutableList.of());
        ScaledWriterScheduler scaledWriterScheduler = this.buildScaledWriterScheduler(taskStatusProvider, 100);
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)1);
        taskStatusProvider.set(ImmutableList.of((Object)taskStatus1));
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)1);
        taskStatusProvider.set(ImmutableList.of((Object)taskStatus1, (Object)taskStatus2));
        Assert.assertEquals((int)scaledWriterScheduler.schedule().getNewTasks().size(), (int)1);
        taskStatusProvider.set(ImmutableList.of((Object)taskStatus1, (Object)taskStatus2, (Object)taskStatus3));
        return scaledWriterScheduler;
    }

    private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference<List<TaskStatus>> taskStatusProvider, int maxWritersNodesCount) {
        return new ScaledWriterScheduler((StageExecution)new TestingStageExecution(TestScaledWriterScheduler.createFragment()), taskStatusProvider::get, taskStatusProvider::get, new UniformNodeSelectorFactory((InternalNodeManager)new InMemoryNodeManager(new InternalNode[]{NODE_1, NODE_2, NODE_3}), new NodeSchedulerConfig().setIncludeCoordinator(true), new NodeTaskMap(new FinalizerService())).createNodeSelector(TestingSession.testSessionBuilder().build(), Optional.empty()), Executors.newScheduledThreadPool(10, Threads.threadsNamed((String)"task-notification-%s")), DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE), maxWritersNodesCount);
    }

    private static TaskStatus buildTaskStatus(boolean isOutputBufferOverUtilized, long outputDataSize) {
        return TestScaledWriterScheduler.buildTaskStatus(isOutputBufferOverUtilized, outputDataSize, Optional.of(1), DataSize.of((long)32L, (DataSize.Unit)DataSize.Unit.MEGABYTE));
    }

    private static TaskStatus buildTaskStatus(int maxWriterCount, DataSize physicalWrittenDataSize) {
        return TestScaledWriterScheduler.buildTaskStatus(true, 12345L, Optional.of(maxWriterCount), physicalWrittenDataSize);
    }

    private static TaskStatus buildTaskStatus(boolean isOutputBufferOverUtilized, long outputDataSize, Optional<Integer> maxWriterCount, DataSize physicalWrittenDataSize) {
        return new TaskStatus(TaskId.valueOf((String)"taskId"), "task-instance-id", 0L, TaskState.RUNNING, URI.create("fake://task/taskId/node/some_node"), "some_node", (List)ImmutableList.of(), 0, 0, new OutputBufferStatus(OptionalLong.empty(), isOutputBufferOverUtilized, false), DataSize.ofBytes((long)outputDataSize), physicalWrittenDataSize, maxWriterCount, DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.MEGABYTE), DataSize.of((long)0L, (DataSize.Unit)DataSize.Unit.MEGABYTE), 0L, Duration.valueOf((String)"0s"), 0L, 1L, 1L);
    }

    private static PlanFragment createFragment() {
        Symbol symbol = new Symbol("column");
        TableScanNode tableScan = TableScanNode.newInstance((PlanNodeId)TABLE_SCAN_NODE_ID, (TableHandle)TestingHandles.TEST_TABLE_HANDLE, (List)ImmutableList.of((Object)symbol), (Map)ImmutableMap.of((Object)symbol, (Object)new TestingMetadata.TestingColumnHandle("column")), (boolean)false, Optional.empty());
        return new PlanFragment(new PlanFragmentId("plan_id"), (PlanNode)tableScan, (Map)ImmutableMap.of((Object)symbol, (Object)VarcharType.VARCHAR), SystemPartitioningHandle.SOURCE_DISTRIBUTION, Optional.empty(), (List)ImmutableList.of((Object)TABLE_SCAN_NODE_ID), new PartitioningScheme(Partitioning.create((PartitioningHandle)SystemPartitioningHandle.SINGLE_DISTRIBUTION, (List)ImmutableList.of()), (List)ImmutableList.of((Object)symbol)), StatsAndCosts.empty(), (List)ImmutableList.of(), Optional.empty());
    }

    private static class TestingStageExecution
    implements StageExecution {
        private final PlanFragment fragment;

        public TestingStageExecution(PlanFragment fragment) {
            this.fragment = Objects.requireNonNull(fragment, "fragment is null");
        }

        public PlanFragment getFragment() {
            throw new UnsupportedOperationException();
        }

        public boolean isAnyTaskBlocked() {
            throw new UnsupportedOperationException();
        }

        public StageExecution.State getState() {
            throw new UnsupportedOperationException();
        }

        public void addStateChangeListener(StateMachine.StateChangeListener<StageExecution.State> stateChangeListener) {
            throw new UnsupportedOperationException();
        }

        public StageId getStageId() {
            throw new UnsupportedOperationException();
        }

        public int getAttemptId() {
            throw new UnsupportedOperationException();
        }

        public Span getStageSpan() {
            throw new UnsupportedOperationException();
        }

        public void beginScheduling() {
            throw new UnsupportedOperationException();
        }

        public void transitionToSchedulingSplits() {
            throw new UnsupportedOperationException();
        }

        public TaskLifecycleListener getTaskLifecycleListener() {
            throw new UnsupportedOperationException();
        }

        public void schedulingComplete() {
            throw new UnsupportedOperationException();
        }

        public void schedulingComplete(PlanNodeId partitionedSource) {
            throw new UnsupportedOperationException();
        }

        public void cancel() {
            throw new UnsupportedOperationException();
        }

        public void abort() {
            throw new UnsupportedOperationException();
        }

        public void recordGetSplitTime(long start) {
            throw new UnsupportedOperationException();
        }

        public Optional<RemoteTask> scheduleTask(InternalNode node, int partition, Multimap<PlanNodeId, Split> initialSplits) {
            return Optional.of(new TestingRemoteTaskFactory.TestingRemoteTask(TaskId.valueOf((String)"taskId"), "nodeId", this.fragment));
        }

        public void failTask(TaskId taskId, Throwable failureCause) {
            throw new UnsupportedOperationException();
        }

        public List<RemoteTask> getAllTasks() {
            throw new UnsupportedOperationException();
        }

        public List<TaskStatus> getTaskStatuses() {
            throw new UnsupportedOperationException();
        }

        public Optional<ExecutionFailureInfo> getFailureCause() {
            throw new UnsupportedOperationException();
        }
    }
}

