/*
 * Decompiled with CFR 0.152.
 */
package io.trino.dispatcher;

import com.clearspring.analytics.util.Preconditions;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.Threads;
import io.airlift.jaxrs.AsyncResponseHandler;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.client.QueryError;
import io.trino.client.QueryResults;
import io.trino.client.StatementStats;
import io.trino.dispatcher.CoordinatorLocation;
import io.trino.dispatcher.DispatchExecutor;
import io.trino.dispatcher.DispatchInfo;
import io.trino.dispatcher.DispatchManager;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.QueryState;
import io.trino.server.HttpRequestSessionContextFactory;
import io.trino.server.ProtocolConfig;
import io.trino.server.ServerConfig;
import io.trino.server.SessionContext;
import io.trino.server.protocol.QueryInfoUrlFactory;
import io.trino.server.protocol.Slug;
import io.trino.server.security.InternalPrincipal;
import io.trino.server.security.ResourceSecurity;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.security.Identity;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;

@Path(value="/v1/statement")
public class QueuedStatementResource {
    private static final Logger log = Logger.get(QueuedStatementResource.class);
    // Upper bound combined with the client-supplied maxWait when polling for dispatch status.
    private static final Duration MAX_WAIT_TIME = new Duration(1.0, TimeUnit.SECONDS);
    // Natural ordering that sorts nulls last, used to pick the smaller of MAX_WAIT_TIME and a possibly-null maxWait.
    private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
    // Zero duration reported as elapsed/queued time while query creation has not completed.
    private static final Duration NO_DURATION = new Duration(0.0, TimeUnit.MILLISECONDS);
    // Builds the SessionContext for a new query from the request headers and identity.
    private final HttpRequestSessionContextFactory sessionContextFactory;
    // Creates, tracks and cancels queries; also the source of dispatch information.
    private final DispatchManager dispatchManager;
    // Supplies the optional query info URL attached to each query's results.
    private final QueryInfoUrlFactory queryInfoUrlFactory;
    // Executor on which query results are produced and async responses are bound.
    private final Executor responseExecutor;
    // Scheduler used to time out the wait for a query to be dispatched.
    private final ScheduledExecutorService timeoutExecutor;
    // When false, responses are sent with the "identity" content encoding.
    private final boolean compressionEnabled;
    // Optional alternate header name passed through to the session context factory.
    private final Optional<String> alternateHeaderName;
    // Registry of queries known to this resource; periodically purged against the dispatch manager.
    private final QueryManager queryManager;

    /**
     * Creates the resource from its injected collaborators.
     *
     * <p>Every argument is required; a {@link NullPointerException} naming the
     * offending argument is thrown when one is missing.
     *
     * @param sessionContextFactory factory used to build the session context of new queries
     * @param dispatchManager manager that creates and tracks dispatched queries
     * @param executor source of the response executor and the timeout scheduler
     * @param queryInfoUrlTemplate factory for the optional query info URL
     * @param serverConfig supplies the query-results compression flag
     * @param protocolConfig supplies the optional alternate header name
     * @param queryManagerConfig supplies the client timeout used as the query submission timeout
     */
    @Inject
    public QueuedStatementResource(HttpRequestSessionContextFactory sessionContextFactory, DispatchManager dispatchManager, DispatchExecutor executor, QueryInfoUrlFactory queryInfoUrlTemplate, ServerConfig serverConfig, ProtocolConfig protocolConfig, QueryManagerConfig queryManagerConfig) {
        this.sessionContextFactory = Objects.requireNonNull(sessionContextFactory, "sessionContextFactory is null");
        this.dispatchManager = Objects.requireNonNull(dispatchManager, "dispatchManager is null");
        // Validate with a named message, consistent with the other arguments, before dereferencing.
        Objects.requireNonNull(executor, "executor is null");
        this.responseExecutor = executor.getExecutor();
        this.timeoutExecutor = executor.getScheduledExecutor();
        this.queryInfoUrlFactory = Objects.requireNonNull(queryInfoUrlTemplate, "queryInfoUrlTemplate is null");
        this.compressionEnabled = Objects.requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
        this.alternateHeaderName = Objects.requireNonNull(protocolConfig, "protocolConfig is null").getAlternateHeaderName();
        this.queryManager = new QueryManager(Objects.requireNonNull(queryManagerConfig, "queryManagerConfig is null").getClientTimeout());
    }

