/*
 * Decompiled with CFR 0.152.
 */
package io.trino.memory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.execution.QueryState;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.memory.ClusterMemoryManager;
import io.trino.memory.ClusterMemoryPool;
import io.trino.memory.MemoryPool;
import io.trino.operator.BlockedReason;
import io.trino.plugin.blackhole.BlackHolePlugin;
import io.trino.server.BasicQueryInfo;
import io.trino.server.BasicQueryStats;
import io.trino.server.testing.TestingTrinoServer;
import io.trino.spi.Plugin;
import io.trino.spi.StandardErrorCode;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingSession;
import io.trino.testing.assertions.Assert;
import io.trino.tests.tpch.TpchQueryRunner;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Execution(value=ExecutionMode.SAME_THREAD)
public class TestMemoryManager {
    // Session bound to catalog "tpch" and schema "sf1000"; used by the query memory-limit tests.
    private static final Session SESSION = TestingSession.testSessionBuilder()
            .setCatalog("tpch")
            .setSchema("sf1000")
            .build();

    // Session bound to catalog "tpch" and schema "tiny"; used by the remaining tests.
    private static final Session TINY_SESSION = TestingSession.testSessionBuilder()
            .setCatalog("tpch")
            .setSchema("tiny")
            .build();

    // Thread pool for submitting queries in the background; created in setUp() and released in shutdown().
    private ExecutorService executor;

    /**
     * Creates the cached thread pool that the tests use to submit queries asynchronously.
     */
    @BeforeAll
    public void setUp() {
        executor = Executors.newCachedThreadPool();
    }

    /**
     * Stops the shared thread pool via {@code shutdownNow()} and clears the field.
     */
    @AfterAll
    public void shutdown() {
        executor.shutdownNow();
        // Drop the reference so the stopped pool is not reused.
        executor = null;
    }

    /**
     * Runs an aggregation query against a cluster whose per-node and total query memory limits are
     * both 1kB. With the default session the query must fail with the per-node memory limit error;
     * the same query is then executed with the {@code resource_overcommit} session property set to
     * {@code "true"} and is expected to complete without throwing.
     *
     * @throws Exception if the query runner cannot be created or closed
     */
    @Test
    @Timeout(240)
    public void testResourceOverCommit() throws Exception {
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("query.max-memory-per-node", "1kB")
                .put("query.max-memory", "1kB")
                .buildOrThrow();
        try (DistributedQueryRunner queryRunner = createQueryRunner(TINY_SESSION, properties)) {
            // Default session: the 1kB per-node limit must reject the query.
            Assertions.assertThatThrownBy(() -> queryRunner.execute("SELECT COUNT(*), clerk FROM orders GROUP BY clerk"))
                    .isInstanceOf(RuntimeException.class)
                    .hasMessageStartingWith("Query exceeded per-node memory limit of ");

            // Same query with resource_overcommit enabled; any exception here fails the test.
            Session session = TestingSession.testSessionBuilder()
                    .setCatalog("tpch")
                    .setSchema("tiny")
                    .setSystemProperty("resource_overcommit", "true")
                    .build();
            queryRunner.execute(session, "SELECT COUNT(*), clerk FROM orders GROUP BY clerk");
        }
    }

