/*
 * Decompiled with CFR 0.152.
 */
package io.trino.server;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.concurrent.MoreFutures;
import io.airlift.jaxrs.AsyncResponseHandler;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.stats.TimeStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.execution.FailureInjector;
import io.trino.execution.SqlTaskManager;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.BufferResult;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.metadata.SessionPropertyManager;
import io.trino.server.FailTaskRequest;
import io.trino.server.ForAsyncHttp;
import io.trino.server.TaskUpdateRequest;
import io.trino.server.security.ResourceSecurity;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

@Path(value="/v1/task")
public class TaskResource {
    private static final Logger log = Logger.get(TaskResource.class);
    private static final Duration ADDITIONAL_WAIT_TIME = new Duration(5.0, TimeUnit.SECONDS);
    private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(2.0, TimeUnit.SECONDS);
    private final SqlTaskManager taskManager;
    private final SessionPropertyManager sessionPropertyManager;
    private final Executor responseExecutor;
    private final ScheduledExecutorService timeoutExecutor;
    private final FailureInjector failureInjector;
    private final TimeStat readFromOutputBufferTime = new TimeStat();
    private final TimeStat resultsRequestTime = new TimeStat();

    @Inject
    public TaskResource(SqlTaskManager taskManager, SessionPropertyManager sessionPropertyManager, @ForAsyncHttp BoundedExecutor responseExecutor, @ForAsyncHttp ScheduledExecutorService timeoutExecutor, FailureInjector failureInjector) {
        this.taskManager = Objects.requireNonNull(taskManager, "taskManager is null");
        this.sessionPropertyManager = Objects.requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
        this.responseExecutor = (Executor)Objects.requireNonNull(responseExecutor, "responseExecutor is null");
        this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor, "timeoutExecutor is null");
        this.failureInjector = Objects.requireNonNull(failureInjector, "failureInjector is null");
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @GET
    @Produces(value={"application/json"})
    public List<TaskInfo> getAllTaskInfo(@Context UriInfo uriInfo) {
        ImmutableList allTaskInfo = this.taskManager.getAllTaskInfo();
        if (TaskResource.shouldSummarize(uriInfo)) {
            allTaskInfo = ImmutableList.copyOf((Iterable)Iterables.transform(allTaskInfo, TaskInfo::summarize));
        }
        return allTaskInfo;
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @POST
    @Path(value="{taskId}")
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    public void createOrUpdateTask(@PathParam(value="taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        Objects.requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
        Session session = taskUpdateRequest.getSession().toSession(this.sessionPropertyManager, taskUpdateRequest.getExtraCredentials());
        if (this.injectFailure(session.getTraceToken(), taskId, RequestType.CREATE_OR_UPDATE_TASK, asyncResponse)) {
            return;
        }
        TaskInfo taskInfo = this.taskManager.updateTask(session, taskId, taskUpdateRequest.getFragment(), taskUpdateRequest.getSplitAssignments(), taskUpdateRequest.getOutputIds(), taskUpdateRequest.getDynamicFilterDomains());
        if (TaskResource.shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }
        asyncResponse.resume((Object)Response.ok().entity((Object)taskInfo).build());
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @GET
    @Path(value="{taskId}")
    @Produces(value={"application/json"})
    public void getTaskInfo(@PathParam(value="taskId") TaskId taskId, @HeaderParam(value="X-Trino-Current-Version") Long currentVersion, @HeaderParam(value="X-Trino-Max-Wait") Duration maxWait, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        Objects.requireNonNull(taskId, "taskId is null");
        if (this.injectFailure(this.taskManager.getTraceToken(taskId), taskId, RequestType.GET_TASK_INFO, asyncResponse)) {
            return;
        }
        if (currentVersion == null || maxWait == null) {
            TaskInfo taskInfo = this.taskManager.getTaskInfo(taskId);
            if (TaskResource.shouldSummarize(uriInfo)) {
                taskInfo = taskInfo.summarize();
            }
            asyncResponse.resume((Object)taskInfo);
            return;
        }
        Duration waitTime = TaskResource.randomizeWaitTime(maxWait);
        ListenableFuture futureTaskInfo = MoreFutures.addTimeout(this.taskManager.getTaskInfo(taskId, currentVersion), () -> this.taskManager.getTaskInfo(taskId), (Duration)waitTime, (ScheduledExecutorService)this.timeoutExecutor);
        if (TaskResource.shouldSummarize(uriInfo)) {
            futureTaskInfo = Futures.transform((ListenableFuture)futureTaskInfo, TaskInfo::summarize, (Executor)MoreExecutors.directExecutor());
        }
        Duration timeout = new Duration((double)(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis()), TimeUnit.MILLISECONDS);
        AsyncResponseHandler.bindAsyncResponse((AsyncResponse)asyncResponse, (ListenableFuture)futureTaskInfo, (Executor)this.responseExecutor).withTimeout(timeout);
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @GET
    @Path(value="{taskId}/status")
    @Produces(value={"application/json"})
    public void getTaskStatus(@PathParam(value="taskId") TaskId taskId, @HeaderParam(value="X-Trino-Current-Version") Long currentVersion, @HeaderParam(value="X-Trino-Max-Wait") Duration maxWait, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        Objects.requireNonNull(taskId, "taskId is null");
        if (this.injectFailure(this.taskManager.getTraceToken(taskId), taskId, RequestType.GET_TASK_STATUS, asyncResponse)) {
            return;
        }
        if (currentVersion == null || maxWait == null) {
            TaskStatus taskStatus = this.taskManager.getTaskStatus(taskId);
            asyncResponse.resume((Object)taskStatus);
            return;
        }
        Duration waitTime = TaskResource.randomizeWaitTime(maxWait);
        ListenableFuture futureTaskStatus = MoreFutures.addTimeout(this.taskManager.getTaskStatus(taskId, currentVersion), () -> this.taskManager.getTaskStatus(taskId), (Duration)waitTime, (ScheduledExecutorService)this.timeoutExecutor);
        Duration timeout = new Duration((double)(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis()), TimeUnit.MILLISECONDS);
        AsyncResponseHandler.bindAsyncResponse((AsyncResponse)asyncResponse, (ListenableFuture)futureTaskStatus, (Executor)this.responseExecutor).withTimeout(timeout);
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @GET
    @Path(value="{taskId}/dynamicfilters")
    @Produces(value={"application/json"})
    public void acknowledgeAndGetNewDynamicFilterDomains(@PathParam(value="taskId") TaskId taskId, @HeaderParam(value="X-Trino-Current-Version") Long currentDynamicFiltersVersion, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(currentDynamicFiltersVersion, "currentDynamicFiltersVersion is null");
        if (this.injectFailure(this.taskManager.getTraceToken(taskId), taskId, RequestType.ACKNOWLEDGE_AND_GET_NEW_DYNAMIC_FILTER_DOMAINS, asyncResponse)) {
            return;
        }
        asyncResponse.resume((Object)this.taskManager.acknowledgeAndGetNewDynamicFilterDomains(taskId, currentDynamicFiltersVersion));
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @DELETE
    @Path(value="{taskId}")
    @Produces(value={"application/json"})
    public TaskInfo deleteTask(@PathParam(value="taskId") TaskId taskId, @QueryParam(value="abort") @DefaultValue(value="true") boolean abort, @Context UriInfo uriInfo) {
        Objects.requireNonNull(taskId, "taskId is null");
        TaskInfo taskInfo = abort ? this.taskManager.abortTask(taskId) : this.taskManager.cancelTask(taskId);
        if (TaskResource.shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }
        return taskInfo;
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @POST
    @Path(value="{taskId}/fail")
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    public TaskInfo failTask(@PathParam(value="taskId") TaskId taskId, FailTaskRequest failTaskRequest, @Context UriInfo uriInfo) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(failTaskRequest, "failTaskRequest is null");
        return this.taskManager.failTask(taskId, failTaskRequest.getFailureInfo().toException());
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @GET
    @Path(value="{taskId}/results/{bufferId}/{token}")
    @Produces(value={"application/X-trino-pages"})
    public void getResults(@PathParam(value="taskId") TaskId taskId, @PathParam(value="bufferId") OutputBuffers.OutputBufferId bufferId, @PathParam(value="token") long token, @HeaderParam(value="X-Trino-Max-Size") DataSize maxSize, @Suspended AsyncResponse asyncResponse) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(bufferId, "bufferId is null");
        if (this.injectFailure(this.taskManager.getTraceToken(taskId), taskId, RequestType.GET_RESULTS, asyncResponse)) {
            return;
        }
        TaskState state = this.taskManager.getTaskStatus(taskId).getState();
        boolean taskFailed = state == TaskState.ABORTED || state == TaskState.FAILED;
        long start = System.nanoTime();
        ListenableFuture bufferResultFuture = this.taskManager.getTaskResults(taskId, bufferId, token, maxSize);
        Duration waitTime = TaskResource.randomizeWaitTime(DEFAULT_MAX_WAIT_TIME);
        bufferResultFuture = MoreFutures.addTimeout(bufferResultFuture, () -> BufferResult.emptyResults(this.taskManager.getTaskInstanceId(taskId), token, false), (Duration)waitTime, (ScheduledExecutorService)this.timeoutExecutor);
        ListenableFuture responseFuture = Futures.transform((ListenableFuture)bufferResultFuture, result -> {
            Response.Status status;
            List<Slice> serializedPages = result.getSerializedPages();
            GenericEntity entity = null;
            if (serializedPages.isEmpty()) {
                status = Response.Status.NO_CONTENT;
            } else {
                entity = new GenericEntity(serializedPages, new TypeToken<List<Slice>>(){}.getType());
                status = Response.Status.OK;
            }
            return Response.status((Response.Status)status).entity(entity).header("X-Trino-Task-Instance-Id", (Object)result.getTaskInstanceId()).header("X-Trino-Page-Sequence-Id", (Object)result.getToken()).header("X-Trino-Page-End-Sequence-Id", (Object)result.getNextToken()).header("X-Trino-Buffer-Complete", (Object)result.isBufferComplete()).header("X-Trino-Task-Failed", (Object)taskFailed).build();
        }, (Executor)MoreExecutors.directExecutor());
        Duration timeout = new Duration((double)(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis()), TimeUnit.MILLISECONDS);
        AsyncResponseHandler.bindAsyncResponse((AsyncResponse)asyncResponse, (ListenableFuture)responseFuture, (Executor)this.responseExecutor).withTimeout(timeout, Response.status((Response.Status)Response.Status.NO_CONTENT).header("X-Trino-Task-Instance-Id", (Object)this.taskManager.getTaskInstanceId(taskId)).header("X-Trino-Page-Sequence-Id", (Object)token).header("X-Trino-Page-End-Sequence-Id", (Object)token).header("X-Trino-Buffer-Complete", (Object)false).header("X-Trino-Task-Failed", (Object)taskFailed).build());
        responseFuture.addListener(() -> this.readFromOutputBufferTime.add(Duration.nanosSince((long)start)), MoreExecutors.directExecutor());
        asyncResponse.register(throwable -> this.resultsRequestTime.add(Duration.nanosSince((long)start)));
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @GET
    @Path(value="{taskId}/results/{bufferId}/{token}/acknowledge")
    public void acknowledgeResults(@PathParam(value="taskId") TaskId taskId, @PathParam(value="bufferId") OutputBuffers.OutputBufferId bufferId, @PathParam(value="token") long token) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(bufferId, "bufferId is null");
        this.taskManager.acknowledgeTaskResults(taskId, bufferId, token);
    }

    @ResourceSecurity(value=ResourceSecurity.AccessType.INTERNAL_ONLY)
    @DELETE
    @Path(value="{taskId}/results/{bufferId}")
    public void destroyTaskResults(@PathParam(value="taskId") TaskId taskId, @PathParam(value="bufferId") OutputBuffers.OutputBufferId bufferId, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(bufferId, "bufferId is null");
        if (this.injectFailure(this.taskManager.getTraceToken(taskId), taskId, RequestType.DESTROY_RESULTS, asyncResponse)) {
            return;
        }
        this.taskManager.destroyTaskResults(taskId, bufferId);
        asyncResponse.resume((Object)Response.noContent().build());
    }

    private boolean injectFailure(Optional<String> traceToken, TaskId taskId, RequestType requestType, AsyncResponse asyncResponse) {
        if (traceToken.isEmpty()) {
            return false;
        }
        Optional<FailureInjector.InjectedFailure> injectedFailure = this.failureInjector.getInjectedFailure(traceToken.get(), taskId.getStageId().getId(), taskId.getPartitionId(), taskId.getAttemptId());
        if (injectedFailure.isEmpty()) {
            return false;
        }
        FailureInjector.InjectedFailure failure = injectedFailure.get();
        Duration timeout = this.failureInjector.getRequestTimeout();
        switch (failure.getInjectedFailureType()) {
            case TASK_MANAGEMENT_REQUEST_FAILURE: {
                if (!requestType.isTaskManagement()) break;
                log.info("Failing %s request for task %s", new Object[]{requestType, taskId});
                asyncResponse.resume((Object)Response.serverError().build());
                return true;
            }
            case TASK_MANAGEMENT_REQUEST_TIMEOUT: {
                if (!requestType.isTaskManagement()) break;
                log.info("Timing out %s request for task %s", new Object[]{requestType, taskId});
                asyncResponse.setTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
                return true;
            }
            case TASK_GET_RESULTS_REQUEST_FAILURE: {
                if (requestType.isTaskManagement()) break;
                log.info("Failing %s request for task %s", new Object[]{requestType, taskId});
                asyncResponse.resume((Object)Response.serverError().build());
                return true;
            }
            case TASK_GET_RESULTS_REQUEST_TIMEOUT: {
                if (requestType.isTaskManagement()) break;
                log.info("Timing out %s request for task %s", new Object[]{requestType, taskId});
                asyncResponse.setTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
                return true;
            }
            case TASK_FAILURE: {
                log.info("Injecting failure for task %s at %s", new Object[]{taskId, requestType});
                this.taskManager.failTask(taskId, injectedFailure.get().getTaskFailureException());
                break;
            }
            default: {
                throw new IllegalArgumentException("unexpected failure type: " + failure.getInjectedFailureType());
            }
        }
        return false;
    }

    @Managed
    @Nested
    public TimeStat getReadFromOutputBufferTime() {
        return this.readFromOutputBufferTime;
    }

    @Managed
    @Nested
    public TimeStat getResultsRequestTime() {
        return this.resultsRequestTime;
    }

    private static boolean shouldSummarize(UriInfo uriInfo) {
        return uriInfo.getQueryParameters().containsKey((Object)"summarize");
    }

    private static Duration randomizeWaitTime(Duration waitTime) {
        long halfWaitMillis = waitTime.toMillis() / 2L;
        return new Duration((double)(halfWaitMillis + ThreadLocalRandom.current().nextLong(halfWaitMillis)), TimeUnit.MILLISECONDS);
    }

    private static enum RequestType {
        CREATE_OR_UPDATE_TASK(true),
        GET_TASK_INFO(true),
        GET_TASK_STATUS(true),
        ACKNOWLEDGE_AND_GET_NEW_DYNAMIC_FILTER_DOMAINS(true),
        GET_RESULTS(false),
        DESTROY_RESULTS(false);

        private final boolean taskManagement;

        private RequestType(boolean taskManagement) {
            this.taskManagement = taskManagement;
        }

        public boolean isTaskManagement() {
            return this.taskManagement;
        }
    }
}

