/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord.http;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionHolder;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
import org.apache.druid.indexing.overlord.http.TaskStatusResponse;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.http.security.StateResourceFilter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;

@Path(value="/druid/indexer/v1")
public class OverlordResource {
    // Class-wide logger used by the endpoints below for debug/info/warn/error messages.
    private static final Logger log = new Logger(OverlordResource.class);
    // Collaborators assigned once in the @Inject constructor.
    private final TaskMaster taskMaster;
    private final TaskStorageQueryAdapter taskStorageQueryAdapter;
    private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
    private final TaskLogStreamer taskLogStreamer;
    private final JacksonConfigManager configManager;
    private final AuditManager auditManager;
    private final AuthorizerMapper authorizerMapper;
    private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
    private final ProvisioningStrategy provisioningStrategy;
    // Starts out null; getWorkerConfig() and getTotalWorkerCapacity() fill it on first use
    // via configManager.watch("worker.config", WorkerBehaviorConfig.class).
    // NOTE(review): the field is neither volatile nor guarded by a lock — confirm that
    // concurrent first requests calling watch() more than once is acceptable.
    private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
    // State names accepted by the "state" query parameter of GET /tasks (compared lower-cased).
    private static final List API_TASK_STATES = ImmutableList.of((Object)"pending", (Object)"waiting", (Object)"running", (Object)"complete");

    /**
     * Builds the resource from its injected collaborators. Each argument is stored
     * unchanged in the field of the same name; no other work is done here.
     */
    @Inject
    public OverlordResource(
        TaskMaster taskMaster,
        TaskStorageQueryAdapter taskStorageQueryAdapter,
        IndexerMetadataStorageAdapter indexerMetadataStorageAdapter,
        TaskLogStreamer taskLogStreamer,
        JacksonConfigManager configManager,
        AuditManager auditManager,
        AuthorizerMapper authorizerMapper,
        WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter,
        ProvisioningStrategy provisioningStrategy
    ) {
        // Task execution and storage access.
        this.taskMaster = taskMaster;
        this.taskStorageQueryAdapter = taskStorageQueryAdapter;
        this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter;
        this.taskLogStreamer = taskLogStreamer;

        // Worker management and provisioning.
        this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter;
        this.provisioningStrategy = provisioningStrategy;

        // Configuration, auditing and authorization.
        this.configManager = configManager;
        this.auditManager = auditManager;
        this.authorizerMapper = authorizerMapper;
    }

    /**
     * Submits a task to the task queue.
     *
     * <p>The request must be authorized for WRITE on the task's datasource; otherwise a
     * {@link ForbiddenException} is thrown. The insertion itself is handed to
     * {@code asLeaderWith} together with the task queue obtained from the task master.
     *
     * @param task the task to enqueue
     * @param req  the servlet request used for the authorization check
     * @return 200 with {@code {"task": <id>}} when the task was added, or 400 with an
     *         {@code "error"} entry when the queue reports that the task already exists
     */
    @POST
    @Path(value="/task")
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    public Response taskPost(final Task task, @Context HttpServletRequest req) {
        final Resource target = new Resource(task.getDataSource(), "DATASOURCE");
        final Access access = AuthorizationUtils.authorizeResourceAction(
            req,
            new ResourceAction(target, Action.WRITE),
            this.authorizerMapper
        );
        if (!access.isAllowed()) {
            throw new ForbiddenException(access.getMessage());
        }

        return this.asLeaderWith(this.taskMaster.getTaskQueue(), (TaskQueue queue) -> {
            try {
                queue.add(task);
            }
            catch (EntryExistsException e) {
                final String message = StringUtils.format("Task[%s] already exists!", task.getId());
                return Response.status(Response.Status.BAD_REQUEST)
                    .entity(ImmutableMap.of("error", message))
                    .build();
            }
            return Response.ok(ImmutableMap.of("task", task.getId())).build();
        });
    }

    /**
     * Reports the current leader as returned by {@code taskMaster.getCurrentLeader()}.
     *
     * @return 200 with the leader value as the entity
     */
    @GET
    @Path(value="/leader")
    @ResourceFilters(value={StateResourceFilter.class})
    @Produces(value={"application/json"})
    public Response getLeader() {
        final Object currentLeader = this.taskMaster.getCurrentLeader();
        return Response.ok(currentLeader).build();
    }

    /**
     * Tells the caller whether this node is currently the leader.
     *
     * @return a {@code {"leader": <boolean>}} body; the status is 200 when leading and
     *         404 otherwise
     */
    @GET
    @Path(value="/isLeader")
    @Produces(value={"application/json"})
    public Response isLeader() {
        final boolean leading = this.taskMaster.isLeader();
        // The body is the same in both cases; only the status code differs.
        final Response.Status status = leading ? Response.Status.OK : Response.Status.NOT_FOUND;
        return Response.status(status)
            .entity(ImmutableMap.of("leader", leading))
            .build();
    }

