/*
 * Decompiled with CFR 0.152.
 */
package io.trino.server.protocol;

import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.client.ProtocolHeaders;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.QueryManager;
import io.trino.operator.DirectExchangeClientSupplier;
import io.trino.server.DisconnectionAwareAsyncResponse;
import io.trino.server.ExternalUriInfo;
import io.trino.server.ForStatementResource;
import io.trino.server.ServerConfig;
import io.trino.server.protocol.PreparedStatementEncoder;
import io.trino.server.protocol.Query;
import io.trino.server.protocol.QueryInfoUrlFactory;
import io.trino.server.protocol.QueryResultsResponse;
import io.trino.server.protocol.Slug;
import io.trino.server.security.ResourceSecurity;
import io.trino.spi.QueryId;
import io.trino.spi.block.BlockEncodingSerde;
import jakarta.annotation.PreDestroy;
import jakarta.ws.rs.BeanParam;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.Response;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * JAX-RS resource rooted at {@code /v1/statement/executing}. It serves result
 * pages for queries, cancels whole queries, and partially cancels single stages.
 *
 * <p>The resource keeps its own map of {@link Query} objects keyed by
 * {@link QueryId}. An entry is created lazily the first time a query is accessed
 * through {@link #getQuery}, and a single-threaded background task removes and
 * disposes entries once the {@link QueryManager} no longer reports the query.
 */
@Path("/v1/statement/executing")
public class ExecutingStatementResource {
    private static final Logger log = Logger.get(ExecutingStatementResource.class);

    /** Upper bound applied to the client supplied {@code maxWait}; also used when none is supplied. */
    private static final Duration MAX_WAIT_TIME = new Duration(1.0, TimeUnit.SECONDS);
    /** Natural ordering that sorts {@code null} last, so {@code min(MAX_WAIT_TIME, null)} yields {@code MAX_WAIT_TIME}. */
    private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
    /** Target result size used when the client does not supply {@code targetResultSize}. */
    private static final DataSize DEFAULT_TARGET_RESULT_SIZE = DataSize.of(1L, DataSize.Unit.MEGABYTE);
    /** Upper bound applied to a client supplied {@code targetResultSize}. */
    private static final DataSize MAX_TARGET_RESULT_SIZE = DataSize.of(128L, DataSize.Unit.MEGABYTE);

    private final QueryManager queryManager;
    private final DirectExchangeClientSupplier directExchangeClientSupplier;
    private final ExchangeManagerRegistry exchangeManagerRegistry;
    private final BlockEncodingSerde blockEncodingSerde;
    private final QueryInfoUrlFactory queryInfoUrlFactory;
    private final BoundedExecutor responseExecutor;
    private final ScheduledExecutorService timeoutExecutor;
    /** Queries that have been accessed through this resource and not yet purged. */
    private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<>();
    private final ScheduledExecutorService queryPurger = Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed("execution-query-purger"));
    private final PreparedStatementEncoder preparedStatementEncoder;
    /** When {@code false}, responses are explicitly marked with the {@code identity} content encoding. */
    private final boolean compressionEnabled;

    /**
     * Stores the injected collaborators and schedules the periodic maintenance
     * task, which runs with an initial delay and fixed delay of 200 milliseconds.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public ExecutingStatementResource(
            QueryManager queryManager,
            DirectExchangeClientSupplier directExchangeClientSupplier,
            ExchangeManagerRegistry exchangeManagerRegistry,
            BlockEncodingSerde blockEncodingSerde,
            QueryInfoUrlFactory queryInfoUrlTemplate,
            @ForStatementResource BoundedExecutor responseExecutor,
            @ForStatementResource ScheduledExecutorService timeoutExecutor,
            PreparedStatementEncoder preparedStatementEncoder,
            ServerConfig serverConfig) {
        this.queryManager = Objects.requireNonNull(queryManager, "queryManager is null");
        this.directExchangeClientSupplier = Objects.requireNonNull(directExchangeClientSupplier, "directExchangeClientSupplier is null");
        this.exchangeManagerRegistry = Objects.requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
        this.blockEncodingSerde = Objects.requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
        this.queryInfoUrlFactory = Objects.requireNonNull(queryInfoUrlTemplate, "queryInfoUrlTemplate is null");
        this.responseExecutor = Objects.requireNonNull(responseExecutor, "responseExecutor is null");
        this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor, "timeoutExecutor is null");
        this.preparedStatementEncoder = Objects.requireNonNull(preparedStatementEncoder, "preparedStatementEncoder is null");
        this.compressionEnabled = Objects.requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();

        this.queryPurger.scheduleWithFixedDelay(
                () -> {
                    this.purgeUntrackedQueries();
                    this.markResultsConsumed();
                },
                200L,
                200L,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Removes and disposes every locally tracked query that the query manager
     * no longer knows about.
     */
    private void purgeUntrackedQueries() {
        try {
            for (QueryId queryId : this.queries.keySet()) {
                if (this.queryManager.hasQuery(queryId)) {
                    continue;
                }
                Query query = this.queries.remove(queryId);
                if (query != null) {
                    query.dispose();
                }
            }
        }
        catch (Throwable e) {
            // A task that throws is never rescheduled by the executor, so log and keep going.
            log.warn(e, "Error removing old queries");
        }
    }

    /** Gives every locally tracked query the chance to mark its results as consumed. */
    private void markResultsConsumed() {
        try {
            for (Query query : this.queries.values()) {
                query.markResultsConsumedIfReady();
            }
        }
        catch (Throwable e) {
            // A task that throws is never rescheduled by the executor, so log and keep going.
            log.warn(e, "Error marking results consumed");
        }
    }

    /** Shuts the background purger down immediately. */
    @PreDestroy
    public void stop() {
        this.queryPurger.shutdownNow();
    }

    /**
     * Asynchronously answers a request for the result page identified by
     * {@code token}.
     *
     * @param maxWait requested wait time; may be {@code null}, capped at {@link #MAX_WAIT_TIME}
     * @param targetResultSize requested result size; may be {@code null}, capped at {@link #MAX_TARGET_RESULT_SIZE}
     * @throws NotFoundException if the query is unknown or the slug/token pair is not valid
     */
    @ResourceSecurity(ResourceSecurity.AccessType.PUBLIC)
    @GET
    @Path("{queryId}/{slug}/{token}")
    @Produces("application/json")
    public void getQueryResults(
            @PathParam("queryId") QueryId queryId,
            @PathParam("slug") String slug,
            @PathParam("token") long token,
            @QueryParam("maxWait") Duration maxWait,
            @QueryParam("targetResultSize") DataSize targetResultSize,
            @BeanParam ExternalUriInfo externalUriInfo,
            @Suspended @BeanParam DisconnectionAwareAsyncResponse asyncResponse) {
        Query query = this.getQuery(queryId, slug, token);
        this.asyncQueryResults(query, token, maxWait, targetResultSize, externalUriInfo, asyncResponse);
    }

    /**
     * Returns the locally tracked {@link Query} for {@code queryId}, creating
     * and registering it from the query manager's session and slug on first access.
     *
     * @throws NotFoundException if the slug/token pair is not valid, or the
     *         query manager does not know the query
     */
    protected Query getQuery(QueryId queryId, String slug, long token) {
        Query existing = this.queries.get(queryId);
        if (existing != null) {
            if (!existing.isSlugValid(slug, token)) {
                throw new NotFoundException("Query not found");
            }
            return existing;
        }

        // First access through this resource: validate against the query manager.
        Session session;
        Slug querySlug;
        try {
            session = this.queryManager.getQuerySession(queryId);
            querySlug = this.queryManager.getQuerySlug(queryId);
            if (!querySlug.isValid(Slug.Context.EXECUTING_QUERY, slug, token)) {
                throw new NotFoundException("Query not found");
            }
        }
        catch (NoSuchElementException e) {
            throw new NotFoundException("Query not found", e);
        }

        // computeIfAbsent keeps a single Query instance if several requests arrive at once.
        return this.queries.computeIfAbsent(queryId, id -> Query.create(
                session,
                querySlug,
                this.queryManager,
                this.queryInfoUrlFactory.getQueryInfoUrl(id),
                this.directExchangeClientSupplier,
                this.exchangeManagerRegistry,
                this.responseExecutor,
                this.timeoutExecutor,
                this.blockEncodingSerde));
    }

    /**
     * Clamps the client supplied limits, asks the query for the requested page
     * and binds the eventual HTTP response to {@code asyncResponse}.
     */
    private void asyncQueryResults(
            Query query,
            long token,
            Duration maxWait,
            DataSize targetResultSize,
            ExternalUriInfo externalUriInfo,
            DisconnectionAwareAsyncResponse asyncResponse) {
        // A null maxWait sorts last, so the result is MAX_WAIT_TIME in that case.
        Duration wait = WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait);

        DataSize effectiveResultSize = targetResultSize == null
                ? DEFAULT_TARGET_RESULT_SIZE
                : Ordering.<DataSize>natural().min(targetResultSize, MAX_TARGET_RESULT_SIZE);

        ListenableFuture<QueryResultsResponse> queryResultsFuture = query.waitForResults(token, externalUriInfo, wait, effectiveResultSize);
        ListenableFuture<Response> response = Futures.transform(queryResultsFuture, this::toResponse, MoreExecutors.directExecutor());
        DisconnectionAwareAsyncResponse.bindDisconnectionAwareAsyncResponse(asyncResponse, response, this.responseExecutor);
    }

    /**
     * Builds the HTTP 200 response for a result page. The session-state changes
     * carried by {@code resultsResponse} are written as protocol headers;
     * session property values, role values and prepared statement names/bodies
     * are URL-encoded as UTF-8.
     */
    private Response toResponse(QueryResultsResponse resultsResponse) {
        Response.ResponseBuilder response = Response.ok(resultsResponse.queryResults());
        ProtocolHeaders protocolHeaders = resultsResponse.protocolHeaders();

        resultsResponse.setCatalog().ifPresent(catalog -> response.header(protocolHeaders.responseSetCatalog(), catalog));
        resultsResponse.setSchema().ifPresent(schema -> response.header(protocolHeaders.responseSetSchema(), schema));
        resultsResponse.setPath().ifPresent(path -> response.header(protocolHeaders.responseSetPath(), path));

        resultsResponse.setAuthorizationUser().ifPresent(authorizationUser -> response.header(protocolHeaders.responseSetAuthorizationUser(), authorizationUser));
        if (resultsResponse.resetAuthorizationUser()) {
            response.header(protocolHeaders.responseResetAuthorizationUser(), true);
        }

        resultsResponse.setSessionProperties().forEach((key, value) -> response.header(protocolHeaders.responseSetSession(), key + "=" + urlEncode(value)));
        resultsResponse.resetSessionProperties().forEach(name -> response.header(protocolHeaders.responseClearSession(), name));

        resultsResponse.setRoles().forEach((key, value) -> response.header(protocolHeaders.responseSetRole(), key + "=" + urlEncode(value.toString())));

        for (Map.Entry<String, String> entry : resultsResponse.addedPreparedStatements().entrySet()) {
            String encodedKey = urlEncode(entry.getKey());
            String encodedValue = urlEncode(this.preparedStatementEncoder.encodePreparedStatementForHeader(entry.getValue()));
            response.header(protocolHeaders.responseAddedPrepare(), encodedKey + "=" + encodedValue);
        }
        for (String name : resultsResponse.deallocatedPreparedStatements()) {
            response.header(protocolHeaders.responseDeallocatedPrepare(), urlEncode(name));
        }

        resultsResponse.startedTransactionId().ifPresent(transactionId -> response.header(protocolHeaders.responseStartedTransactionId(), transactionId));
        if (resultsResponse.clearTransactionId()) {
            response.header(protocolHeaders.responseClearTransactionId(), true);
        }

        if (!this.compressionEnabled) {
            response.encoding("identity");
        }
        return response.build();
    }

    /**
     * Cancels a query. A locally tracked query is cancelled directly;
     * otherwise the request is validated against, and forwarded to, the query
     * manager.
     *
     * @return an empty 204 response on success
     * @throws NotFoundException if the query is unknown or the slug/token pair is not valid
     */
    @ResourceSecurity(ResourceSecurity.AccessType.PUBLIC)
    @DELETE
    @Path("{queryId}/{slug}/{token}")
    @Produces("application/json")
    public Response cancelQuery(
            @PathParam("queryId") QueryId queryId,
            @PathParam("slug") String slug,
            @PathParam("token") long token) {
        Query query = this.queries.get(queryId);
        if (query != null) {
            if (!query.isSlugValid(slug, token)) {
                throw new NotFoundException("Query not found");
            }
            query.cancel();
            return Response.noContent().build();
        }

        // Not tracked locally: fall back to the query manager.
        try {
            if (!this.queryManager.getQuerySlug(queryId).isValid(Slug.Context.EXECUTING_QUERY, slug, token)) {
                throw new NotFoundException("Query not found");
            }
            this.queryManager.cancelQuery(queryId);
            return Response.noContent().build();
        }
        catch (NoSuchElementException e) {
            throw new NotFoundException("Query not found", e);
        }
    }

    /**
     * Requests partial cancellation of one stage of a query.
     *
     * @throws NotFoundException if the query is unknown or the slug/token pair is not valid
     */
    @ResourceSecurity(ResourceSecurity.AccessType.PUBLIC)
    @DELETE
    @Path("partialCancel/{queryId}/{stage}/{slug}/{token}")
    public void partialCancel(
            @PathParam("queryId") QueryId queryId,
            @PathParam("stage") int stage,
            @PathParam("slug") String slug,
            @PathParam("token") long token) {
        this.getQuery(queryId, slug, token).partialCancel(stage);
    }

    /** URL-encodes {@code value} using UTF-8. */
    private static String urlEncode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}

