/*
 * Decompiled with CFR 0.152.
 */
package io.trino.dispatcher;

import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.jaxrs.AsyncResponseHandler;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.client.QueryError;
import io.trino.client.QueryResults;
import io.trino.client.StatementStats;
import io.trino.dispatcher.CoordinatorLocation;
import io.trino.dispatcher.DispatchExecutor;
import io.trino.dispatcher.DispatchInfo;
import io.trino.dispatcher.DispatchManager;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.QueryState;
import io.trino.server.HttpRequestSessionContext;
import io.trino.server.ProtocolConfig;
import io.trino.server.ServerConfig;
import io.trino.server.SessionContext;
import io.trino.server.protocol.Slug;
import io.trino.server.security.ResourceSecurity;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.security.GroupProvider;
import io.trino.spi.security.Identity;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;

@Path(value="/v1/statement")
public class QueuedStatementResource {
    private static final Logger log = Logger.get(QueuedStatementResource.class);
    private static final Duration MAX_WAIT_TIME = new Duration(1.0, TimeUnit.SECONDS);
    private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
    private static final Duration NO_DURATION = new Duration(0.0, TimeUnit.MILLISECONDS);
    private final GroupProvider groupProvider;
    private final DispatchManager dispatchManager;
    private final Executor responseExecutor;
    private final ScheduledExecutorService timeoutExecutor;
    private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<QueryId, Query>();
    private final ScheduledExecutorService queryPurger = Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed((String)"dispatch-query-purger"));
    private final boolean compressionEnabled;
    private final Optional<String> alternateHeaderName;

    @Inject
    public QueuedStatementResource(GroupProvider groupProvider, DispatchManager dispatchManager, DispatchExecutor executor, ServerConfig serverConfig, ProtocolConfig protocolConfig) {
        this.groupProvider = Objects.requireNonNull(groupProvider, "groupProvider is null");
        this.dispatchManager = Objects.requireNonNull(dispatchManager, "dispatchManager is null");
        Objects.requireNonNull(dispatchManager, "dispatchManager is null");
        this.responseExecutor = Objects.requireNonNull(executor, "responseExecutor is null").getExecutor();
        this.timeoutExecutor = Objects.requireNonNull(executor, "timeoutExecutor is null").getScheduledExecutor();
        this.compressionEnabled = Objects.requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
        this.alternateHeaderName = Objects.requireNonNull(protocolConfig, "protocolConfig is null").getAlternateHeaderName();
        this.queryPurger.scheduleWithFixedDelay(() -> {
            try {
                for (Map.Entry entry : ImmutableSet.copyOf(this.queries.entrySet())) {
                    Query query;
                    if (!((Query)entry.getValue()).isSubmissionFinished() || dispatchManager.isQueryRegistered((QueryId)entry.getKey()) || (query = (Query)this.queries.remove(entry.getKey())) == null) continue;
                    try {
                        query.destroy();
                    }
                    catch (Throwable e) {
                        log.warn(e, "Error destroying identity");
                    }
                }
            }
            catch (Throwable e) {
                log.warn(e, "Error removing old queries");
            }
        }, 200L, 200L, TimeUnit.MILLISECONDS);
    }

    @PreDestroy
    public void stop() {
        this.queryPurger.shutdownNow();
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.AUTHENTICATED_USER)
    @POST
    @Produces(value={"application/json"})
    public Response postStatement(String statement, @Context HttpServletRequest servletRequest, @Context HttpHeaders httpHeaders, @Context UriInfo uriInfo) {
        if (Strings.isNullOrEmpty((String)statement)) {
            throw QueuedStatementResource.badRequest(Response.Status.BAD_REQUEST, "SQL statement is empty");
        }
        String remoteAddress = servletRequest.getRemoteAddr();
        Optional<Identity> identity = Optional.ofNullable((Identity)servletRequest.getAttribute("trino.authenticated-identity"));
        MultivaluedMap headers = httpHeaders.getRequestHeaders();
        HttpRequestSessionContext sessionContext = new HttpRequestSessionContext((MultivaluedMap<String, String>)headers, this.alternateHeaderName, remoteAddress, identity, this.groupProvider);
        Query query = new Query(statement, sessionContext, this.dispatchManager);
        this.queries.put(query.getQueryId(), query);
        servletRequest.setAttribute("trino.authenticated-identity", null);
        return QueuedStatementResource.createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo), this.compressionEnabled);
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.PUBLIC)
    @GET
    @Path(value="queued/{queryId}/{slug}/{token}")
    @Produces(value={"application/json"})
    public void getStatus(@PathParam(value="queryId") QueryId queryId, @PathParam(value="slug") String slug, @PathParam(value="token") long token, @QueryParam(value="maxWait") Duration maxWait, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        Query query = this.getQuery(queryId, slug, token);
        ListenableFuture futureStateChange = MoreFutures.addTimeout(query.waitForDispatched(), () -> null, (Duration)((Duration)WAIT_ORDERING.min((Object)MAX_WAIT_TIME, (Object)maxWait)), (ScheduledExecutorService)this.timeoutExecutor);
        ListenableFuture queryResultsFuture = Futures.transform((ListenableFuture)futureStateChange, ignored -> query.getQueryResults(token, uriInfo), (Executor)this.responseExecutor);
        ListenableFuture response = Futures.transform((ListenableFuture)queryResultsFuture, queryResults -> QueuedStatementResource.createQueryResultsResponse(queryResults, this.compressionEnabled), (Executor)MoreExecutors.directExecutor());
        AsyncResponseHandler.bindAsyncResponse((AsyncResponse)asyncResponse, (ListenableFuture)response, (Executor)this.responseExecutor);
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.PUBLIC)
    @DELETE
    @Path(value="queued/{queryId}/{slug}/{token}")
    @Produces(value={"application/json"})
    public Response cancelQuery(@PathParam(value="queryId") QueryId queryId, @PathParam(value="slug") String slug, @PathParam(value="token") long token) {
        this.getQuery(queryId, slug, token).cancel();
        return Response.noContent().build();
    }

    private Query getQuery(QueryId queryId, String slug, long token) {
        Query query = (Query)this.queries.get(queryId);
        if (query == null || !query.getSlug().isValid(Slug.Context.QUEUED_QUERY, slug, token)) {
            throw QueuedStatementResource.badRequest(Response.Status.NOT_FOUND, "Query not found");
        }
        return query;
    }

    private static Response createQueryResultsResponse(QueryResults results, boolean compressionEnabled) {
        Response.ResponseBuilder builder = Response.ok((Object)results);
        if (!compressionEnabled) {
            builder.encoding("identity");
        }
        return builder.build();
    }

    private static URI getQueryHtmlUri(QueryId queryId, UriInfo uriInfo) {
        return uriInfo.getRequestUriBuilder().replacePath("ui/query.html").replaceQuery(queryId.toString()).build(new Object[0]);
    }

    private static URI getQueuedUri(QueryId queryId, Slug slug, long token, UriInfo uriInfo) {
        return uriInfo.getBaseUriBuilder().replacePath("/v1/statement/queued/").path(queryId.toString()).path(slug.makeSlug(Slug.Context.QUEUED_QUERY, token)).path(String.valueOf(token)).replaceQuery("").build(new Object[0]);
    }

    private static QueryResults createQueryResults(QueryId queryId, URI nextUri, Optional<QueryError> queryError, UriInfo uriInfo, Duration elapsedTime, Duration queuedTime) {
        QueryState state = queryError.map(error -> QueryState.FAILED).orElse(QueryState.QUEUED);
        return new QueryResults(queryId.toString(), QueuedStatementResource.getQueryHtmlUri(queryId, uriInfo), null, nextUri, null, null, StatementStats.builder().setState(state.toString()).setQueued(state == QueryState.QUEUED).setElapsedTimeMillis(elapsedTime.toMillis()).setQueuedTimeMillis(queuedTime.toMillis()).build(), (QueryError)queryError.orElse(null), (List)ImmutableList.of(), null, null);
    }

    private static WebApplicationException badRequest(Response.Status status, String message) {
        throw new WebApplicationException(Response.status((Response.Status)status).type(MediaType.TEXT_PLAIN_TYPE).entity((Object)message).build());
    }

    private static final class Query {
        private final String query;
        private final SessionContext sessionContext;
        private final DispatchManager dispatchManager;
        private final QueryId queryId;
        private final Slug slug = Slug.createNew();
        private final AtomicLong lastToken = new AtomicLong();
        @GuardedBy(value="this")
        private ListenableFuture<?> querySubmissionFuture;

        public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager) {
            this.query = Objects.requireNonNull(query, "query is null");
            this.sessionContext = Objects.requireNonNull(sessionContext, "sessionContext is null");
            this.dispatchManager = Objects.requireNonNull(dispatchManager, "dispatchManager is null");
            this.queryId = dispatchManager.createQueryId();
        }

        public QueryId getQueryId() {
            return this.queryId;
        }

        public Slug getSlug() {
            return this.slug;
        }

        public long getLastToken() {
            return this.lastToken.get();
        }

        public synchronized boolean isSubmissionFinished() {
            return this.querySubmissionFuture != null && this.querySubmissionFuture.isDone();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private ListenableFuture<?> waitForDispatched() {
            Query query = this;
            synchronized (query) {
                if (this.querySubmissionFuture == null) {
                    this.querySubmissionFuture = this.dispatchManager.createQuery(this.queryId, this.slug, this.sessionContext, this.query);
                }
                if (!this.querySubmissionFuture.isDone()) {
                    return this.querySubmissionFuture;
                }
            }
            return this.dispatchManager.waitForDispatched(this.queryId);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public QueryResults getQueryResults(long token, UriInfo uriInfo) {
            long lastToken = this.lastToken.get();
            if (token != lastToken && token != lastToken + 1L) {
                throw new WebApplicationException(Response.Status.GONE);
            }
            this.lastToken.compareAndSet(lastToken, token);
            Query query = this;
            synchronized (query) {
                if (this.querySubmissionFuture == null || !this.querySubmissionFuture.isDone()) {
                    return this.createQueryResults(token + 1L, uriInfo, DispatchInfo.queued(NO_DURATION, NO_DURATION));
                }
            }
            Optional<DispatchInfo> dispatchInfo = this.dispatchManager.getDispatchInfo(this.queryId);
            if (dispatchInfo.isEmpty()) {
                throw new WebApplicationException(Response.status((Response.Status)Response.Status.NOT_FOUND).build());
            }
            return this.createQueryResults(token + 1L, uriInfo, dispatchInfo.get());
        }

        public synchronized void cancel() {
            this.querySubmissionFuture.addListener(() -> this.dispatchManager.cancelQuery(this.queryId), MoreExecutors.directExecutor());
        }

        public void destroy() {
            this.sessionContext.getIdentity().destroy();
        }

        private QueryResults createQueryResults(long token, UriInfo uriInfo, DispatchInfo dispatchInfo) {
            URI nextUri = this.getNextUri(token, uriInfo, dispatchInfo);
            Optional<QueryError> queryError = dispatchInfo.getFailureInfo().map(this::toQueryError);
            return QueuedStatementResource.createQueryResults(this.queryId, nextUri, queryError, uriInfo, dispatchInfo.getElapsedTime(), dispatchInfo.getQueuedTime());
        }

        private URI getNextUri(long token, UriInfo uriInfo, DispatchInfo dispatchInfo) {
            if (dispatchInfo.getFailureInfo().isPresent()) {
                return null;
            }
            return dispatchInfo.getCoordinatorLocation().map(coordinatorLocation -> this.getRedirectUri((CoordinatorLocation)coordinatorLocation, uriInfo)).orElseGet(() -> QueuedStatementResource.getQueuedUri(this.queryId, this.slug, token, uriInfo));
        }

        private URI getRedirectUri(CoordinatorLocation coordinatorLocation, UriInfo uriInfo) {
            URI coordinatorUri = coordinatorLocation.getUri(uriInfo);
            return UriBuilder.fromUri((URI)coordinatorUri).replacePath("/v1/statement/executing").path(this.queryId.toString()).path(this.slug.makeSlug(Slug.Context.EXECUTING_QUERY, 0L)).path("0").build(new Object[0]);
        }

        private QueryError toQueryError(ExecutionFailureInfo executionFailureInfo) {
            ErrorCode errorCode;
            if (executionFailureInfo.getErrorCode() != null) {
                errorCode = executionFailureInfo.getErrorCode();
            } else {
                errorCode = StandardErrorCode.GENERIC_INTERNAL_ERROR.toErrorCode();
                log.warn("Failed query %s has no error code", new Object[]{this.queryId});
            }
            return new QueryError((String)MoreObjects.firstNonNull((Object)executionFailureInfo.getMessage(), (Object)"Internal error"), null, errorCode.getCode(), errorCode.getName(), errorCode.getType().toString(), executionFailureInfo.getErrorLocation(), executionFailureInfo.toFailureInfo());
        }
    }
}