    /**
     * Verifies that a query is killed with the cluster-out-of-memory error when every server's
     * memory pool is exhausted.
     *
     * <p>Steps: configure the {@code total-reservation} low-memory-killer policy, create a blackhole
     * table whose single split produces 30 pages with a 1s delay each, reserve the whole memory pool
     * of every server under a fake task, submit two queries in the background, wait for one query to
     * fail with {@code CLUSTER_OUT_OF_MEMORY}, release the fake reservation, and finally assert that
     * collecting the query results raises an {@link ExecutionException} with the expected message.
     *
     * @throws Exception if the query runner cannot be created or closed, or waiting is interrupted
     */
    @Test
    @Timeout(240)
    public void testOutOfMemoryKiller() throws Exception {
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("query.low-memory-killer.policy", "total-reservation")
                .buildOrThrow();
        try (DistributedQueryRunner queryRunner = createQueryRunner(TINY_SESSION, properties)) {
            queryRunner.installPlugin(new BlackHolePlugin());
            queryRunner.createCatalog("blackhole", "blackhole");
            queryRunner.execute("CREATE TABLE blackhole.default.take_30s(dummy varchar(10)) WITH (split_count=1, pages_per_split=30, rows_per_page=1, page_processing_delay='1s')");

            // Reserve the full pool on every server so the submitted queries cannot obtain memory.
            TaskId fakeTaskId = new TaskId(new StageId("fake", 0), 0, 0);
            for (TestingTrinoServer server : queryRunner.getServers()) {
                MemoryPool memoryPool = server.getLocalMemoryManager().getMemoryPool();
                Assertions.assertThat(memoryPool.tryReserve(fakeTaskId, "test", memoryPool.getMaxBytes())).isTrue();
            }

            int queries = 2;
            ExecutorCompletionService<MaterializedResult> completionService = new ExecutorCompletionService<>(executor);
            for (int i = 0; i < queries; i++) {
                completionService.submit(() -> queryRunner.execute("SELECT COUNT(*), clerk FROM (SELECT clerk FROM orders UNION ALL SELECT dummy FROM blackhole.default.take_30s)GROUP BY clerk"));
            }

            // Wait until the coordinator lists all submitted queries; the extra one is presumably the
            // CREATE TABLE statement executed above.
            Assert.assertEventually(() -> Assertions.assertThat(queryRunner.getCoordinator().getQueryManager().getQueries()).hasSize(1 + queries));

            waitForQueryToBeKilled(queryRunner);

            // Release the fake reservation so the remaining work is no longer starved of memory.
            for (TestingTrinoServer server : queryRunner.getServers()) {
                MemoryPool pool = server.getLocalMemoryManager().getMemoryPool();
                Assertions.assertThat(pool.getReservedBytes()).isGreaterThan(0L);
                pool.free(fakeTaskId, "test", pool.getMaxBytes());
                Assertions.assertThat(pool.getFreeBytes()).isGreaterThan(0L);
            }

            // Draining the results must surface the failure of the killed query.
            Assertions.assertThatThrownBy(() -> {
                for (int i = 0; i < queries; i++) {
                    completionService.take().get();
                }
            })
                    .isInstanceOf(ExecutionException.class)
                    .hasMessageMatching(".*Query killed because the cluster is out of memory. Please try again in a few minutes.");
        }
    }

    /**
     * Polls the coordinator's query list every 10 ms until a query in the {@code FAILED} state is
     * found, then asserts that its error code is {@code CLUSTER_OUT_OF_MEMORY} and returns.
     *
     * <p>While polling, every query that has not failed must have no error code, and at least one
     * query must still be running; otherwise the check fails.
     *
     * @param queryRunner runner whose coordinator is inspected
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @throws IllegalStateException if all queries are done and none of them failed
     */
    private void waitForQueryToBeKilled(QueryRunner queryRunner) throws InterruptedException {
        for (;;) {
            boolean anyStillRunning = false;
            for (BasicQueryInfo query : queryRunner.getCoordinator().getQueryManager().getQueries()) {
                if (query.getState() == QueryState.FAILED) {
                    // The first failed query found decides the outcome.
                    Assertions.assertThat(query.getErrorCode()).isEqualTo(StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode());
                    return;
                }
                Assertions.assertThat(query.getErrorCode())
                        .describedAs("errorCode unexpectedly present for " + query)
                        .isNull();
                anyStillRunning |= !query.getState().isDone();
            }
            Preconditions.checkState(anyStillRunning, "All queries already completed without failure");
            TimeUnit.MILLISECONDS.sleep(10L);
        }
    }

    /**
     * Runs the memory-leak check for a plain projection query and for a filtered aggregation query,
     * in that order.
     *
     * @throws Exception if either leak check fails to run
     */
    @Test
    @Timeout(240)
    public void testNoLeak() throws Exception {
        List<String> statements = List.of(
                "SELECT clerk FROM orders",
                "SELECT COUNT(*), clerk FROM orders WHERE orderstatus='O' GROUP BY clerk");
        for (String statement : statements) {
            testNoLeak(statement);
        }
    }

    /**
     * Executes {@code query} on a fresh query runner and verifies that nothing is left reserved
     * afterwards: every query known to the coordinator must be {@code FINISHED}, and on every server
     * the memory pool's free bytes must equal its maximum bytes.
     *
     * @param query SQL statement to execute
     * @throws Exception if the query runner cannot be created or closed, or the query fails
     */
    private void testNoLeak(@Language("SQL") String query) throws Exception {
        Map<String, String> properties = ImmutableMap.of("task.per-operator-cpu-timer-enabled", "true");
        try (DistributedQueryRunner queryRunner = createQueryRunner(TINY_SESSION, properties)) {
            // Run the query on the shared executor and wait for it to finish.
            executor.submit(() -> queryRunner.execute(query)).get();

            for (BasicQueryInfo info : queryRunner.getCoordinator().getQueryManager().getQueries()) {
                Assertions.assertThat(info.getState()).isEqualTo(QueryState.FINISHED);
            }

            for (TestingTrinoServer worker : queryRunner.getServers()) {
                MemoryPool pool = worker.getLocalMemoryManager().getMemoryPool();
                // Free bytes is the value under test; max bytes is the expectation.
                Assertions.assertThat(pool.getFreeBytes()).isEqualTo(pool.getMaxBytes());
            }
        }
    }

