/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
import io.airlift.concurrent.SetThreadName;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.trino.ExceededCpuLimitException;
import io.trino.ExceededScanLimitException;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.QueryExecution;
import io.trino.execution.QueryInfo;
import io.trino.execution.QueryManager;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.QueryState;
import io.trino.execution.QueryTracker;
import io.trino.execution.StageId;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.memory.ClusterMemoryManager;
import io.trino.server.BasicQueryInfo;
import io.trino.server.ResultQueryInfo;
import io.trino.server.protocol.Slug;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.sql.planner.Plan;
import io.trino.tracing.ScopedSpan;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

/**
 * {@link QueryManager} implementation that keeps every submitted {@link QueryExecution} in a
 * {@link QueryTracker} and forwards per-query operations to the tracked execution.
 *
 * <p>Once {@link #start()} has been called, memory, CPU-time and scanned-bytes limits are
 * re-checked once per second on the query management executor.
 */
@ThreadSafe
public class SqlQueryManager implements QueryManager {
    private static final Logger log = Logger.get(SqlQueryManager.class);

    private final ClusterMemoryManager memoryManager;
    private final Tracer tracer;
    private final QueryTracker<QueryExecution> queryTracker;

    // Server-wide limits read from QueryManagerConfig; a session may only tighten them.
    private final Duration maxQueryCpuTime;
    private final Optional<DataSize> maxQueryScanPhysicalBytes;

    private final ExecutorService queryExecutor;
    private final ThreadPoolExecutorMBean queryExecutorMBean;
    private final ScheduledExecutorService queryManagementExecutor;
    private final ThreadPoolExecutorMBean queryManagementExecutorMBean;