    /**
     * Lifecycle hook that initializes the query manager with the dispatch manager.
     */
    @PostConstruct
    public void start() {
        DispatchManager manager = dispatchManager;
        queryManager.initialize(manager);
    }

    /**
     * Lifecycle hook that tears down the query manager.
     */
    @PreDestroy
    public void stop() {
        queryManager.destroy();
    }

    /**
     * Accepts a new SQL statement, registers a query for it, and returns the
     * first {@link QueryResults} page for the query's current token.
     *
     * @param statement the SQL text from the request body
     * @param servletRequest the underlying servlet request
     * @param httpHeaders the request headers
     * @param uriInfo request URI information used to build follow-up URIs
     * @return an OK response wrapping the initial query results
     * @throws WebApplicationException with status 400 when the statement is null or empty
     */
    @ResourceSecurity(value=ResourceSecurity.AccessType.AUTHENTICATED_USER)
    @POST
    @Produces(value={"application/json"})
    public Response postStatement(String statement, @Context HttpServletRequest servletRequest, @Context HttpHeaders httpHeaders, @Context UriInfo uriInfo) {
        boolean missingStatement = statement == null || statement.isEmpty();
        if (missingStatement) {
            throw QueuedStatementResource.badRequest(Response.Status.BAD_REQUEST, "SQL statement is empty");
        }
        Query registered = registerQuery(statement, servletRequest, httpHeaders);
        long currentToken = registered.getLastToken();
        QueryResults initialResults = registered.getQueryResults(currentToken, uriInfo);
        return createQueryResultsResponse(initialResults);
    }