    /**
     * Verifies that queries block while every server's memory pool is fully reserved and finish once
     * the reservation is released.
     *
     * <p>Steps: reserve the whole pool of every server under a fake task, submit two aggregation
     * queries in the background, wait until the cluster pool reports two blocked nodes and both
     * queries are blocked waiting for memory (neither may be done in the meantime), free the fake
     * reservation, wait for the queries, and assert that all queries are {@code FINISHED} and every
     * pool is completely free again.
     *
     * @throws Exception if the query runner cannot be created or closed, a query fails, or waiting
     *         is interrupted
     */
    @Test
    @Timeout(240)
    public void testClusterPools() throws Exception {
        Map<String, String> properties = ImmutableMap.of("task.per-operator-cpu-timer-enabled", "true");
        try (DistributedQueryRunner queryRunner = createQueryRunner(TINY_SESSION, properties)) {
            // Reserve all the memory on every server.
            TaskId fakeTaskId = new TaskId(new StageId("fake", 0), 0, 0);
            for (TestingTrinoServer server : queryRunner.getServers()) {
                MemoryPool pool = server.getLocalMemoryManager().getMemoryPool();
                Assertions.assertThat(pool.tryReserve(fakeTaskId, "test", pool.getMaxBytes())).isTrue();
            }

            List<Future<MaterializedResult>> queryFutures = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                queryFutures.add(executor.submit(() -> queryRunner.execute("SELECT COUNT(*), cast(orderkey as varchar), partkey FROM lineitem GROUP BY cast(orderkey as varchar), partkey")));
            }

            ClusterMemoryManager memoryManager = queryRunner.getCoordinator().getClusterMemoryManager();
            ClusterMemoryPool clusterPool = memoryManager.getPool();
            Assertions.assertThat(clusterPool).isNotNull();

            // Wait for the cluster pool to report two blocked nodes.
            while (clusterPool.getBlockedNodes() != 2) {
                TimeUnit.MILLISECONDS.sleep(10L);
            }

            // Wait until the coordinator knows about both queries.
            List<BasicQueryInfo> currentQueryInfos = queryRunner.getCoordinator().getQueryManager().getQueries();
            while (currentQueryInfos.size() != 2) {
                TimeUnit.MILLISECONDS.sleep(10L);
                currentQueryInfos = queryRunner.getCoordinator().getQueryManager().getQueries();
            }

            // Neither query may have completed while the pools are exhausted.
            for (BasicQueryInfo info : currentQueryInfos) {
                Assertions.assertThat(info.getState().isDone()).isFalse();
            }

            // Wait until both queries are blocked waiting for memory, re-checking that none finished.
            while (!currentQueryInfos.stream().allMatch(TestMemoryManager::isBlockedWaitingForMemory)) {
                TimeUnit.MILLISECONDS.sleep(10L);
                currentQueryInfos = queryRunner.getCoordinator().getQueryManager().getQueries();
                for (BasicQueryInfo info : currentQueryInfos) {
                    Assertions.assertThat(info.getState().isDone()).isFalse();
                }
            }

            // Release the fake reservation so the queries can proceed.
            for (TestingTrinoServer server : queryRunner.getServers()) {
                MemoryPool pool = server.getLocalMemoryManager().getMemoryPool();
                pool.free(fakeTaskId, "test", pool.getMaxBytes());
                Assertions.assertThat(pool.getFreeBytes()).isGreaterThan(0L);
            }

            // Both queries must now run to completion.
            for (Future<MaterializedResult> future : queryFutures) {
                future.get();
            }

            for (BasicQueryInfo info : queryRunner.getCoordinator().getQueryManager().getQueries()) {
                Assertions.assertThat(info.getState()).isEqualTo(QueryState.FINISHED);
            }

            // Nothing may remain reserved once the queries are done.
            for (TestingTrinoServer server : queryRunner.getServers()) {
                MemoryPool pool = server.getLocalMemoryManager().getMemoryPool();
                Assertions.assertThat(pool.getFreeBytes()).isEqualTo(pool.getMaxBytes());
            }
        }
    }

    /**
     * Tells whether a query is stalled on memory.
     *
     * @param info query to inspect
     * @return {@code true} when the query's blocked reasons include {@code WAITING_FOR_MEMORY} and
     *         the query is either fully blocked or has no running drivers
     */
    private static boolean isBlockedWaitingForMemory(BasicQueryInfo info) {
        BasicQueryStats queryStats = info.getQueryStats();
        return queryStats.getBlockedReasons().contains(BlockedReason.WAITING_FOR_MEMORY)
                && (queryStats.isFullyBlocked() || queryStats.getRunningDrivers() == 0);
    }

    /**
     * Verifies that a grouped aggregation fails with the distributed user memory limit error when
     * {@code query.max-memory} is 1kB (with {@code query.max-total-memory} at 1GB and partial
     * aggregation memory capped at 1B).
     */
    @Test
    @Timeout(60)
    public void testQueryUserMemoryLimit() {
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("task.max-partial-aggregation-memory", "1B")
                .put("query.max-memory", "1kB")
                .put("query.max-total-memory", "1GB")
                .buildOrThrow();
        Assertions.assertThatThrownBy(() -> {
            try (DistributedQueryRunner queryRunner = createQueryRunner(SESSION, properties)) {
                queryRunner.execute(SESSION, "SELECT COUNT(*), repeat(orderstatus, 1000) FROM orders GROUP BY 2");
            }
        })
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Query exceeded distributed user memory limit of 1kB");
    }

    /**
     * Verifies that a full sort of {@code tpch.sf10.orders} fails with the distributed total memory
     * limit error when both {@code query.max-memory} and {@code query.max-total-memory} are 120MB.
     * Spilling is enabled and pointed at a fresh, randomly named directory under the JVM temp dir.
     */
    @Test
    @Timeout(60)
    public void testQueryTotalMemoryLimit() {
        // A random UUID gives each run its own spill directory.
        String spillPath = Paths.get(System.getProperty("java.io.tmpdir"), "trino", "spills", UUID.randomUUID().toString()).toString();
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("query.max-memory", "120MB")
                .put("query.max-total-memory", "120MB")
                .put("spill-enabled", "true")
                .put("spiller-spill-path", spillPath)
                .put("spiller-max-used-space-threshold", "1.0")
                .buildOrThrow();
        Assertions.assertThatThrownBy(() -> {
            try (DistributedQueryRunner queryRunner = createQueryRunner(SESSION, properties)) {
                queryRunner.execute(SESSION, "SELECT * FROM tpch.sf10.orders ORDER BY orderkey");
            }
        })
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Query exceeded distributed total memory limit of 120MB");
    }

    /**
     * Verifies that a grouped aggregation fails with the per-node memory limit error when
     * {@code query.max-memory-per-node} is 1kB (with partial aggregation memory capped at 1B).
     */
    @Test
    @Timeout(60)
    public void testQueryMemoryPerNodeLimit() {
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("task.max-partial-aggregation-memory", "1B")
                .put("query.max-memory-per-node", "1kB")
                .buildOrThrow();
        Assertions.assertThatThrownBy(() -> {
            try (DistributedQueryRunner queryRunner = createQueryRunner(SESSION, properties)) {
                queryRunner.execute(SESSION, "SELECT COUNT(*), repeat(orderstatus, 1000) FROM orders GROUP BY 2");
            }
        })
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Query exceeded per-node memory limit of 1kB");
    }

    /**
     * Builds a TPCH-backed distributed query runner with a single worker.
     *
     * @param session session whose settings seed the runner's session builder
     * @param extraProperties additional configuration properties passed to the runner
     * @return the newly built query runner; the caller is responsible for closing it
     * @throws Exception if the runner cannot be built
     */
    public static DistributedQueryRunner createQueryRunner(Session session, Map<String, String> extraProperties) throws Exception {
        return TpchQueryRunner.builder()
                // The builder handed to the callback is ignored; a builder derived from `session` is used instead.
                .amendSession(sessionBuilder -> Session.builder(session))
                .setWorkerCount(1)
                .setExtraProperties(extraProperties)
                .build();
    }

    /**
     * Decompiler-emitted synthetic helper (judging by its name, originally a lambda body of
     * {@code testResourceOverCommit}). Runs the clerk aggregation query with the runner's default
     * session.
     *
     * @param queryRunner runner on which the query is executed
     * @throws Throwable whatever the query execution throws
     */
    private static /* synthetic */ void lambda$testResourceOverCommit$0(QueryRunner queryRunner) throws Throwable {
        String aggregationSql = "SELECT COUNT(*), clerk FROM orders GROUP BY clerk";
        queryRunner.execute(aggregationSql);
    }
}