    /**
     * Creates the manager together with its executors and query tracker.
     *
     * @param memoryManager cluster memory manager invoked by the periodic memory check
     * @param tracer tracer used to open the {@code query-start} span in {@link #createQuery}
     * @param queryManagerConfig source of the server-wide CPU/scan limits and the management pool size
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public SqlQueryManager(ClusterMemoryManager memoryManager, Tracer tracer, QueryManagerConfig queryManagerConfig) {
        this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager is null");
        this.tracer = Objects.requireNonNull(tracer, "tracer is null");
        Objects.requireNonNull(queryManagerConfig, "queryManagerConfig is null");

        this.maxQueryCpuTime = queryManagerConfig.getQueryMaxCpuTime();
        this.maxQueryScanPhysicalBytes = queryManagerConfig.getQueryMaxScanPhysicalBytes();

        this.queryExecutor = Executors.newCachedThreadPool(Threads.threadsNamed("query-scheduler-%s"));
        this.queryExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) this.queryExecutor);

        this.queryManagementExecutor = Executors.newScheduledThreadPool(
                queryManagerConfig.getQueryManagerExecutorPoolSize(),
                Threads.threadsNamed("query-management-%s"));
        this.queryManagementExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) this.queryManagementExecutor);

        this.queryTracker = new QueryTracker<>(queryManagerConfig, this.queryManagementExecutor);
    }

    /**
     * Starts the query tracker and schedules the limit-enforcement task with a one second
     * initial delay and a one second delay between runs.
     */
    @PostConstruct
    public void start() {
        queryTracker.start();
        queryManagementExecutor.scheduleWithFixedDelay(() -> {
            // Each check is isolated and catches Throwable: an exception escaping a task
            // scheduled with scheduleWithFixedDelay suppresses all subsequent executions,
            // and a failure in one check must not prevent the others from running.
            try {
                enforceMemoryLimits();
            }
            catch (Throwable e) {
                log.error(e, "Error enforcing memory limits");
            }
            try {
                enforceCpuLimits();
            }
            catch (Throwable e) {
                log.error(e, "Error enforcing query CPU time limits");
            }
            try {
                enforceScanLimits();
            }
            catch (Throwable e) {
                log.error(e, "Error enforcing query scan bytes limits");
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Stops the query tracker and shuts down both executors via {@code shutdownNow()}.
     */
    @PreDestroy
    public void stop() {
        queryTracker.stop();
        queryManagementExecutor.shutdownNow();
        queryExecutor.shutdownNow();
    }

    /**
     * Returns basic info for every tracked query whose info could be produced.
     *
     * @return immutable list of {@link BasicQueryInfo}; queries whose info lookup threw a
     *         {@link RuntimeException} are omitted
     */
    @Override
    public List<BasicQueryInfo> getQueries() {
        return queryTracker.getAllQueries().stream()
                .map(queryExecution -> {
                    try {
                        return queryExecution.getBasicQueryInfo();
                    }
                    catch (RuntimeException ignored) {
                        // Best effort: one query failing to report its info must not fail
                        // the whole listing; the null is filtered out below.
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Registers a listener for the output info of the given query.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    @Override
    public void setOutputInfoListener(QueryId queryId, Consumer<QueryExecution.QueryOutputInfo> listener) {
        Objects.requireNonNull(listener, "listener is null");
        queryTracker.getQuery(queryId).setOutputInfoListener(listener);
    }

    /**
     * Reports a failed output task to the query that owns {@code taskId}.
     */
    @Override
    public void outputTaskFailed(TaskId taskId, Throwable failure) {
        queryTracker.getQuery(taskId.getQueryId()).outputTaskFailed(taskId, failure);
    }

    /**
     * Notifies the given query that its results have been consumed.
     */
    @Override
    public void resultsConsumed(QueryId queryId) {
        queryTracker.getQuery(queryId).resultsConsumed();
    }

    /**
     * Adds a state-change listener to the given query.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    @Override
    public void addStateChangeListener(QueryId queryId, StateMachine.StateChangeListener<QueryState> listener) {
        Objects.requireNonNull(listener, "listener is null");
        queryTracker.getQuery(queryId).addStateChangeListener(listener);
    }

    /**
     * Returns the query's state-change future for {@code currentState}.
     *
     * @return the future obtained from the tracked query, or a future already failed with
     *         {@link NoSuchElementException} when the query is not tracked
     */
    @Override
    public ListenableFuture<QueryState> getStateChange(QueryId queryId, QueryState currentState) {
        return queryTracker.tryGetQuery(queryId)
                .map(query -> query.getStateChange(currentState))
                .orElseGet(() -> Futures.immediateFailedFuture(new NoSuchElementException()));
    }

    /**
     * Returns the basic info of the given query.
     */
    @Override
    public BasicQueryInfo getQueryInfo(QueryId queryId) {
        return queryTracker.getQuery(queryId).getBasicQueryInfo();
    }

    /**
     * Returns the full info of the given query.
     *
     * @throws NoSuchElementException as declared; presumably raised by the tracker lookup
     *         for an unknown query (confirm against {@code QueryTracker.getQuery})
     */
    @Override
    public QueryInfo getFullQueryInfo(QueryId queryId) throws NoSuchElementException {
        return queryTracker.getQuery(queryId).getQueryInfo();
    }

    /**
     * Returns the result-oriented info of the given query.
     *
     * @throws NoSuchElementException as declared; presumably raised by the tracker lookup
     *         for an unknown query (confirm against {@code QueryTracker.getQuery})
     */
    @Override
    public ResultQueryInfo getResultQueryInfo(QueryId queryId) throws NoSuchElementException {
        return queryTracker.getQuery(queryId).getResultQueryInfo();
    }

    /**
     * Returns the session of the given query.
     *
     * @throws NoSuchElementException as declared; presumably raised by the tracker lookup
     *         for an unknown query (confirm against {@code QueryTracker.getQuery})
     */
    @Override
    public Session getQuerySession(QueryId queryId) throws NoSuchElementException {
        return queryTracker.getQuery(queryId).getSession();
    }

    /**
     * Returns the slug of the given query.
     */
    @Override
    public Slug getQuerySlug(QueryId queryId) {
        return queryTracker.getQuery(queryId).getSlug();
    }

    /**
     * Returns the plan of the given query, as reported by the tracked execution.
     */
    public Optional<Plan> getQueryPlan(QueryId queryId) {
        return queryTracker.getQuery(queryId).getQueryPlan();
    }

    /**
     * Adds a listener for the final {@link QueryInfo} of the given query.
     */
    public void addFinalQueryInfoListener(QueryId queryId, StateMachine.StateChangeListener<QueryInfo> stateChangeListener) {
        queryTracker.getQuery(queryId).addFinalQueryInfoListener(stateChangeListener);
    }

    /**
     * Returns the current state of the given query.
     */
    @Override
    public QueryState getQueryState(QueryId queryId) {
        return queryTracker.getQuery(queryId).getState();
    }

    /**
     * Reports whether the tracker currently holds the given query.
     */
    @Override
    public boolean hasQuery(QueryId queryId) {
        return queryTracker.hasQuery(queryId);
    }

    /**
     * Records a heartbeat on the given query; does nothing when the query is not tracked.
     */
    @Override
    public void recordHeartbeat(QueryId queryId) {
        queryTracker.tryGetQuery(queryId).ifPresent(QueryExecution::recordHeartbeat);
    }

    /**
     * Registers the execution with the tracker and starts it on the calling thread.
     *
     * <p>While the execution is being started the thread is renamed to {@code Query-<id>} and
     * a {@code query-start} span, parented on the session's query span, is open.
     *
     * @throws NullPointerException if {@code queryExecution} is null
     * @throws TrinoException with {@code GENERIC_INTERNAL_ERROR} if the tracker rejects the
     *         query as already registered
     */
    @Override
    public void createQuery(QueryExecution queryExecution) {
        Objects.requireNonNull(queryExecution, "queryExecution is null");

        if (!queryTracker.addQuery(queryExecution)) {
            throw new TrinoException(
                    StandardErrorCode.GENERIC_INTERNAL_ERROR,
                    String.format("Query %s already registered", queryExecution.getQueryId()));
        }

        // Hand the query to the tracker's expiration handling once its final info is available.
        queryExecution.addFinalQueryInfoListener(finalQueryInfo -> queryTracker.expireQuery(queryExecution.getQueryId()));

        try (SetThreadName ignoredThreadName = new SetThreadName("Query-%s", queryExecution.getQueryId());
             ScopedSpan ignoredStartScope = ScopedSpan.scopedSpan(tracer.spanBuilder("query-start")
                     .setParent(Context.current().with(queryExecution.getSession().getQuerySpan()))
                     .startSpan())) {
            queryExecution.start();
        }
    }

    /**
     * Fails the given query with {@code cause}; does nothing when the query is not tracked.
     *
     * @throws NullPointerException if {@code cause} is null
     */
    @Override
    public void failQuery(QueryId queryId, Throwable cause) {
        Objects.requireNonNull(cause, "cause is null");
        queryTracker.tryGetQuery(queryId).ifPresent(query -> query.fail(cause));
    }

    /**
     * Cancels the given query; does nothing when the query is not tracked.
     */
    @Override
    public void cancelQuery(QueryId queryId) {
        log.debug("Cancel query %s", queryId);
        queryTracker.tryGetQuery(queryId).ifPresent(QueryExecution::cancelQuery);
    }

    /**
     * Cancels a single stage; does nothing when the owning query is not tracked.
     *
     * @throws NullPointerException if {@code stageId} is null
     */
    @Override
    public void cancelStage(StageId stageId) {
        Objects.requireNonNull(stageId, "stageId is null");
        log.debug("Cancel stage %s", stageId);
        queryTracker.tryGetQuery(stageId.getQueryId()).ifPresent(query -> query.cancelStage(stageId));
    }

    /**
     * Returns the management bean wrapping the {@code query-scheduler} thread pool.
     */
    @Managed(description="Query scheduler executor")
    @Nested
    public ThreadPoolExecutorMBean getExecutor() {
        return queryExecutorMBean;
    }

    /**
     * Returns the management bean wrapping the {@code query-management} thread pool.
     */
    @Managed(description="Query query management executor")
    @Nested
    public ThreadPoolExecutorMBean getManagementExecutor() {
        return queryManagementExecutorMBean;
    }

    /**
     * Returns the tracker holding all queries known to this manager.
     */
    @Managed
    @Nested
    public QueryTracker<QueryExecution> getQueryTracker() {
        return queryTracker;
    }

    /**
     * Passes the queries currently in the RUNNING state, plus a supplier of info for all
     * queries, to the cluster memory manager.
     */
    private void enforceMemoryLimits() {
        List<QueryExecution> runningQueries = queryTracker.getAllQueries().stream()
                .filter(query -> query.getState() == QueryState.RUNNING)
                .collect(ImmutableList.toImmutableList());
        memoryManager.process(runningQueries, this::getQueries);
    }

    /**
     * Fails every tracked query whose total CPU time exceeds the smaller of the server-wide
     * and session CPU-time limits.
     */
    private void enforceCpuLimits() {
        for (QueryExecution query : queryTracker.getAllQueries()) {
            Duration cpuTime = query.getTotalCpuTime();
            Duration sessionLimit = SystemSessionProperties.getQueryMaxCpuTime(query.getSession());
            Duration limit = Ordering.<Duration>natural().min(maxQueryCpuTime, sessionLimit);
            if (cpuTime.compareTo(limit) > 0) {
                query.fail(new ExceededCpuLimitException(limit));
            }
        }
    }

    /**
     * Fails every tracked query whose physical input size exceeds its effective scan limit.
     * Queries with no effective limit are left untouched.
     */
    private void enforceScanLimits() {
        for (QueryExecution query : queryTracker.getAllQueries()) {
            resolveScanLimit(query.getSession()).ifPresent(limit -> {
                DataSize scan = query.getBasicQueryInfo().getQueryStats().getPhysicalInputDataSize();
                if (scan.compareTo(limit) > 0) {
                    query.fail(new ExceededScanLimitException(limit));
                }
            });
        }
    }

    /**
     * Determines the scan limit that applies to a session: the smaller of the server and
     * session limits when both are set, whichever one is set otherwise, or empty when
     * neither is set.
     */
    private Optional<DataSize> resolveScanLimit(Session session) {
        Optional<DataSize> sessionLimit = SystemSessionProperties.getQueryMaxScanPhysicalBytes(session);
        return maxQueryScanPhysicalBytes
                .map(serverLimit -> sessionLimit
                        .map(requested -> Ordering.<DataSize>natural().min(serverLimit, requested))
                        .orElse(serverLimit))
                .or(() -> sessionLimit);
    }
}

