/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryLimit;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;

@ThreadSafe
public class QueryTracker<T extends TrackedQuery> {
    private static final Logger log = Logger.get(QueryTracker.class);
    private final int maxQueryHistory;
    private final int maxTotalRunningTaskCountToKillQuery;
    private final int maxQueryRunningTaskCount;
    private final AtomicInteger runningTaskCount = new AtomicInteger();
    private final AtomicLong queriesKilledDueToTooManyTask = new AtomicLong();
    private final Duration minQueryExpireAge;
    private final ConcurrentMap<QueryId, T> queries = new ConcurrentHashMap<QueryId, T>();
    private final Queue<T> expirationQueue = new LinkedBlockingQueue<T>();
    private final Duration clientTimeout;
    private final ScheduledExecutorService queryManagementExecutor;
    @GuardedBy(value="this")
    private ScheduledFuture<?> backgroundTask;
    private final Optional<ClusterQueryTrackerService> clusterQueryTrackerService;

    public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorService queryManagementExecutor, Optional<ClusterQueryTrackerService> clusterQueryTrackerService) {
        Objects.requireNonNull(queryManagerConfig, "queryManagerConfig is null");
        this.minQueryExpireAge = queryManagerConfig.getMinQueryExpireAge();
        this.maxQueryHistory = queryManagerConfig.getMaxQueryHistory();
        this.clientTimeout = queryManagerConfig.getClientTimeout();
        this.maxTotalRunningTaskCountToKillQuery = queryManagerConfig.getMaxTotalRunningTaskCountToKillQuery();
        this.maxQueryRunningTaskCount = queryManagerConfig.getMaxQueryRunningTaskCount();
        this.queryManagementExecutor = Objects.requireNonNull(queryManagementExecutor, "queryManagementExecutor is null");
        this.clusterQueryTrackerService = clusterQueryTrackerService;
    }

    public synchronized void start() {
        Preconditions.checkState((this.backgroundTask == null ? 1 : 0) != 0, (Object)"QueryTracker already started");
        this.backgroundTask = this.queryManagementExecutor.scheduleWithFixedDelay(() -> {
            try {
                this.failAbandonedQueries();
            }
            catch (Throwable e) {
                log.error(e, "Error cancelling abandoned queries");
            }
            try {
                this.enforceTimeLimits();
            }
            catch (Throwable e) {
                log.error(e, "Error enforcing query timeout limits");
            }
            try {
                if (this.maxTotalRunningTaskCountToKillQuery != Integer.MAX_VALUE && this.maxQueryRunningTaskCount != Integer.MAX_VALUE) {
                    this.enforceTaskLimits();
                }
            }
            catch (Throwable e) {
                log.error(e, "Error enforcing running task limits");
            }
            try {
                this.removeExpiredQueries();
            }
            catch (Throwable e) {
                log.error(e, "Error removing expired queries");
            }
            try {
                this.pruneExpiredQueries();
            }
            catch (Throwable e) {
                log.error(e, "Error pruning expired queries");
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        QueryTracker queryTracker = this;
        synchronized (queryTracker) {
            if (this.backgroundTask != null) {
                this.backgroundTask.cancel(true);
            }
        }
        boolean queryCancelled = false;
        for (TrackedQuery trackedQuery : this.queries.values()) {
            if (trackedQuery.isDone()) continue;
            log.info("Server shutting down. Query %s has been cancelled", new Object[]{trackedQuery.getQueryId()});
            trackedQuery.fail(new PrestoException((ErrorCodeSupplier)StandardErrorCode.SERVER_SHUTTING_DOWN, "Server is shutting down. Query " + trackedQuery.getQueryId() + " has been cancelled"));
            queryCancelled = true;
        }
        if (queryCancelled) {
            try {
                TimeUnit.SECONDS.sleep(5L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public Collection<T> getAllQueries() {
        return ImmutableList.copyOf(this.queries.values());
    }

    public T getQuery(QueryId queryId) throws NoSuchElementException {
        return (T)((TrackedQuery)this.tryGetQuery(queryId).orElseThrow(() -> new NoSuchElementException(queryId.toString())));
    }

    public Optional<T> tryGetQuery(QueryId queryId) {
        Objects.requireNonNull(queryId, "queryId is null");
        return Optional.ofNullable(this.queries.get(queryId));
    }

    public boolean addQuery(T execution) {
        return this.queries.putIfAbsent(execution.getQueryId(), execution) == null;
    }

    public void expireQuery(QueryId queryId) {
        this.tryGetQuery(queryId).ifPresent(query -> {
            query.pruneFinishedQueryInfo();
            this.expirationQueue.add(query);
        });
    }

    public long getRunningTaskCount() {
        return this.runningTaskCount.get();
    }

    public long getQueriesKilledDueToTooManyTask() {
        return this.queriesKilledDueToTooManyTask.get();
    }

    private void enforceTimeLimits() {
        for (TrackedQuery query : this.queries.values()) {
            if (query.isDone()) continue;
            Duration queryMaxRunTime = SystemSessionProperties.getQueryMaxRunTime(query.getSession());
            QueryLimit<Duration> queryMaxExecutionTime = QueryLimit.getMinimum(QueryLimit.createDurationLimit(SystemSessionProperties.getQueryMaxExecutionTime(query.getSession()), QueryLimit.Source.QUERY), query.getResourceGroupQueryLimits().flatMap(ResourceGroupQueryLimits::getExecutionTimeLimit).map(rgLimit -> QueryLimit.createDurationLimit(rgLimit, QueryLimit.Source.RESOURCE_GROUP)).orElse(null));
            Optional<DateTime> executionStartTime = query.getExecutionStartTime();
            DateTime createTime = query.getCreateTime();
            if (executionStartTime.isPresent() && executionStartTime.get().plus(queryMaxExecutionTime.getLimit().toMillis()).isBeforeNow()) {
                query.fail(new PrestoException((ErrorCodeSupplier)StandardErrorCode.EXCEEDED_TIME_LIMIT, String.format("Query exceeded the maximum execution time limit of %s defined at the %s level", queryMaxExecutionTime.getLimit(), queryMaxExecutionTime.getLimitSource().name())));
            }
            if (!createTime.plus(queryMaxRunTime.toMillis()).isBeforeNow()) continue;
            query.fail(new PrestoException((ErrorCodeSupplier)StandardErrorCode.EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit of " + queryMaxRunTime));
        }
    }

    @VisibleForTesting
    void enforceTaskLimits() {
        QueryAndTaskCount queryAndTaskCount;
        int totalRunningTaskCount = 0;
        PriorityQueue<QueryAndTaskCount> taskCountQueue = new PriorityQueue<QueryAndTaskCount>(Comparator.comparingInt(queryAndCount -> -1 * queryAndCount.getTaskCount()));
        for (TrackedQuery query : this.queries.values()) {
            if (query.isDone() || !(query instanceof QueryExecution)) continue;
            int runningTaskCount = ((QueryExecution)query).getRunningTaskCount();
            totalRunningTaskCount += runningTaskCount;
            if (runningTaskCount <= this.maxQueryRunningTaskCount) continue;
            taskCountQueue.add(new QueryAndTaskCount<TrackedQuery>(query, runningTaskCount));
        }
        if (this.clusterQueryTrackerService.isPresent()) {
            totalRunningTaskCount = this.clusterQueryTrackerService.get().getRunningTaskCount();
        }
        this.runningTaskCount.set(totalRunningTaskCount);
        for (int runningTaskCountAfterKills = totalRunningTaskCount; runningTaskCountAfterKills > this.maxTotalRunningTaskCountToKillQuery && !taskCountQueue.isEmpty(); runningTaskCountAfterKills -= queryAndTaskCount.getTaskCount()) {
            queryAndTaskCount = (QueryAndTaskCount)taskCountQueue.poll();
            queryAndTaskCount.getQuery().fail(new PrestoException((ErrorCodeSupplier)StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES, String.format("Query killed because the cluster is overloaded with too many tasks (%s) and this query was running with the highest number of tasks (%s). %s Otherwise, please try again later.", totalRunningTaskCount, queryAndTaskCount.getTaskCount(), "If the query contains multiple DISTINCTs, please set the 'use_mark_distinct' session property to false. If the query contains multiple CTEs that are referenced more than once, please create temporary table(s) for one or more of the CTEs.")));
            this.queriesKilledDueToTooManyTask.incrementAndGet();
        }
    }

    private void pruneExpiredQueries() {
        if (this.expirationQueue.size() <= this.maxQueryHistory) {
            return;
        }
        int count = 0;
        for (TrackedQuery query : this.expirationQueue) {
            if (this.expirationQueue.size() - count <= this.maxQueryHistory) break;
            query.pruneExpiredQueryInfo();
            ++count;
        }
    }

    private void removeExpiredQueries() {
        DateTime timeHorizon = DateTime.now().minus(this.minQueryExpireAge.toMillis());
        while (this.expirationQueue.size() > this.maxQueryHistory) {
            TrackedQuery query = (TrackedQuery)this.expirationQueue.peek();
            if (query == null) {
                return;
            }
            Optional<DateTime> endTime = query.getEndTime();
            if (!endTime.isPresent()) continue;
            if (endTime.get().isAfter((ReadableInstant)timeHorizon)) {
                return;
            }
            QueryId queryId = query.getQueryId();
            log.debug("Remove query %s", new Object[]{queryId});
            this.queries.remove(queryId);
            this.expirationQueue.remove(query);
        }
    }

    private void failAbandonedQueries() {
        for (TrackedQuery query : this.queries.values()) {
            try {
                if (query.isDone() || !this.isAbandoned(query)) continue;
                log.info("Failing abandoned query %s", new Object[]{query.getQueryId()});
                query.fail(new PrestoException((ErrorCodeSupplier)StandardErrorCode.ABANDONED_QUERY, String.format("Query %s has not been accessed since %s: currentTime %s", query.getQueryId(), query.getLastHeartbeat(), DateTime.now())));
            }
            catch (RuntimeException e) {
                log.error((Throwable)e, "Exception failing abandoned query %s", new Object[]{query.getQueryId()});
            }
        }
    }

    private boolean isAbandoned(T query) {
        DateTime oldestAllowedHeartbeat = DateTime.now().minus(this.clientTimeout.toMillis());
        DateTime lastHeartbeat = query.getLastHeartbeat();
        return lastHeartbeat != null && lastHeartbeat.isBefore((ReadableInstant)oldestAllowedHeartbeat);
    }

    public static interface TrackedQuery {
        public QueryId getQueryId();

        public boolean isDone();

        public Session getSession();

        public DateTime getCreateTime();

        public Optional<DateTime> getExecutionStartTime();

        public DateTime getLastHeartbeat();

        public Optional<DateTime> getEndTime();

        public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits();

        public void fail(Throwable var1);

        public void pruneExpiredQueryInfo();

        public void pruneFinishedQueryInfo();
    }

    private static class QueryAndTaskCount<T extends TrackedQuery> {
        private final T query;
        private final int taskCount;

        public QueryAndTaskCount(T query, int taskCount) {
            this.query = query;
            this.taskCount = taskCount;
        }

        public T getQuery() {
            return this.query;
        }

        public int getTaskCount() {
            return this.taskCount;
        }
    }
}