    /**
     * Looks up locked intervals through
     * {@code taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)}.
     *
     * @param minTaskPriority request body map; must be non-null and non-empty
     * @return 200 with the adapter's result, or 400 with a plain message when the map is
     *         missing or empty
     */
    @POST
    @Path(value="/lockedIntervals")
    @Produces(value={"application/json"})
    @ResourceFilters(value={StateResourceFilter.class})
    public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriority) {
        final boolean hasInput = minTaskPriority != null && !minTaskPriority.isEmpty();
        if (hasInput) {
            return Response.ok(this.taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
        }
        return Response.status(Response.Status.BAD_REQUEST)
            .entity("No Datasource provided")
            .build();
    }

    /**
     * Returns the stored payload of a task.
     *
     * @param taskid id of the task to look up
     * @return a {@link TaskPayloadResponse} entity; the status is 404 when the response
     *         carries no payload and 200 otherwise
     */
    @GET
    @Path(value="/task/{taskid}")
    @Produces(value={"application/json"})
    @ResourceFilters(value={TaskResourceFilter.class})
    public Response getTaskPayload(@PathParam(value="taskid") String taskid) {
        final Task payload = this.taskStorageQueryAdapter.getTask(taskid).orNull();
        final TaskPayloadResponse body = new TaskPayloadResponse(taskid, payload);

        final Response.Status status;
        if (body.getPayload() == null) {
            status = Response.Status.NOT_FOUND;
        } else {
            status = Response.Status.OK;
        }
        return Response.status(status).entity(body).build();
    }

    /**
     * Returns the status of a single task.
     *
     * <p>Task metadata comes from task storage. When a task runner is available and
     * knows the task, the runner state and location come from the runner's work item;
     * otherwise the runner state is reported as {@code WAITING} and the location is the
     * one stored in the task status (or {@code TaskLocation.unknown()} when that is null).
     * The queue insertion time is always reported as {@code DateTimes.EPOCH}.
     *
     * @param taskid id of the task to look up
     * @return a {@link TaskStatusResponse} entity; the status is 404 when the response
     *         carries no status (task not found in storage) and 200 otherwise
     */
    @GET
    @Path(value="/task/{taskid}/status")
    @Produces(value={"application/json"})
    @ResourceFilters(value={TaskResourceFilter.class})
    public Response getTaskStatus(@PathParam(value="taskid") String taskid) {
        final TaskInfo<Task, TaskStatus> taskInfo = this.taskStorageQueryAdapter.getTaskInfo(taskid);
        final TaskStatusResponse response;

        if (taskInfo == null) {
            response = new TaskStatusResponse(taskid, null);
        } else {
            final Task task = taskInfo.getTask();
            final TaskStatus taskStatus = taskInfo.getStatus();
            final String groupId = task == null ? null : task.getGroupId();
            final String taskType = task == null ? null : task.getType();

            // Read the Optional exactly once: asking the task master twice (isPresent()
            // and then get()) could observe the runner disappearing in between and fail
            // on get().
            final Optional<TaskRunner> taskRunnerOptional = this.taskMaster.getTaskRunner();
            TaskRunner taskRunner = null;
            TaskRunnerWorkItem workItem = null;
            if (taskRunnerOptional.isPresent()) {
                taskRunner = taskRunnerOptional.get();
                workItem = taskRunner.getKnownTasks()
                    .stream()
                    .filter(item -> item.getTaskId().equals(taskid))
                    .findAny()
                    .orElse(null);
            }

            final String responseTaskId;
            final RunnerTaskState runnerState;
            final TaskLocation location;
            if (workItem != null) {
                // The runner knows the task: prefer its live state and location.
                responseTaskId = workItem.getTaskId();
                runnerState = taskRunner.getRunnerTaskState(workItem.getTaskId());
                location = workItem.getLocation();
            } else {
                // No runner, or the runner does not know the task: fall back to storage.
                responseTaskId = taskid;
                runnerState = RunnerTaskState.WAITING;
                location = taskStatus.getLocation() == null ? TaskLocation.unknown() : taskStatus.getLocation();
            }

            response = new TaskStatusResponse(
                responseTaskId,
                new TaskStatusPlus(
                    taskInfo.getId(),
                    groupId,
                    taskType,
                    taskInfo.getCreatedTime(),
                    DateTimes.EPOCH,
                    taskStatus.getStatusCode(),
                    runnerState,
                    Long.valueOf(taskStatus.getDuration()),
                    location,
                    taskInfo.getDataSource(),
                    taskStatus.getErrorMsg()
                )
            );
        }

        final Response.Status status = response.getStatus() == null ? Response.Status.NOT_FOUND : Response.Status.OK;
        return Response.status(status).entity(response).build();
    }

    /**
     * Returns the segments reported by
     * {@code taskStorageQueryAdapter.getInsertedSegments(taskid)}.
     *
     * @param taskid id of the task
     * @return 200 with the segment set as the entity
     * @deprecated this endpoint is marked deprecated in the source.
     */
    @Deprecated
    @GET
    @Path(value="/task/{taskid}/segments")
    @Produces(value={"application/json"})
    @ResourceFilters(value={TaskResourceFilter.class})
    public Response getTaskSegments(@PathParam(value="taskid") String taskid) {
        final Set<DataSegment> insertedSegments = this.taskStorageQueryAdapter.getInsertedSegments(taskid);
        final Response.ResponseBuilder builder = Response.ok();
        return builder.entity(insertedSegments).build();
    }

    /**
     * Asks the task queue to shut down one task. The call is handed to
     * {@code asLeaderWith} together with the task queue obtained from the task master.
     *
     * @param taskid id of the task to shut down
     * @return 200 with {@code {"task": <taskid>}} once the shutdown has been requested
     */
    @POST
    @Path(value="/task/{taskid}/shutdown")
    @Produces(value={"application/json"})
    @ResourceFilters(value={TaskResourceFilter.class})
    public Response doShutdown(final @PathParam(value="taskid") String taskid) {
        return this.asLeaderWith(this.taskMaster.getTaskQueue(), (TaskQueue queue) -> {
            queue.shutdown(taskid, "Shutdown request from user");
            return Response.ok(ImmutableMap.of("task", taskid)).build();
        });
    }

    /**
     * Requests shutdown of every active task of a datasource. The work is handed to
     * {@code asLeaderWith} together with the task queue obtained from the task master.
     *
     * @param dataSource datasource whose active tasks should be shut down
     * @return 404 when storage reports no active tasks for the datasource, otherwise 200
     *         with {@code {"dataSource": <dataSource>}}
     */
    @POST
    @Path(value="/datasources/{dataSource}/shutdownAllTasks")
    @Produces(value={"application/json"})
    @ResourceFilters(value={DatasourceResourceFilter.class})
    public Response shutdownTasksForDataSource(final @PathParam(value="dataSource") String dataSource) {
        return this.asLeaderWith(this.taskMaster.getTaskQueue(), (TaskQueue queue) -> {
            final List<TaskInfo<Task, TaskStatus>> activeTasks =
                this.taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
            if (activeTasks.isEmpty()) {
                return Response.status(Response.Status.NOT_FOUND).build();
            }
            activeTasks.forEach(info -> queue.shutdown(info.getId(), "Shutdown request from user"));
            return Response.ok(ImmutableMap.of("dataSource", dataSource)).build();
        });
    }

    /**
     * Looks up the stored status of several tasks in one call.
     *
     * @param taskIds ids to look up; must be non-null and non-empty
     * @return 400 with a plain message when no ids were given, otherwise 200 with a map
     *         from task id to status containing only the ids for which storage returned
     *         a status
     */
    @POST
    @Path(value="/taskStatus")
    @Produces(value={"application/json"})
    @ResourceFilters(value={StateResourceFilter.class})
    public Response getMultipleTaskStatuses(Set<String> taskIds) {
        if (taskIds == null || taskIds.isEmpty()) {
            return Response.status(Response.Status.BAD_REQUEST)
                .entity("No TaskIds provided.")
                .build();
        }

        final Map<String, TaskStatus> statusById = Maps.newHashMapWithExpectedSize(taskIds.size());
        for (String id : taskIds) {
            final Optional<TaskStatus> maybeStatus = this.taskStorageQueryAdapter.getStatus(id);
            if (maybeStatus.isPresent()) {
                statusById.put(id, maybeStatus.get());
            }
        }
        return Response.ok().entity(statusById).build();
    }

    /**
     * Returns the current worker behavior configuration.
     *
     * <p>On first use the {@code "worker.config"} key is registered with the config
     * manager via {@code watch(...)} and the returned reference is kept in
     * {@code workerConfigRef}.
     *
     * @return 200 with the value currently held by the watched reference
     */
    @GET
    @Path(value="/worker")
    @Produces(value={"application/json"})
    @ResourceFilters(value={ConfigResourceFilter.class})
    public Response getWorkerConfig() {
        final boolean notYetWatching = this.workerConfigRef == null;
        if (notYetWatching) {
            this.workerConfigRef = this.configManager.watch("worker.config", WorkerBehaviorConfig.class);
        }
        final Object currentConfig = this.workerConfigRef.get();
        return Response.ok(currentConfig).build();
    }

    /**
     * Reports the current and the maximum worker capacity.
     *
     * <p>Current capacity is the sum of {@code getWorker().getCapacity()} over the
     * workers listed by a {@link WorkerTaskRunner}; it is {@code -1} when the task runner
     * is of another type. Maximum capacity is
     * {@code autoScaler.getMaxNumWorkers() * provisioningStrategy.getExpectedWorkerCapacity(workers)}
     * and is {@code -1} when the worker behavior config is missing, is not a
     * {@link DefaultWorkerBehaviorConfig}, has no auto scaler, or the provisioning
     * strategy itself returns {@code -1}.
     *
     * @return 503 when the task master has no task runner, otherwise 200 with a
     *         {@link TotalWorkerCapacityResponse}
     */
    @GET
    @Path(value="/totalWorkerCapacity")
    @Produces(value={"application/json"})
    @ResourceFilters(value={ConfigResourceFilter.class})
    public Response getTotalWorkerCapacity() {
        final Optional<TaskRunner> taskRunnerOptional = this.taskMaster.getTaskRunner();
        if (!taskRunnerOptional.isPresent()) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
        }
        final TaskRunner taskRunner = taskRunnerOptional.get();

        // Typed as Collection<ImmutableWorkerInfo>: this is what the provisioning strategy
        // consumes below, and it lets the capacity lambda resolve getWorker() without casts.
        final Collection<ImmutableWorkerInfo> workers;
        final int currentCapacity;
        if (taskRunner instanceof WorkerTaskRunner) {
            workers = ((WorkerTaskRunner)taskRunner).getWorkers();
            currentCapacity = workers.stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
        } else {
            log.debug("Cannot calculate capacity as task runner [%s] of type [%s] does not support listing workers", taskRunner, taskRunner.getClass().getName());
            workers = ImmutableList.of();
            currentCapacity = -1;
        }

        // Start watching the worker config on first use.
        if (this.workerConfigRef == null) {
            this.workerConfigRef = this.configManager.watch("worker.config", WorkerBehaviorConfig.class);
        }
        final WorkerBehaviorConfig workerBehaviorConfig = this.workerConfigRef.get();

        final int maximumCapacity;
        if (workerBehaviorConfig == null) {
            log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
            maximumCapacity = -1;
        } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
            final DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig)workerBehaviorConfig;
            if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
                log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
                maximumCapacity = -1;
            } else {
                final int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
                final int expectedWorkerCapacity = this.provisioningStrategy.getExpectedWorkerCapacity(workers);
                // -1 from the strategy is passed through rather than multiplied.
                maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
            }
        } else {
            log.debug("Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity", workerBehaviorConfig, workerBehaviorConfig.getClass().getSimpleName());
            maximumCapacity = -1;
        }
        return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity)).build();
    }

    /**
     * Stores a new worker behavior configuration under the {@code "worker.config"} key.
     *
     * @param workerBehaviorConfig the configuration to store
     * @param author  value of the {@code X-Druid-Author} header (empty by default)
     * @param comment value of the {@code X-Druid-Comment} header (empty by default)
     * @param req     servlet request; its remote address is recorded in the audit info
     * @return 200 when the config manager reports success, otherwise 400
     */
    @POST
    @Path(value="/worker")
    @Consumes(value={"application/json"})
    @ResourceFilters(value={ConfigResourceFilter.class})
    public Response setWorkerConfig(WorkerBehaviorConfig workerBehaviorConfig, @HeaderParam(value="X-Druid-Author") @DefaultValue(value="") String author, @HeaderParam(value="X-Druid-Comment") @DefaultValue(value="") String comment, @Context HttpServletRequest req) {
        final AuditInfo auditInfo = new AuditInfo(author, comment, req.getRemoteAddr());
        final ConfigManager.SetResult outcome = this.configManager.set("worker.config", workerBehaviorConfig, auditInfo);
        if (!outcome.isOk()) {
            return Response.status(Response.Status.BAD_REQUEST).build();
        }
        log.info("Updating Worker configs: %s", workerBehaviorConfig);
        return Response.ok().build();
    }

    /**
     * Returns the audit history of the {@code "worker.config"} key.
     *
     * <p>When {@code interval} is given, history is fetched for that interval. When only
     * {@code count} is given, history is fetched by count. When neither is given, the
     * interval-based lookup is called with a null interval.
     *
     * @param interval optional interval string, parsed with {@code Intervals.of}
     * @param count    optional entry count, used only when {@code interval} is absent
     * @return 200 with the audit entries; 400 with an {@code "error"} entry when the
     *         interval cannot be parsed or the count-based lookup rejects its argument
     */
    @GET
    @Path(value="/worker/history")
    @Produces(value={"application/json"})
    @ResourceFilters(value={ConfigResourceFilter.class})
    public Response getWorkerConfigHistory(@QueryParam(value="interval") String interval, @QueryParam(value="count") Integer count) {
        final Interval theInterval;
        try {
            theInterval = interval == null ? null : Intervals.of(interval);
        }
        catch (IllegalArgumentException e) {
            // A malformed interval is a client error, reported the same way as a bad count.
            return Response.status(Response.Status.BAD_REQUEST)
                .entity(ImmutableMap.of("error", StringUtils.format("Invalid interval [%s]: %s", interval, e.getMessage())))
                .build();
        }

        if (theInterval == null && count != null) {
            try {
                final List<?> workerEntryList = this.auditManager.fetchAuditHistory("worker.config", "worker.config", count.intValue());
                return Response.ok(workerEntryList).build();
            }
            catch (IllegalArgumentException e) {
                return Response.status(Response.Status.BAD_REQUEST)
                    .entity(ImmutableMap.of("error", e.getMessage()))
                    .build();
            }
        }

        final List<?> workerEntryList = this.auditManager.fetchAuditHistory("worker.config", "worker.config", theInterval);
        return Response.ok(workerEntryList).build();
    }

    /**
     * Submits a task action on behalf of a task. The work is handed to
     * {@code asLeaderWith} together with the action client that the task master provides
     * for the holder's task.
     *
     * @param holder carries the task and the action to perform
     * @return 200 with {@code {"result": <return value>}} on success, or a server error
     *         with an {@code "error"} entry holding the exception message when the
     *         submission throws
     */
    @POST
    @Path(value="/action")
    @Produces(value={"application/json"})
    @ResourceFilters(value={StateResourceFilter.class})
    public Response doAction(final TaskActionHolder holder) {
        return this.asLeaderWith(this.taskMaster.getTaskActionClient(holder.getTask()), (TaskActionClient client) -> {
            final Object actionResult;
            try {
                actionResult = client.submit(holder.getAction());
            }
            catch (Exception e) {
                log.warn(e, "Failed to perform task action");
                return Response.serverError()
                    .entity(ImmutableMap.of("error", e.getMessage()))
                    .build();
            }
            // A HashMap is used because the action's return value may be null.
            final Map<String, Object> body = new HashMap<>();
            body.put("result", actionResult);
            return Response.ok().entity(body).build();
        });
    }

    /**
     * Lists tasks in the {@code "waiting"} state by delegating to
     * {@code getTasks} with every other filter left null.
     *
     * @param req servlet request forwarded to {@code getTasks}
     * @return whatever {@code getTasks} returns for that state
     */
    @GET
    @Path(value="/waitingTasks")
    @Produces(value={"application/json"})
    public Response getWaitingTasks(@Context HttpServletRequest req) {
        final String state = "waiting";
        return getTasks(state, null, null, null, null, req);
    }

    /**
     * Lists tasks in the {@code "pending"} state by delegating to
     * {@code getTasks} with every other filter left null.
     *
     * @param req servlet request forwarded to {@code getTasks}
     * @return whatever {@code getTasks} returns for that state
     */
    @GET
    @Path(value="/pendingTasks")
    @Produces(value={"application/json"})
    public Response getPendingTasks(@Context HttpServletRequest req) {
        final String state = "pending";
        return getTasks(state, null, null, null, null, req);
    }

    /**
     * Lists tasks in the {@code "running"} state, optionally restricted to one task
     * type, by delegating to {@code getTasks}.
     *
     * @param taskType optional value of the {@code type} query parameter
     * @param req      servlet request forwarded to {@code getTasks}
     * @return whatever {@code getTasks} returns for that state and type
     */
    @GET
    @Path(value="/runningTasks")
    @Produces(value={"application/json"})
    public Response getRunningTasks(@QueryParam(value="type") String taskType, @Context HttpServletRequest req) {
        final String state = "running";
        return getTasks(state, null, null, null, taskType, req);
    }

    /**
     * Lists tasks in the {@code "complete"} state by delegating to {@code getTasks},
     * forwarding the {@code n} query parameter as the maximum number of completed tasks.
     *
     * @param maxTaskStatuses optional value of the {@code n} query parameter
     * @param req             servlet request forwarded to {@code getTasks}
     * @return whatever {@code getTasks} returns for that state and limit
     */
    @GET
    @Path(value="/completeTasks")
    @Produces(value={"application/json"})
    public Response getCompleteTasks(@QueryParam(value="n") Integer maxTaskStatuses, @Context HttpServletRequest req) {
        final String state = "complete";
        return getTasks(state, null, null, maxTaskStatuses, null, req);
    }

    /**
     * Lists tasks, optionally filtered by state, datasource, creation interval, maximum
     * number of completed tasks and task type.
     *
     * <p>The state, when given, must be one of {@code API_TASK_STATES} (compared
     * lower-cased). When a datasource is given, the request must be authorized for READ
     * on it. The listing itself is handed to {@code asLeaderWith} together with the task
     * runner, and its result is passed through {@code securedTaskStatusPlus} before
     * being returned.
     *
     * @param state               optional state name
     * @param dataSource          optional datasource filter
     * @param createdTimeInterval optional creation-time interval string
     * @param maxCompletedTasks   optional cap on completed tasks
     * @param type                optional task type filter
     * @param req                 servlet request used for authorization
     * @return 400 (text/plain) for an unknown state, otherwise the task list response
     * @throws WebApplicationException carrying a 403 text/plain response when READ
     *         access to {@code dataSource} is denied
     */
    @GET
    @Path(value="/tasks")
    @Produces(value={"application/json"})
    public Response getTasks(@QueryParam(value="state") String state, @QueryParam(value="datasource") String dataSource, @QueryParam(value="createdTimeInterval") String createdTimeInterval, @QueryParam(value="max") Integer maxCompletedTasks, @QueryParam(value="type") String type, @Context HttpServletRequest req) {
        if (state != null) {
            final boolean knownState = API_TASK_STATES.contains(StringUtils.toLowerCase(state));
            if (!knownState) {
                final String message = StringUtils.format("Invalid state : %s, valid values are: %s", state, API_TASK_STATES);
                return Response.status(Response.Status.BAD_REQUEST)
                    .type("text/plain")
                    .entity(message)
                    .build();
            }
        }

        if (dataSource != null) {
            final ResourceAction readDataSource = new ResourceAction(new Resource(dataSource, "DATASOURCE"), Action.READ);
            final Access access = AuthorizationUtils.authorizeResourceAction(req, readDataSource, this.authorizerMapper);
            if (!access.isAllowed()) {
                final String message = StringUtils.format("Access-Check-Result: %s", access.toString());
                throw new WebApplicationException(
                    Response.status(Response.Status.FORBIDDEN)
                        .type("text/plain")
                        .entity(message)
                        .build()
                );
            }
        }

        return this.asLeaderWith(this.taskMaster.getTaskRunner(), (TaskRunner runner) -> {
            final List<TaskStatusPlus> unfiltered = this.getTasks(
                runner,
                TaskStateLookup.fromString(state),
                dataSource,
                createdTimeInterval,
                maxCompletedTasks,
                type
            );
            final List<TaskStatusPlus> authorizedList = this.securedTaskStatusPlus(unfiltered, dataSource, req);
            return Response.ok(authorizedList).build();
        });
    }

    /**
     * Builds the {@link TaskStatusPlus} list for a task listing by combining task storage
     * with the task runner's work items.
     *
     * <p>The returned list holds all completed tasks first, followed by active tasks,
     * each group in the order storage returned them.
     * <ul>
     *   <li>Completed tasks are reported with {@code RunnerTaskState.NONE}.</li>
     *   <li>Active tasks without a runner work item are reported as
     *       {@code RunnerTaskState.WAITING}, and only for the WAITING and ALL lookups.</li>
     *   <li>Active tasks with a runner work item take created time, queue insertion time,
     *       runner state and location from the runner, and only for the PENDING, RUNNING
     *       and ALL lookups.</li>
     * </ul>
     *
     * @param taskRunner          runner consulted for work items and runner state
     * @param state               which task states to include
     * @param dataSource          optional datasource filter
     * @param createdTimeInterval optional interval string; {@code "_"} is replaced by
     *                            {@code "/"} before parsing, and only its duration is used
     * @param maxCompletedTasks   optional cap forwarded to the completed-task lookup
     * @param type                optional task type filter
     * @return a new mutable list of task statuses
     */
    private List<TaskStatusPlus> getTasks(TaskRunner taskRunner, TaskStateLookup state, @Nullable String dataSource, @Nullable String createdTimeInterval, @Nullable Integer maxCompletedTasks, @Nullable String type) {
        final Duration createdTimeDuration;
        if (createdTimeInterval != null) {
            final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/"));
            createdTimeDuration = theInterval.toDuration();
        } else {
            createdTimeDuration = null;
        }

        Stream<TaskInfo<Task, TaskStatus>> taskInfoStream = this.getTaskInfoStreamFromTaskStorage(state, dataSource, createdTimeDuration, maxCompletedTasks, type);
        final Map<String, ? extends TaskRunnerWorkItem> runnerWorkItems = this.getTaskRunnerWorkItems(taskRunner, state, dataSource, type);
        if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
            // Storage returns all active tasks; keep only those the runner lists for this state.
            taskInfoStream = taskInfoStream.filter(info -> runnerWorkItems.containsKey(info.getId()));
        }
        final List<TaskInfo<Task, TaskStatus>> taskInfos = taskInfoStream.collect(Collectors.toList());

        final List<TaskStatusPlus> statuses = new ArrayList<>(taskInfos.size());

        // First pass: completed tasks, so they precede the active ones in the result.
        for (TaskInfo<Task, TaskStatus> taskInfo : taskInfos) {
            final TaskStatus taskStatus = taskInfo.getStatus();
            if (!taskStatus.isComplete()) {
                continue;
            }
            final Task task = taskInfo.getTask();
            statuses.add(new TaskStatusPlus(
                taskInfo.getId(),
                task == null ? null : task.getGroupId(),
                task == null ? null : task.getType(),
                taskInfo.getCreatedTime(),
                DateTimes.EPOCH,
                taskStatus.getStatusCode(),
                RunnerTaskState.NONE,
                Long.valueOf(taskStatus.getDuration()),
                taskStatus.getLocation(),
                taskInfo.getDataSource(),
                taskStatus.getErrorMsg()
            ));
        }

        // Second pass: active tasks, enriched with runner information where available.
        for (TaskInfo<Task, TaskStatus> taskInfo : taskInfos) {
            final TaskStatus taskStatus = taskInfo.getStatus();
            if (taskStatus.isComplete()) {
                continue;
            }
            final Task task = taskInfo.getTask();
            final String groupId = task == null ? null : task.getGroupId();
            final String taskType = task == null ? null : task.getType();
            final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId());

            if (runnerWorkItem == null) {
                if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
                    statuses.add(new TaskStatusPlus(
                        taskInfo.getId(),
                        groupId,
                        taskType,
                        taskInfo.getCreatedTime(),
                        DateTimes.EPOCH,
                        taskStatus.getStatusCode(),
                        RunnerTaskState.WAITING,
                        Long.valueOf(taskStatus.getDuration()),
                        taskStatus.getLocation(),
                        taskInfo.getDataSource(),
                        taskStatus.getErrorMsg()
                    ));
                }
            } else if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
                statuses.add(new TaskStatusPlus(
                    taskInfo.getId(),
                    groupId,
                    taskType,
                    runnerWorkItem.getCreatedTime(),
                    runnerWorkItem.getQueueInsertionTime(),
                    taskStatus.getStatusCode(),
                    taskRunner.getRunnerTaskState(taskInfo.getId()),
                    Long.valueOf(taskStatus.getDuration()),
                    runnerWorkItem.getLocation(),
                    taskInfo.getDataSource(),
                    taskStatus.getErrorMsg()
                ));
            }
        }
        return statuses;
    }

    /**
     * Queries task storage for the task infos relevant to {@code state}.
     *
     * <p>WAITING, PENDING and RUNNING query active tasks only; COMPLETE queries completed
     * tasks only; ALL queries both. The completed-task lookup is built from
     * {@code maxCompletedTasks} and {@code createdTimeDuration}.
     *
     * @param state               which task states to fetch
     * @param dataSource          optional datasource passed to the storage query
     * @param createdTimeDuration duration passed to the completed-task lookup; may be null
     * @param maxCompletedTasks   cap passed to the completed-task lookup; may be null
     * @param type                when non-null, only tasks whose payload reports this type
     *                            are kept (tasks without a payload are dropped)
     * @return a stream over the matching task infos
     * @throws IAE if {@code state} is not one of the handled values
     */
    private Stream<TaskInfo<Task, TaskStatus>> getTaskInfoStreamFromTaskStorage(TaskStateLookup state, @Nullable String dataSource, Duration createdTimeDuration, @Nullable Integer maxCompletedTasks, @Nullable String type) {
        final Map<TaskLookup.TaskLookupType, TaskLookup> lookupsByType;
        switch (state) {
            case WAITING:
            case PENDING:
            case RUNNING:
                lookupsByType = ImmutableMap.of(
                    TaskLookup.TaskLookupType.ACTIVE,
                    TaskLookup.ActiveTaskLookup.getInstance()
                );
                break;
            case COMPLETE:
                lookupsByType = ImmutableMap.of(
                    TaskLookup.TaskLookupType.COMPLETE,
                    TaskLookup.CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration)
                );
                break;
            case ALL:
                lookupsByType = ImmutableMap.of(
                    TaskLookup.TaskLookupType.ACTIVE,
                    TaskLookup.ActiveTaskLookup.getInstance(),
                    TaskLookup.TaskLookupType.COMPLETE,
                    TaskLookup.CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration)
                );
                break;
            default:
                throw new IAE("Unknown state: [%s]", state);
        }

        final Stream<TaskInfo<Task, TaskStatus>> fromStorage =
            this.taskStorageQueryAdapter.getTaskInfos(lookupsByType, dataSource).stream();
        if (type == null) {
            return fromStorage;
        }
        return fromStorage.filter(info -> {
            final Task payload = info.getTask();
            // A task info without a payload has no type and therefore never matches.
            return payload != null && type.equals(payload.getType());
        });
    }

    /**
     * Collects the task runner's work items relevant to {@code state}, keyed by task id.
     *
     * <p>ALL and WAITING use the runner's known tasks, PENDING its pending tasks, RUNNING
     * its running tasks, and COMPLETE yields an empty map. The optional datasource and
     * type filters are applied on top.
     *
     * @param taskRunner runner to query
     * @param state      which runner view to use
     * @param dataSource when non-null, keep only items of this datasource
     * @param type       when non-null, keep only items of this task type
     * @return map from task id to work item
     * @throws IAE if {@code state} is not one of the handled values
     */
    private Map<String, ? extends TaskRunnerWorkItem> getTaskRunnerWorkItems(TaskRunner taskRunner, TaskStateLookup state, @Nullable String dataSource, @Nullable String type) {
        // The element type must be TaskRunnerWorkItem (not Object) so the filters and the
        // key extractor below can call the work-item accessors.
        Stream<? extends TaskRunnerWorkItem> runnerWorkItemsStream;
        switch (state) {
            case ALL:
            case WAITING: {
                runnerWorkItemsStream = taskRunner.getKnownTasks().stream();
                break;
            }
            case PENDING: {
                runnerWorkItemsStream = taskRunner.getPendingTasks().stream();
                break;
            }
            case RUNNING: {
                runnerWorkItemsStream = taskRunner.getRunningTasks().stream();
                break;
            }
            case COMPLETE: {
                runnerWorkItemsStream = Stream.empty();
                break;
            }
            default: {
                throw new IAE("Unknown state: [%s]", state);
            }
        }
        if (dataSource != null) {
            runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource()));
        }
        if (type != null) {
            runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType()));
        }
        // NOTE(review): Collectors.toMap without a merge function throws
        // IllegalStateException on duplicate task ids; presumably ids are unique per runner
        // view — confirm.
        return runnerWorkItemsStream.collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item));
    }

    /**
     * Deletes pending segments of a datasource within an interval.
     *
     * <p>The interval string is parsed first; the request must then be authorized for
     * both READ and WRITE on the datasource. The deletion only runs when this node is
     * the leader.
     *
     * @param dataSource           datasource whose pending segments are deleted
     * @param deleteIntervalString interval string, parsed with {@code Intervals.of}
     * @param request              servlet request used for authorization
     * @return 200 with {@code {"numDeleted": <count>}} when leading, otherwise 503
     * @throws ForbiddenException when the authorization check fails
     */
    @DELETE
    @Path(value="/pendingSegments/{dataSource}")
    @Produces(value={"application/json"})
    public Response killPendingSegments(@PathParam(value="dataSource") String dataSource, @QueryParam(value="interval") String deleteIntervalString, @Context HttpServletRequest request) {
        final Interval deleteInterval = Intervals.of(deleteIntervalString);

        final List<ResourceAction> requiredActions = ImmutableList.of(
            new ResourceAction(new Resource(dataSource, "DATASOURCE"), Action.READ),
            new ResourceAction(new Resource(dataSource, "DATASOURCE"), Action.WRITE)
        );
        final Access access = AuthorizationUtils.authorizeAllResourceActions(request, requiredActions, this.authorizerMapper);
        if (!access.isAllowed()) {
            throw new ForbiddenException(access.getMessage());
        }

        if (!this.taskMaster.isLeader()) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
        }
        final int numDeleted = this.indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
        return Response.ok()
            .entity(ImmutableMap.of("numDeleted", numDeleted))
            .build();
    }

    /**
     * Lists the workers known to the task runner. The work is handed to
     * {@code asLeaderWith} together with the task runner obtained from the task master.
     *
     * @return 200 with the worker collection when the runner is a
     *         {@link WorkerTaskRunner}; otherwise a server error with an
     *         {@code "error"} entry
     */
    @GET
    @Path(value="/workers")
    @Produces(value={"application/json"})
    @ResourceFilters(value={StateResourceFilter.class})
    public Response getWorkers() {
        return this.asLeaderWith(this.taskMaster.getTaskRunner(), (TaskRunner runner) -> {
            if (!(runner instanceof WorkerTaskRunner)) {
                log.debug("Task runner [%s] of type [%s] does not support listing workers", runner, runner.getClass().getName());
                return Response.serverError()
                    .entity(ImmutableMap.of("error", "Task Runner does not support worker listing"))
                    .build();
            }
            final WorkerTaskRunner workerRunner = (WorkerTaskRunner) runner;
            return Response.ok(workerRunner.getWorkers()).build();
        });
    }

    /**
     * Requests that the worker at {@code host} be enabled.
     *
     * @param host worker host taken from the request path
     * @return the response produced by {@code changeWorkerStatus} for the ENABLE action
     */
    @POST
    @Path(value="/worker/{host}/enable")
    @Produces(value={"application/json"})
    @ResourceFilters(value={StateResourceFilter.class})
    public Response enableWorker(@PathParam(value="host") String host) {
        final WorkerTaskRunner.ActionType requested = WorkerTaskRunner.ActionType.ENABLE;
        return changeWorkerStatus(host, requested);
    }

    /**
     * Requests that the worker at {@code host} be disabled.
     *
     * @param host worker host taken from the request path
     * @return the response produced by {@code changeWorkerStatus} for the DISABLE action
     */
    @POST
    @Path(value="/worker/{host}/disable")
    @Produces(value={"application/json"})
    @ResourceFilters(value={StateResourceFilter.class})
    public Response disableWorker(@PathParam(value="host") String host) {
        final WorkerTaskRunner.ActionType requested = WorkerTaskRunner.ActionType.DISABLE;
        return changeWorkerStatus(host, requested);
    }

    /**
     * Enables or disables the worker at {@code host} through the worker task runner query adapter.
     *
     * @param host   worker host, as received in the request path
     * @param action requested state change
     * @return 200 with a single-entry map {@code host -> "enabled"|"disabled"} on success;
     *         500 with an {@code error} entity when the action is neither ENABLE nor DISABLE,
     *         or when anything inside the attempt throws
     */
    private Response changeWorkerStatus(String host, WorkerTaskRunner.ActionType action) {
        try {
            if (WorkerTaskRunner.ActionType.DISABLE.equals(action)) {
                workerTaskRunnerQueryAdapter.disableWorker(host);
                return Response.ok(ImmutableMap.of(host, "disabled")).build();
            }
            if (WorkerTaskRunner.ActionType.ENABLE.equals(action)) {
                workerTaskRunnerQueryAdapter.enableWorker(host);
                return Response.ok(ImmutableMap.of(host, "enabled")).build();
            }
            return Response.serverError().entity(ImmutableMap.of("error", "Worker does not support " + action + " action!")).build();
        }
        catch (Exception e) {
            log.error(e, "Error in posting [%s] action to [%s]", action, host);
            // ImmutableMap rejects null values, and many exceptions (e.g. a bare
            // NullPointerException) carry no message. Fall back to the exception's string
            // form so the handler itself cannot fail while building the error response.
            final String message = e.getMessage() != null ? e.getMessage() : e.toString();
            return Response.serverError().entity(ImmutableMap.of("error", message)).build();
        }
    }

    /**
     * Returns the scaling stats exposed by the task master.
     *
     * @return 200 with the stats as the entity when they are present; otherwise 200 with no entity
     */
    @GET
    @Path(value="/scaling")
    @Produces(value={"application/json"})
    @ResourceFilters(value={StateResourceFilter.class})
    public Response getScalingState() {
        final Optional<ScalingStats> scalingStats = taskMaster.getScalingStats();
        if (!scalingStats.isPresent()) {
            return Response.ok().build();
        }
        final ScalingStats stats = scalingStats.get();
        return Response.ok((Object) stats).build();
    }

    /**
     * Streams the log of a task as plain text.
     *
     * <p>The opened stream is handed to the response as its entity; it is not closed in this method.
     *
     * @param taskid id of the task whose log is requested
     * @param offset offset forwarded to the log streamer (defaults to 0)
     * @return 200 with the log stream, 404 with an explanatory message when the streamer has no log
     *         for the task, or 500 when looking up or opening the log throws
     */
    @GET
    @Path(value="/task/{taskid}/log")
    @Produces(value={"text/plain; charset=UTF-8"})
    @ResourceFilters(value={TaskResourceFilter.class})
    public Response doGetLog(@PathParam(value="taskid") String taskid, @QueryParam(value="offset") @DefaultValue(value="0") long offset) {
        try {
            final Optional<ByteSource> logSource = taskLogStreamer.streamTaskLog(taskid, offset);
            if (!logSource.isPresent()) {
                return Response.status(Response.Status.NOT_FOUND).entity("No log was found for this task. The task may not exist, or it may not have begun running yet.").build();
            }
            return Response.ok(logSource.get().openStream()).build();
        }
        catch (Exception e) {
            log.warn(e, "Failed to stream log for task %s", taskid);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Streams the reports of a task as JSON.
     *
     * <p>The opened stream is handed to the response as its entity; it is not closed in this method.
     *
     * @param taskid id of the task whose reports are requested
     * @return 200 with the report stream, 404 with an explanatory message when the streamer has no
     *         reports for the task, or 500 when looking up or opening the reports throws
     */
    @GET
    @Path(value="/task/{taskid}/reports")
    @Produces(value={"application/json"})
    @ResourceFilters(value={TaskResourceFilter.class})
    public Response doGetReports(@PathParam(value="taskid") String taskid) {
        try {
            final Optional<ByteSource> reportSource = taskLogStreamer.streamTaskReports(taskid);
            if (!reportSource.isPresent()) {
                return Response.status(Response.Status.NOT_FOUND).entity("No task reports were found for this task. The task may not exist, or it may not have completed yet.").build();
            }
            return Response.ok(reportSource.get().openStream()).build();
        }
        catch (Exception e) {
            log.warn(e, "Failed to stream task reports for task %s", taskid);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Applies {@code f} to the value held by {@code x}, or answers 503 when {@code x} is absent.
     *
     * @param x   optional value the handler needs
     * @param f   handler that turns the value into a response
     * @param <T> type of the optional value
     * @return the handler's response, or a 503 (Service Unavailable) response when {@code x} is absent
     */
    private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f) {
        if (!x.isPresent()) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
        }
        return f.apply(x.get());
    }

    /**
     * Restricts a list of task statuses to those the requester may read.
     *
     * <p>When {@code dataSource} is non-null the input list is returned as-is, with no filtering
     * performed here. NOTE(review): presumably the caller has already authorized that datasource
     * before calling this method; confirm against the call site.
     *
     * @param collectionToFilter task statuses to filter
     * @param dataSource         datasource the request was scoped to, or {@code null} if unscoped
     * @param req                incoming request, used for the authorization check
     * @return the input list when {@code dataSource} is non-null; otherwise a new list holding only
     *         the entries for which READ on the task's datasource is authorized
     * @throws WebApplicationException (500) if a task status being filtered has no datasource
     */
    private List<TaskStatusPlus> securedTaskStatusPlus(List<TaskStatusPlus> collectionToFilter, @Nullable String dataSource, HttpServletRequest req) {
        if (dataSource != null) {
            return collectionToFilter;
        }

        // Typed explicitly: with a raw Function the lambda parameter is Object and the
        // TaskStatusPlus accessors below cannot be resolved.
        final Function<TaskStatusPlus, Iterable<ResourceAction>> raGenerator = taskStatusPlus -> {
            final String taskId = taskStatusPlus.getId();
            final String taskDatasource = taskStatusPlus.getDataSource();
            if (taskDatasource == null) {
                throw new WebApplicationException(Response.serverError().entity(StringUtils.format("No task information found for task with id: [%s]", taskId)).build());
            }
            return Collections.singletonList(new ResourceAction(new Resource(taskDatasource, "DATASOURCE"), Action.READ));
        };
        return Lists.newArrayList(AuthorizationUtils.filterAuthorizedResources(req, collectionToFilter, raGenerator, authorizerMapper));
    }

    /**
     * Task state selector; {@link #ALL} is used when no state is supplied.
     */
    private static enum TaskStateLookup {
        ALL,
        WAITING,
        PENDING,
        RUNNING,
        COMPLETE;

        /**
         * Resolves a state name to its constant after upper-casing it.
         *
         * @param state state name, or {@code null}
         * @return {@link #ALL} for {@code null}; otherwise the constant whose name equals the
         *         upper-cased input
         * @throws IllegalArgumentException if the upper-cased name matches no constant
         */
        private static TaskStateLookup fromString(@Nullable String state) {
            return state == null ? ALL : TaskStateLookup.valueOf(StringUtils.toUpperCase(state));
        }
    }
}