    /**
     * Builds a {@link Query} for the statement and registers it with the query manager.
     *
     * <p>Requests whose authenticated principal is an {@link InternalPrincipal} are
     * rejected with 403. After registration the
     * {@code trino.authenticated-identity} request attribute is cleared.
     * NOTE(review): clearing the attribute presumably tells an upstream filter that
     * the identity is now owned by the query; confirm against the filter.
     *
     * @param statement the SQL text
     * @param servletRequest the servlet request carrying the remote address and identity attribute
     * @param httpHeaders the request headers used to build the session context
     * @return the newly registered query
     */
    private Query registerQuery(String statement, HttpServletRequest servletRequest, HttpHeaders httpHeaders) {
        Optional<String> remoteAddress = Optional.ofNullable(servletRequest.getRemoteAddr());
        Optional<Identity> identity = Optional.ofNullable((Identity) servletRequest.getAttribute("trino.authenticated-identity"));

        boolean internalCaller = identity
                .flatMap(Identity::getPrincipal)
                .filter(InternalPrincipal.class::isInstance)
                .isPresent();
        if (internalCaller) {
            throw QueuedStatementResource.badRequest(Response.Status.FORBIDDEN, "Internal communication can not be used to start a query");
        }

        MultivaluedMap<String, String> requestHeaders = httpHeaders.getRequestHeaders();
        SessionContext sessionContext = sessionContextFactory.createSessionContext(requestHeaders, alternateHeaderName, remoteAddress, identity);
        Query created = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory);
        queryManager.registerQuery(created);
        servletRequest.setAttribute("trino.authenticated-identity", null);
        return created;
    }

    /**
     * Asynchronous status endpoint for a queued query.
     *
     * <p>Looks the query up (validating slug and token) and binds the eventual
     * status response to the suspended async response.
     *
     * @param queryId id of the query
     * @param slug slug from the request path, validated against the query
     * @param token token from the request path
     * @param maxWait optional client-requested maximum wait
     * @param uriInfo request URI information used to build follow-up URIs
     * @param asyncResponse the suspended response to complete
     */
    @ResourceSecurity(value=ResourceSecurity.AccessType.PUBLIC)
    @GET
    @Path(value="queued/{queryId}/{slug}/{token}")
    @Produces(value={"application/json"})
    public void getStatus(@PathParam(value="queryId") QueryId queryId, @PathParam(value="slug") String slug, @PathParam(value="token") long token, @QueryParam(value="maxWait") Duration maxWait, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        Query target = getQuery(queryId, slug, token);
        ListenableFuture<Response> pendingResponse = getStatus(target, token, maxWait, uriInfo);
        AsyncResponseHandler.bindAsyncResponse(asyncResponse, pendingResponse, responseExecutor);
    }

    /**
     * Waits for the query to be dispatched, bounded by the smaller of
     * {@code MAX_WAIT_TIME} and {@code maxWait}, then produces the status response.
     *
     * <p>A timeout is not an error: it is swallowed and the current query results
     * are returned regardless.
     *
     * @param query the query to report on
     * @param token the token the client asked for
     * @param maxWait client-requested maximum wait; may be null
     * @param uriInfo request URI information used to build follow-up URIs
     * @return future completing with the HTTP response
     */
    private ListenableFuture<Response> getStatus(Query query, long token, Duration maxWait, UriInfo uriInfo) {
        Duration effectiveWait = WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait);
        long waitMillis = effectiveWait.toMillis();

        // Timing out merely ends the wait early; the results are produced either way.
        FluentFuture<Void> dispatchedOrTimedOut = FluentFuture.from(query.waitForDispatched())
                .withTimeout(waitMillis, TimeUnit.MILLISECONDS, timeoutExecutor)
                .catching(TimeoutException.class, ignored -> null, MoreExecutors.directExecutor());

        return dispatchedOrTimedOut
                .transform(ignored -> query.getQueryResults(token, uriInfo), responseExecutor)
                .transform(this::createQueryResultsResponse, MoreExecutors.directExecutor());
    }

    /**
     * Requests cancellation of a queued query.
     *
     * @param queryId id of the query
     * @param slug slug from the request path, validated against the query
     * @param token token from the request path
     * @return an empty 204 response
     * @throws WebApplicationException with status 404 when the query is unknown or the slug is invalid
     */
    @ResourceSecurity(value=ResourceSecurity.AccessType.PUBLIC)
    @DELETE
    @Path(value="queued/{queryId}/{slug}/{token}")
    @Produces(value={"application/json"})
    public Response cancelQuery(@PathParam(value="queryId") QueryId queryId, @PathParam(value="slug") String slug, @PathParam(value="token") long token) {
        Query target = getQuery(queryId, slug, token);
        target.cancel();
        return Response.noContent().build();
    }

    /**
     * Resolves a registered query and verifies the caller-supplied slug for the given token.
     *
     * @param queryId id of the query to look up
     * @param slug slug supplied by the client
     * @param token token the slug was issued for
     * @return the matching query
     * @throws WebApplicationException with status 404 when no query is registered
     *         under the id or the slug does not validate
     */
    private Query getQuery(QueryId queryId, String slug, long token) {
        Query candidate = queryManager.getQuery(queryId);
        if (candidate != null && candidate.getSlug().isValid(Slug.Context.QUEUED_QUERY, slug, token)) {
            return candidate;
        }
        throw QueuedStatementResource.badRequest(Response.Status.NOT_FOUND, "Query not found");
    }

    /**
     * Wraps query results in an OK response, forcing the {@code identity}
     * encoding when result compression is disabled.
     *
     * @param results the results to send
     * @return the HTTP response
     */
    private Response createQueryResultsResponse(QueryResults results) {
        Response.ResponseBuilder response = Response.ok(results);
        if (compressionEnabled) {
            return response.build();
        }
        return response.encoding("identity").build();
    }

    /**
     * Builds the URI a client should poll next while the query is still queued:
     * {@code /v1/statement/queued/{queryId}/{slug}/{token}} on the request's base URI,
     * with the query string replaced by an empty one.
     *
     * @param queryId id of the query
     * @param slug slug generator for the query
     * @param token token to embed in the URI and the slug
     * @param uriInfo request URI information providing the base URI
     * @return the queued-status URI
     */
    private static URI getQueuedUri(QueryId queryId, Slug slug, long token, UriInfo uriInfo) {
        String slugSegment = slug.makeSlug(Slug.Context.QUEUED_QUERY, token);
        return uriInfo.getBaseUriBuilder()
                .replacePath("/v1/statement/queued/")
                .path(queryId.toString())
                .path(slugSegment)
                .path(Long.toString(token))
                .replaceQuery("")
                .build();
    }

    /**
     * Assembles the {@link QueryResults} returned while a query has not yet started executing.
     *
     * <p>The reported state is {@code FAILED} when an error is present and
     * {@code QUEUED} otherwise; there are no columns or data, and the warnings
     * list is empty.
     *
     * @param queryId id of the query
     * @param nextUri URI to poll next, or null when there is none
     * @param queryError error to report, if any
     * @param uriInfo request URI information
     * @param queryInfoUrl optional query info URL
     * @param elapsedTime elapsed time to report
     * @param queuedTime queued time to report
     * @return the assembled results
     */
    private static QueryResults createQueryResults(QueryId queryId, URI nextUri, Optional<QueryError> queryError, UriInfo uriInfo, Optional<URI> queryInfoUrl, Duration elapsedTime, Duration queuedTime) {
        QueryState state = queryError.isPresent() ? QueryState.FAILED : QueryState.QUEUED;
        String id = queryId.toString();
        URI infoUri = QueryInfoUrlFactory.getQueryInfoUri(queryInfoUrl, queryId, uriInfo);
        StatementStats stats = StatementStats.builder()
                .setState(state.toString())
                .setQueued(state == QueryState.QUEUED)
                .setElapsedTimeMillis(elapsedTime.toMillis())
                .setQueuedTimeMillis(queuedTime.toMillis())
                .build();
        return new QueryResults(id, infoUri, null, nextUri, null, null, stats, queryError.orElse(null), ImmutableList.of(), null, null);
    }

    /**
     * Builds a {@link WebApplicationException} carrying a plain-text message with the given status.
     *
     * <p>The exception is returned rather than thrown so that the
     * {@code throw badRequest(...)} idiom at call sites matches the declared
     * return type.
     *
     * @param status HTTP status of the error response
     * @param message plain-text entity of the error response
     * @return the exception for the caller to throw
     */
    private static WebApplicationException badRequest(Response.Status status, String message) {
        Response errorResponse = Response.status(status)
                .type(MediaType.TEXT_PLAIN_TYPE)
                .entity(message)
                .build();
        return new WebApplicationException(errorResponse);
    }

    @ThreadSafe
    private static class QueryManager {
        private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<QueryId, Query>();
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed((String)"drain-state-query-manager"));
        private final Duration querySubmissionTimeout;

        public QueryManager(Duration querySubmissionTimeout) {
            this.querySubmissionTimeout = Objects.requireNonNull(querySubmissionTimeout, "querySubmissionTimeout is null");
        }

        public void initialize(DispatchManager dispatchManager) {
            this.scheduledExecutorService.scheduleWithFixedDelay(() -> this.syncWith(dispatchManager), 200L, 200L, TimeUnit.MILLISECONDS);
        }

        public void destroy() {
            this.scheduledExecutorService.shutdownNow();
        }

        private void syncWith(DispatchManager dispatchManager) {
            this.queries.forEach((queryId, query) -> {
                if (this.shouldBePurged(dispatchManager, (Query)query)) {
                    this.removeQuery((QueryId)queryId);
                }
            });
        }

        private boolean shouldBePurged(DispatchManager dispatchManager, Query query) {
            if (query.isSubmissionAbandoned()) {
                return true;
            }
            if (query.tryAbandonSubmissionWithTimeout(this.querySubmissionTimeout)) {
                return true;
            }
            return query.isCreated() && !dispatchManager.isQueryRegistered(query.getQueryId());
        }

        private void removeQuery(QueryId queryId) {
            Optional.ofNullable((Query)this.queries.remove(queryId)).ifPresent(QueryManager::destroyQuietly);
        }

        private static void destroyQuietly(Query query) {
            try {
                query.destroy();
            }
            catch (Throwable t) {
                log.error(t, "Error destroying query");
            }
        }

        public void registerQuery(Query query) {
            Query existingQuery = this.queries.putIfAbsent(query.getQueryId(), query);
            Preconditions.checkState((existingQuery == null ? 1 : 0) != 0, (String)"Query already registered");
        }

        @Nullable
        public Query getQuery(QueryId queryId) {
            return (Query)this.queries.get(queryId);
        }
    }

    /**
     * A statement accepted by this resource, together with the state needed to
     * submit it lazily to the dispatch manager and report its queued status.
     *
     * <p>{@code submissionGate} is a one-shot decision: it starts as null, becomes
     * {@code true} when the query is submitted, or {@code false} when submission is
     * abandoned after the timeout. Only the first transition away from null wins.
     */
    private static final class Query {
        private final String query;
        private final SessionContext sessionContext;
        private final DispatchManager dispatchManager;
        private final QueryId queryId;
        private final Optional<URI> queryInfoUrl;
        private final Slug slug = Slug.createNew();
        private final AtomicLong lastToken = new AtomicLong();
        private final long initTime = System.nanoTime();
        private final AtomicReference<Boolean> submissionGate = new AtomicReference<>();
        private final SettableFuture<Void> creationFuture = SettableFuture.create();

        /**
         * @param query the SQL text
         * @param sessionContext session context the query will be created with
         * @param dispatchManager manager that allocates the query id and later creates the query
         * @param queryInfoUrlFactory factory for the optional query info URL
         */
        public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, QueryInfoUrlFactory queryInfoUrlFactory) {
            this.query = Objects.requireNonNull(query, "query is null");
            this.sessionContext = Objects.requireNonNull(sessionContext, "sessionContext is null");
            this.dispatchManager = Objects.requireNonNull(dispatchManager, "dispatchManager is null");
            this.queryId = dispatchManager.createQueryId();
            Objects.requireNonNull(queryInfoUrlFactory, "queryInfoUrlFactory is null");
            this.queryInfoUrl = queryInfoUrlFactory.getQueryInfoUrl(queryId);
        }

        public QueryId getQueryId() {
            return queryId;
        }

        public Slug getSlug() {
            return slug;
        }

        public long getLastToken() {
            return lastToken.get();
        }

        /**
         * Abandons submission if the timeout has elapsed since construction and no
         * submission decision has been made yet.
         *
         * @return true only when this call flipped the gate to "abandoned"
         */
        public boolean tryAbandonSubmissionWithTimeout(Duration querySubmissionTimeout) {
            boolean timedOut = Duration.nanosSince(initTime).compareTo(querySubmissionTimeout) >= 0;
            if (!timedOut) {
                return false;
            }
            return submissionGate.compareAndSet(null, false);
        }

        /**
         * @return true when the gate was set to "abandoned"
         */
        public boolean isSubmissionAbandoned() {
            Boolean gate = submissionGate.get();
            return gate != null && !gate;
        }

        /**
         * @return true once the creation future has completed
         */
        public boolean isCreated() {
            return creationFuture.isDone();
        }

        /**
         * Submits the query if that has not been decided yet, then returns a future
         * to wait on: the (non-cancellation-propagating) creation future while
         * creation is pending, otherwise the dispatch manager's dispatched future.
         */
        private ListenableFuture<Void> waitForDispatched() {
            submitIfNeeded();
            if (creationFuture.isDone()) {
                return dispatchManager.waitForDispatched(queryId);
            }
            return Futures.nonCancellationPropagating(creationFuture);
        }

        /**
         * Creates the query through the dispatch manager, at most once and only if
         * submission has not been abandoned.
         */
        private void submitIfNeeded() {
            if (!submissionGate.compareAndSet(null, true)) {
                return;
            }
            creationFuture.setFuture(dispatchManager.createQuery(queryId, slug, sessionContext, query));
        }

        /**
         * Produces the results for the requested token.
         *
         * @param token must equal the last served token or the one after it
         * @param uriInfo request URI information used to build follow-up URIs
         * @throws WebApplicationException with status 410 for any other token, or
         *         404 when the query was created but the dispatch manager has no
         *         dispatch info for it
         */
        public QueryResults getQueryResults(long token, UriInfo uriInfo) {
            long previousToken = lastToken.get();
            boolean sameToken = token == previousToken;
            boolean nextInSequence = token == previousToken + 1L;
            if (!sameToken && !nextInSequence) {
                throw new WebApplicationException(Response.Status.GONE);
            }
            // Advance to (or stay at) the requested token.
            lastToken.compareAndSet(previousToken, token);

            long followingToken = token + 1L;
            if (creationFuture.isDone()) {
                DispatchInfo dispatchInfo = dispatchManager.getDispatchInfo(queryId)
                        .orElseThrow(() -> new WebApplicationException(Response.status(Response.Status.NOT_FOUND).build()));
                return createQueryResults(followingToken, uriInfo, dispatchInfo);
            }
            // Creation still pending: report a queued placeholder with zero durations.
            return createQueryResults(followingToken, uriInfo, DispatchInfo.queued(NO_DURATION, NO_DURATION));
        }

        /**
         * Cancels the query through the dispatch manager once the creation future completes.
         */
        public void cancel() {
            Runnable cancelAfterCreation = () -> dispatchManager.cancelQuery(queryId);
            creationFuture.addListener(cancelAfterCreation, MoreExecutors.directExecutor());
        }

        /**
         * Destroys the identity held by the session context.
         */
        public void destroy() {
            sessionContext.getIdentity().destroy();
        }

        private QueryResults createQueryResults(long token, UriInfo uriInfo, DispatchInfo dispatchInfo) {
            URI nextUri = getNextUri(token, uriInfo, dispatchInfo);
            Optional<QueryError> queryError = dispatchInfo.getFailureInfo().map(this::toQueryError);
            Duration elapsed = dispatchInfo.getElapsedTime();
            Duration queued = dispatchInfo.getQueuedTime();
            return QueuedStatementResource.createQueryResults(queryId, nextUri, queryError, uriInfo, queryInfoUrl, elapsed, queued);
        }

        /**
         * @return null when the dispatch failed; the executing-statement URI on the
         *         coordinator when a coordinator location is known; otherwise the
         *         next queued-status URI
         */
        private URI getNextUri(long token, UriInfo uriInfo, DispatchInfo dispatchInfo) {
            if (dispatchInfo.getFailureInfo().isPresent()) {
                return null;
            }
            Optional<URI> redirect = dispatchInfo.getCoordinatorLocation()
                    .map(location -> getRedirectUri(location, uriInfo));
            return redirect.orElseGet(() -> QueuedStatementResource.getQueuedUri(queryId, slug, token, uriInfo));
        }

        private URI getRedirectUri(CoordinatorLocation coordinatorLocation, UriInfo uriInfo) {
            URI coordinatorUri = coordinatorLocation.getUri(uriInfo);
            String executingSlug = slug.makeSlug(Slug.Context.EXECUTING_QUERY, 0L);
            return UriBuilder.fromUri(coordinatorUri)
                    .replacePath("/v1/statement/executing")
                    .path(queryId.toString())
                    .path(executingSlug)
                    .path("0")
                    .build();
        }

        /**
         * Converts a failure into a client-facing error, substituting
         * {@code GENERIC_INTERNAL_ERROR} (with a warning) when no error code is
         * set and "Internal error" when no message is set.
         */
        private QueryError toQueryError(ExecutionFailureInfo executionFailureInfo) {
            ErrorCode errorCode = executionFailureInfo.getErrorCode();
            if (errorCode == null) {
                errorCode = StandardErrorCode.GENERIC_INTERNAL_ERROR.toErrorCode();
                log.warn("Failed query %s has no error code", queryId);
            }
            String message = MoreObjects.firstNonNull(executionFailureInfo.getMessage(), "Internal error");
            return new QueryError(message, null, errorCode.getCode(), errorCode.getName(), errorCode.getType().toString(), executionFailureInfo.getErrorLocation(), executionFailureInfo.toFailureInfo());
        }
    }
}

