/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.DimensionCardinalityReport;
import org.apache.druid.indexing.common.task.batch.parallel.GeneratedPartitionsReport;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartitionLocation;
import org.apache.druid.indexing.common.task.batch.parallel.PartitionStat;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.indexing.common.task.batch.parallel.SegmentAllocationRequest;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class ParallelIndexSupervisorTask
extends AbstractBatchIndexTask
implements ChatHandler {
    public static final String TYPE = "index_parallel";
    private static final Logger LOG = new Logger(ParallelIndexSupervisorTask.class);
    private static final String TASK_PHASE_FAILURE_MSG = "Failed in phase[%s]. See task logs for details.";
    private static final long DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE = 7L;
    private final ParallelIndexIngestionSpec ingestionSchema;
    private final String baseSubtaskSpecName;
    private final InputSource baseInputSource;
    private final boolean missingIntervalsInOverwriteMode;
    private final long awaitSegmentAvailabilityTimeoutMillis;
    private @MonotonicNonNull AuthorizerMapper authorizerMapper;
    private volatile @MonotonicNonNull CurrentSubTaskHolder currentSubTaskHolder;
    private volatile @MonotonicNonNull TaskToolbox toolbox;
    private volatile @MonotonicNonNull Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;
    private IngestionState ingestionState;

    @JsonCreator
    public ParallelIndexSupervisorTask(@JsonProperty(value="id") String id, @JsonProperty(value="groupId") @Nullable String groupId, @JsonProperty(value="resource") TaskResource taskResource, @JsonProperty(value="spec") ParallelIndexIngestionSpec ingestionSchema, @JsonProperty(value="context") Map<String, Object> context) {
        this(id, groupId, taskResource, ingestionSchema, null, context);
    }

    public ParallelIndexSupervisorTask(String id, @Nullable String groupId, TaskResource taskResource, ParallelIndexIngestionSpec ingestionSchema, @Nullable String baseSubtaskSpecName, Map<String, Object> context) {
        super(ParallelIndexSupervisorTask.getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()), groupId, taskResource, ingestionSchema.getDataSchema().getDataSource(), context, ingestionSchema.getTuningConfig().getMaxAllowedLockCount(), ParallelIndexSupervisorTask.computeBatchIngestionMode(ingestionSchema.getIOConfig()));
        this.ingestionSchema = ingestionSchema;
        String string = this.baseSubtaskSpecName = baseSubtaskSpecName == null ? this.getId() : baseSubtaskSpecName;
        if (this.getIngestionMode() == AbstractTask.IngestionMode.REPLACE && ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
            throw new ISE("GranularitySpec's intervals cannot be empty when using replace.", new Object[0]);
        }
        if (ParallelIndexSupervisorTask.isGuaranteedRollup(this.getIngestionMode(), ingestionSchema.getTuningConfig())) {
            ParallelIndexSupervisorTask.checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
        }
        this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource();
        boolean bl = this.missingIntervalsInOverwriteMode = this.getIngestionMode() != AbstractTask.IngestionMode.APPEND && ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
        if (this.missingIntervalsInOverwriteMode) {
            this.addToContext("forceTimeChunkLock", true);
        }
        this.awaitSegmentAvailabilityTimeoutMillis = ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
        this.ingestionState = IngestionState.NOT_STARTED;
    }

    private static void checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec) {
        if (!partitionsSpec.isForceGuaranteedRollupCompatible()) {
            String incompatibiltyMsg = partitionsSpec.getForceGuaranteedRollupIncompatiblityReason();
            String msg = "forceGuaranteedRollup is incompatible with partitionsSpec: " + incompatibiltyMsg;
            throw new ISE(msg, new Object[0]);
        }
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    @Nonnull
    @JsonIgnore
    public Set<ResourceAction> getInputSourceResources() {
        if (this.getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
            throw this.getInputSecurityOnFirehoseUnsupportedError();
        }
        return this.getIngestionSchema().getIOConfig().getInputSource() != null ? this.getIngestionSchema().getIOConfig().getInputSource().getTypes().stream().map(i -> new ResourceAction(new Resource(i, "EXTERNAL"), Action.READ)).collect(Collectors.toSet()) : ImmutableSet.of();
    }

    @JsonProperty(value="spec")
    public ParallelIndexIngestionSpec getIngestionSchema() {
        return this.ingestionSchema;
    }

    @Nullable
    @VisibleForTesting
    ParallelIndexTaskRunner getCurrentRunner() {
        if (this.isParallelMode()) {
            return this.currentSubTaskHolder == null ? null : (ParallelIndexTaskRunner)this.currentSubTaskHolder.getTask();
        }
        return null;
    }

    @Nullable
    @VisibleForTesting
    <T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(TaskToolbox toolbox, Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> runnerCreator) {
        ParallelIndexTaskRunner<T, R> newRunner = runnerCreator.apply(toolbox);
        if (this.currentSubTaskHolder.setTask(newRunner)) {
            return newRunner;
        }
        return null;
    }

    private static TaskState runNextPhase(@Nullable ParallelIndexTaskRunner nextPhaseRunner) throws Exception {
        if (nextPhaseRunner == null) {
            LOG.info("Task is asked to stop. Finish as failed", new Object[0]);
            return TaskState.FAILED;
        }
        return nextPhaseRunner.run();
    }

    @VisibleForTesting
    SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolbox) {
        return new SinglePhaseParallelIndexTaskRunner(toolbox, this.getId(), this.getGroupId(), this.baseSubtaskSpecName, this.ingestionSchema, this.getContext());
    }

    @VisibleForTesting
    PartialDimensionCardinalityParallelIndexTaskRunner createPartialDimensionCardinalityRunner(TaskToolbox toolbox) {
        return new PartialDimensionCardinalityParallelIndexTaskRunner(toolbox, this.getId(), this.getGroupId(), this.baseSubtaskSpecName, this.ingestionSchema, this.getContext());
    }

    @VisibleForTesting
    PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(TaskToolbox toolbox, ParallelIndexIngestionSpec ingestionSchema, @Nullable Map<Interval, Integer> intervalToNumShardsOverride) {
        return new PartialHashSegmentGenerateParallelIndexTaskRunner(toolbox, this.getId(), this.getGroupId(), this.baseSubtaskSpecName, ingestionSchema, this.getContext(), intervalToNumShardsOverride);
    }

    @VisibleForTesting
    PartialDimensionDistributionParallelIndexTaskRunner createPartialDimensionDistributionRunner(TaskToolbox toolbox) {
        return new PartialDimensionDistributionParallelIndexTaskRunner(toolbox, this.getId(), this.getGroupId(), this.baseSubtaskSpecName, this.ingestionSchema, this.getContext());
    }

    @VisibleForTesting
    PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(TaskToolbox toolbox, Map<Interval, PartitionBoundaries> intervalToPartitions, ParallelIndexIngestionSpec ingestionSchema) {
        return new PartialRangeSegmentGenerateParallelIndexTaskRunner(toolbox, this.getId(), this.getGroupId(), this.baseSubtaskSpecName, ingestionSchema, this.getContext(), intervalToPartitions);
    }

    @VisibleForTesting
    PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(TaskToolbox toolbox, List<PartialSegmentMergeIOConfig> ioConfigs, ParallelIndexIngestionSpec ingestionSchema) {
        return new PartialGenericSegmentMergeParallelIndexTaskRunner(toolbox, this.getId(), this.getGroupId(), this.baseSubtaskSpecName, ingestionSchema.getDataSchema(), ioConfigs, ingestionSchema.getTuningConfig(), this.getContext());
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        return this.determineLockGranularityAndTryLock(taskActionClient, this.ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals());
    }

    @Override
    public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals) throws IOException {
        return ParallelIndexSupervisorTask.findInputSegments(this.getDataSource(), taskActionClient, intervals);
    }

    @Override
    public boolean requireLockExistingSegments() {
        return this.getIngestionMode() != AbstractTask.IngestionMode.APPEND;
    }

    @Override
    public boolean isPerfectRollup() {
        return ParallelIndexSupervisorTask.isGuaranteedRollup(this.getIngestionMode(), this.getIngestionSchema().getTuningConfig());
    }

    @Override
    @Nullable
    public Granularity getSegmentGranularity() {
        GranularitySpec granularitySpec = this.ingestionSchema.getDataSchema().getGranularitySpec();
        if (granularitySpec instanceof ArbitraryGranularitySpec) {
            return null;
        }
        return granularitySpec.getSegmentGranularity();
    }

    @Override
    public TaskStatus runTask(TaskToolbox toolbox) throws Exception {
        if (this.ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() != 0) {
            LOG.warn("maxSavedParseExceptions is not supported yet", new Object[0]);
        }
        if (this.ingestionSchema.getTuningConfig().getMaxParseExceptions() != Integer.MAX_VALUE) {
            LOG.warn("maxParseExceptions is not supported yet", new Object[0]);
        }
        if (this.ingestionSchema.getTuningConfig().isLogParseExceptions()) {
            LOG.warn("logParseExceptions is not supported yet", new Object[0]);
        }
        if (this.missingIntervalsInOverwriteMode) {
            LOG.warn("Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. Forced to use timeChunk lock.", new Object[0]);
        }
        LOG.debug("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider)Preconditions.checkNotNull((Object)toolbox.getChatHandlerProvider(), (Object)"chatHandlerProvider")).getClass().getName()});
        this.authorizerMapper = toolbox.getAuthorizerMapper();
        toolbox.getChatHandlerProvider().register(this.getId(), (ChatHandler)this, false);
        this.addToContextIfAbsent("useLineageBasedSegmentAllocation", false);
        try {
            this.initializeSubTaskCleaner();
            if (this.isParallelMode()) {
                this.emitMetric(toolbox.getEmitter(), "ingest/count", 1);
                this.toolbox = toolbox;
                if (ParallelIndexSupervisorTask.isGuaranteedRollup(this.getIngestionMode(), this.ingestionSchema.getTuningConfig())) {
                    TaskStatus taskStatus = this.runMultiPhaseParallel(toolbox);
                    return taskStatus;
                }
                TaskStatus taskStatus = this.runSinglePhaseParallel(toolbox);
                return taskStatus;
            }
            if (!this.baseInputSource.isSplittable()) {
                LOG.warn("firehoseFactory[%s] is not splittable. Running sequentially.", new Object[]{this.baseInputSource.getClass().getSimpleName()});
            } else if (this.ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() <= 1) {
                LOG.warn("maxNumConcurrentSubTasks[%s] is less than or equal to 1. Running sequentially. Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel ingestion mode.", new Object[]{this.ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks()});
            } else {
                throw new ISE("Unknown reason for sequentail mode. Failing this task.", new Object[0]);
            }
            TaskStatus taskStatus = this.runSequential(toolbox);
            return taskStatus;
        }
        finally {
            this.ingestionState = IngestionState.COMPLETED;
            toolbox.getChatHandlerProvider().unregister(this.getId());
        }
    }

    private void initializeSubTaskCleaner() {
        this.currentSubTaskHolder = this.isParallelMode() ? new CurrentSubTaskHolder((currentRunnerObject, taskConfig) -> {
            ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner)currentRunnerObject;
            runner.stopGracefully(null);
        }) : new CurrentSubTaskHolder((taskObject, taskConfig) -> {
            IndexTask task = (IndexTask)taskObject;
            task.stopGracefully((TaskConfig)taskConfig);
        });
        this.registerResourceCloserOnAbnormalExit(this.currentSubTaskHolder);
    }

    public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig) {
        if (null == tuningConfig) {
            return false;
        }
        boolean useRangePartitions = ParallelIndexSupervisorTask.useRangePartitions(tuningConfig);
        int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
        return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
    }

    private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig) {
        return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof DimensionRangePartitionsSpec;
    }

    private boolean isParallelMode() {
        return ParallelIndexSupervisorTask.isParallelMode(this.baseInputSource, this.ingestionSchema.getTuningConfig());
    }

    private void waitForSegmentAvailability(Map<String, PushedSegmentsReport> reportsMap) {
        ArrayList<DataSegment> segmentsToWaitFor = new ArrayList<DataSegment>();
        reportsMap.values().forEach(report -> segmentsToWaitFor.addAll(report.getNewSegments()));
        this.waitForSegmentAvailability(this.toolbox, segmentsToWaitFor, this.awaitSegmentAvailabilityTimeoutMillis);
    }

    private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception {
        TaskStatus taskStatus;
        this.ingestionState = IngestionState.BUILD_SEGMENTS;
        ParallelIndexTaskRunner parallelSinglePhaseRunner = this.createRunner(toolbox, this::createSinglePhaseTaskRunner);
        TaskState state = ParallelIndexSupervisorTask.runNextPhase(parallelSinglePhaseRunner);
        if (state.isSuccess()) {
            this.publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
            if (this.awaitSegmentAvailabilityTimeoutMillis > 0L) {
                this.waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
            }
            taskStatus = TaskStatus.success((String)this.getId());
        } else {
            Preconditions.checkState((boolean)state.isFailure(), (String)"Unrecognized state after task is complete[%s]", (Object)state);
            String errorMessage = parallelSinglePhaseRunner.getStopReason() != null ? parallelSinglePhaseRunner.getStopReason() : StringUtils.format((String)TASK_PHASE_FAILURE_MSG, (Object[])new Object[]{parallelSinglePhaseRunner.getName()});
            taskStatus = TaskStatus.failure((String)this.getId(), (String)errorMessage);
        }
        toolbox.getTaskReportFileWriter().write(this.getId(), this.getTaskCompletionReports(taskStatus, this.segmentAvailabilityConfirmationCompleted));
        return taskStatus;
    }

    private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception {
        return ParallelIndexSupervisorTask.useRangePartitions(this.ingestionSchema.getTuningConfig()) ? this.runRangePartitionMultiPhaseParallel(toolbox) : this.runHashPartitionMultiPhaseParallel(toolbox);
    }

    private static ParallelIndexIngestionSpec rewriteIngestionSpecWithIntervalsIfMissing(ParallelIndexIngestionSpec ingestionSchema, Collection<Interval> intervals) {
        if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
            return ingestionSchema.withDataSchema(ingestionSchema.getDataSchema().withGranularitySpec(ingestionSchema.getDataSchema().getGranularitySpec().withIntervals(new ArrayList<Interval>(intervals))));
        }
        return ingestionSchema;
    }

    @VisibleForTesting
    TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception {
        TaskStatus taskStatus;
        ParallelIndexIngestionSpec segmentMergeIngestionSpec;
        Map<Interval, Integer> intervalToNumShards;
        TaskState state;
        boolean needsInputSampling;
        ParallelIndexIngestionSpec ingestionSchemaToUse = this.ingestionSchema;
        if (!(this.ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
            throw new ISE("forceGuaranteedRollup is set but partitionsSpec [%s] is not a single_dim or hash partition spec.", new Object[]{this.ingestionSchema.getTuningConfig().getPartitionsSpec()});
        }
        HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec)this.ingestionSchema.getTuningConfig().getPartitionsSpec();
        boolean bl = needsInputSampling = partitionsSpec.getNumShards() == null || ingestionSchemaToUse.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
        if (needsInputSampling) {
            LOG.info("Needs to determine intervals or numShards, beginning %s phase.", new Object[]{"partial_dimension_cardinality"});
            ParallelIndexTaskRunner cardinalityRunner = this.createRunner(toolbox, this::createPartialDimensionCardinalityRunner);
            state = ParallelIndexSupervisorTask.runNextPhase(cardinalityRunner);
            if (state.isFailure()) {
                String errMsg = StringUtils.format((String)TASK_PHASE_FAILURE_MSG, (Object[])new Object[]{cardinalityRunner.getName()});
                return TaskStatus.failure((String)this.getId(), (String)errMsg);
            }
            if (cardinalityRunner.getReports().isEmpty()) {
                String msg = "No valid rows for hash partitioning. All rows may have invalid timestamps or have been filtered out.";
                LOG.warn(msg, new Object[0]);
                return TaskStatus.success((String)this.getId(), (String)msg);
            }
            if (partitionsSpec.getNumShards() == null) {
                int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null ? 5000000 : partitionsSpec.getMaxRowsPerSegment();
                LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment, new Object[0]);
                intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(cardinalityRunner.getReports().values(), effectiveMaxRowsPerSegment);
                LOG.debug("intervalToNumShards: %s", new Object[]{intervalToNumShards.toString()});
            } else {
                intervalToNumShards = CollectionUtils.mapValues(ParallelIndexSupervisorTask.mergeCardinalityReports(cardinalityRunner.getReports().values()), k -> partitionsSpec.getNumShards());
            }
            ingestionSchemaToUse = ParallelIndexSupervisorTask.rewriteIngestionSpecWithIntervalsIfMissing(ingestionSchemaToUse, intervalToNumShards.keySet());
        } else {
            intervalToNumShards = null;
        }
        ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
        ParallelIndexTaskRunner indexingRunner = this.createRunner(toolbox, f -> this.createPartialHashSegmentGenerateRunner(toolbox, segmentCreateIngestionSpec, intervalToNumShards));
        state = ParallelIndexSupervisorTask.runNextPhase(indexingRunner);
        if (state.isFailure()) {
            String errMsg = StringUtils.format((String)TASK_PHASE_FAILURE_MSG, (Object[])new Object[]{indexingRunner.getName()});
            return TaskStatus.failure((String)this.getId(), (String)errMsg);
        }
        this.indexGenerateRowStats = this.doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
        Map<Partition, List<PartitionLocation>> partitionToLocations = ParallelIndexSupervisorTask.getPartitionToLocations(indexingRunner.getReports());
        List<PartialSegmentMergeIOConfig> ioConfigs = ParallelIndexSupervisorTask.createGenericMergeIOConfigs(this.ingestionSchema.getTuningConfig().getTotalNumMergeTasks(), partitionToLocations);
        ParallelIndexTaskRunner mergeRunner = this.createRunner(toolbox, arg_0 -> this.lambda$runHashPartitionMultiPhaseParallel$6(ioConfigs, segmentMergeIngestionSpec = ingestionSchemaToUse, arg_0));
        state = ParallelIndexSupervisorTask.runNextPhase(mergeRunner);
        if (state.isSuccess()) {
            this.publishSegments(toolbox, mergeRunner.getReports());
            if (this.awaitSegmentAvailabilityTimeoutMillis > 0L) {
                this.waitForSegmentAvailability(mergeRunner.getReports());
            }
            taskStatus = TaskStatus.success((String)this.getId());
        } else {
            Preconditions.checkState((boolean)state.isFailure(), (String)"Unrecognized state after task is complete[%s]", (Object)state);
            String errMsg = StringUtils.format((String)TASK_PHASE_FAILURE_MSG, (Object[])new Object[]{mergeRunner.getName()});
            taskStatus = TaskStatus.failure((String)this.getId(), (String)errMsg);
        }
        toolbox.getTaskReportFileWriter().write(this.getId(), this.getTaskCompletionReports(taskStatus, this.segmentAvailabilityConfirmationCompleted));
        return taskStatus;
    }

    @VisibleForTesting
    TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception {
        TaskStatus taskStatus;
        ParallelIndexIngestionSpec segmentMergeIngestionSpec;
        Map<Interval, PartitionBoundaries> intervalToPartitions;
        ParallelIndexIngestionSpec ingestionSchemaToUse = this.ingestionSchema;
        PartialDimensionDistributionParallelIndexTaskRunner distributionRunner = (PartialDimensionDistributionParallelIndexTaskRunner)this.createRunner(toolbox, this::createPartialDimensionDistributionRunner);
        TaskState distributionState = ParallelIndexSupervisorTask.runNextPhase(distributionRunner);
        if (distributionState.isFailure()) {
            String errMsg = StringUtils.format((String)TASK_PHASE_FAILURE_MSG, (Object[])new Object[]{distributionRunner.getName()});
            return TaskStatus.failure((String)this.getId(), (String)errMsg);
        }
        try {
            intervalToPartitions = distributionRunner.getIntervalToPartitionBoundaries((DimensionRangePartitionsSpec)this.ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
            if (intervalToPartitions.isEmpty()) {
                String msg = "No valid rows for range partitioning. All rows may have invalid timestamps or multiple dimension values.";
                LOG.warn(msg, new Object[0]);
                return TaskStatus.success((String)this.getId(), (String)msg);
            }
        }
        catch (Exception e) {
            String errorMsg = "Error creating partition boundaries.";
            if (distributionRunner.getStopReason() != null) {
                errorMsg = errorMsg + " " + distributionRunner.getStopReason();
            }
            LOG.error((Throwable)e, errorMsg, new Object[0]);
            return TaskStatus.failure((String)this.getId(), (String)errorMsg);
        }
        ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse = ParallelIndexSupervisorTask.rewriteIngestionSpecWithIntervalsIfMissing(ingestionSchemaToUse, intervalToPartitions.keySet());
        ParallelIndexTaskRunner indexingRunner = this.createRunner(toolbox, tb -> this.createPartialRangeSegmentGenerateRunner((TaskToolbox)tb, intervalToPartitions, segmentCreateIngestionSpec));
        TaskState indexingState = ParallelIndexSupervisorTask.runNextPhase(indexingRunner);
        if (indexingState.isFailure()) {
            String errMsg = StringUtils.format((String)TASK_PHASE_FAILURE_MSG, (Object[])new Object[]{indexingRunner.getName()});
            return TaskStatus.failure((String)this.getId(), (String)errMsg);
        }
        this.indexGenerateRowStats = this.doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
        Map<Partition, List<PartitionLocation>> partitionToLocations = ParallelIndexSupervisorTask.getPartitionToLocations(indexingRunner.getReports());
        List<PartialSegmentMergeIOConfig> ioConfigs = ParallelIndexSupervisorTask.createGenericMergeIOConfigs(this.ingestionSchema.getTuningConfig().getTotalNumMergeTasks(), partitionToLocations);
        ParallelIndexTaskRunner mergeRunner = this.createRunner(toolbox, arg_0 -> this.lambda$runRangePartitionMultiPhaseParallel$8(ioConfigs, segmentMergeIngestionSpec = ingestionSchemaToUse, arg_0));
        TaskState mergeState = ParallelIndexSupervisorTask.runNextPhase(mergeRunner);
        if (mergeState.isSuccess()) {
            this.publishSegments(toolbox, mergeRunner.getReports());
            if (this.awaitSegmentAvailabilityTimeoutMillis > 0L) {
                this.waitForSegmentAvailability(mergeRunner.getReports());
            }
            taskStatus = TaskStatus.success((String)this.getId());
        } else {
            Preconditions.checkState((boolean)mergeState.isFailure(), (String)"Unrecognized state after task is complete[%s]", (Object)mergeState);
            String errMsg = StringUtils.format((String)TASK_PHASE_FAILURE_MSG, (Object[])new Object[]{mergeRunner.getName()});
            taskStatus = TaskStatus.failure((String)this.getId(), (String)errMsg);
        }
        toolbox.getTaskReportFileWriter().write(this.getId(), this.getTaskCompletionReports(taskStatus, this.segmentAvailabilityConfirmationCompleted));
        return taskStatus;
    }

    private static Map<Interval, Union> mergeCardinalityReports(Collection<DimensionCardinalityReport> reports) {
        HashMap<Interval, Union> finalCollectors = new HashMap<Interval, Union>();
        reports.forEach(report -> {
            Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
            for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
                HllSketch entryHll = HllSketch.wrap((Memory)Memory.wrap((byte[])entry.getValue()));
                finalCollectors.computeIfAbsent(entry.getKey(), k -> new Union(11)).update(entryHll);
            }
        });
        return finalCollectors;
    }

    @VisibleForTesting
    public static Map<Interval, Integer> determineNumShardsFromCardinalityReport(Collection<DimensionCardinalityReport> reports, int maxRowsPerSegment) {
        Map<Interval, Union> finalCollectors = ParallelIndexSupervisorTask.mergeCardinalityReports(reports);
        return ParallelIndexSupervisorTask.computeIntervalToNumShards(maxRowsPerSegment, finalCollectors);
    }

    @Nonnull
    @VisibleForTesting
    static Map<Interval, Integer> computeIntervalToNumShards(int maxRowsPerSegment, Map<Interval, Union> finalCollectors) {
        return CollectionUtils.mapValues(finalCollectors, union -> {
            long estimatedNumShards;
            double estimatedCardinality = union.getEstimate();
            if (estimatedCardinality <= 0.0) {
                estimatedNumShards = 7L;
                LOG.warn("Estimated cardinality for union of estimates is zero or less: %.2f, setting num shards to %d", new Object[]{estimatedCardinality, estimatedNumShards});
            } else {
                estimatedNumShards = Math.round(estimatedCardinality / (double)maxRowsPerSegment);
            }
            LOG.info("estimatedNumShards %d given estimated cardinality %.2f and maxRowsPerSegment %d", new Object[]{estimatedNumShards, estimatedCardinality, maxRowsPerSegment});
            if (estimatedNumShards == 1L) {
                LOG.info("estimatedNumShards is ONE (%d) given estimated cardinality %.2f and maxRowsPerSegment %d", new Object[]{estimatedNumShards, estimatedCardinality, maxRowsPerSegment});
            }
            try {
                return Math.max(Math.toIntExact(estimatedNumShards), 1);
            }
            catch (ArithmeticException ae) {
                throw new ISE("Estimated numShards [%s] exceeds integer bounds.", new Object[]{estimatedNumShards});
            }
        });
    }

    static Map<Partition, List<PartitionLocation>> getPartitionToLocations(Map<String, GeneratedPartitionsReport> subTaskIdToReport) {
        TreeMap partitionToReports = new TreeMap(Comparator.comparingLong(partition -> partition.getInterval().getStartMillis()).thenComparingLong(partition -> partition.getInterval().getEndMillis()).thenComparingInt(Partition::getBucketId));
        subTaskIdToReport.forEach((subTaskId, report) -> report.getPartitionStats().forEach(partitionStat -> partitionToReports.computeIfAbsent(Partition.fromStat(partitionStat), p -> new ArrayList()).add(new PartitionReport((String)subTaskId, (PartitionStat)partitionStat))));
        HashMap<Partition, List<PartitionLocation>> partitionToLocations = new HashMap<Partition, List<PartitionLocation>>();
        Interval prevInterval = null;
        AtomicInteger partitionId = new AtomicInteger(0);
        for (Map.Entry entry : partitionToReports.entrySet()) {
            Partition partition2 = (Partition)entry.getKey();
            Interval interval = partition2.getInterval();
            if (!interval.equals(prevInterval)) {
                partitionId.set(0);
                prevInterval = interval;
            }
            List reportsOfPartition = (List)entry.getValue();
            BuildingShardSpec shardSpec = ((PartitionReport)reportsOfPartition.get(0)).getPartitionStat().getSecondaryPartition().convert(partitionId.getAndIncrement());
            List locationsOfPartition = reportsOfPartition.stream().map(report -> report.getPartitionStat().toPartitionLocation(report.getSubTaskId(), shardSpec)).collect(Collectors.toList());
            partitionToLocations.put(partition2, locationsOfPartition);
        }
        return partitionToLocations;
    }

    private static List<PartialSegmentMergeIOConfig> createGenericMergeIOConfigs(int totalNumMergeTasks, Map<Partition, List<PartitionLocation>> partitionToLocations) {
        return ParallelIndexSupervisorTask.createMergeIOConfigs(totalNumMergeTasks, partitionToLocations, PartialSegmentMergeIOConfig::new);
    }

    @VisibleForTesting
    static <M extends PartialSegmentMergeIOConfig, L extends PartitionLocation> List<M> createMergeIOConfigs(int totalNumMergeTasks, Map<Partition, List<L>> partitionToLocations, Function<List<L>, M> createPartialSegmentMergeIOConfig) {
        int numMergeTasks = Math.min(totalNumMergeTasks, partitionToLocations.size());
        LOG.info("Number of merge tasks is set to [%d] based on totalNumMergeTasks[%d] and number of partitions[%d]", new Object[]{numMergeTasks, totalNumMergeTasks, partitionToLocations.size()});
        ArrayList<Partition> partitions = new ArrayList<Partition>(partitionToLocations.keySet());
        Collections.shuffle(partitions, ThreadLocalRandom.current());
        ArrayList<PartialSegmentMergeIOConfig> assignedPartitionLocations = new ArrayList<PartialSegmentMergeIOConfig>(numMergeTasks);
        for (int i = 0; i < numMergeTasks; ++i) {
            Pair<Integer, Integer> partitionBoundaries = ParallelIndexSupervisorTask.getPartitionBoundaries(i, partitions.size(), numMergeTasks);
            List assignedToSameTask = partitions.subList((Integer)partitionBoundaries.lhs, (Integer)partitionBoundaries.rhs).stream().flatMap(partition -> ((List)partitionToLocations.get(partition)).stream()).collect(Collectors.toList());
            assignedPartitionLocations.add((PartialSegmentMergeIOConfig)createPartialSegmentMergeIOConfig.apply(assignedToSameTask));
        }
        return assignedPartitionLocations;
    }

    private static Pair<Integer, Integer> getPartitionBoundaries(int index, int total, int splits) {
        int chunk = total / splits;
        int remainder = total % splits;
        int start = index * chunk + (index < remainder ? index : remainder);
        int stop = start + chunk + (index < remainder ? 1 : 0);
        return Pair.of((Object)start, (Object)stop);
    }

    private void publishSegments(TaskToolbox toolbox, Map<String, PushedSegmentsReport> reportsMap) throws IOException {
        boolean published;
        TombstoneHelper tombstoneHelper;
        List<Interval> tombstoneIntervals;
        HashSet oldSegments = new HashSet();
        HashSet<DataSegment> newSegments = new HashSet<DataSegment>();
        reportsMap.values().forEach(report -> {
            oldSegments.addAll(report.getOldSegments());
            newSegments.addAll(report.getNewSegments());
        });
        boolean storeCompactionState = this.getContextValue("storeCompactionState", false);
        Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = ParallelIndexSupervisorTask.compactionStateAnnotateFunction(storeCompactionState, toolbox, this.ingestionSchema);
        Set<Object> tombStones = Collections.emptySet();
        if (this.getIngestionMode() == AbstractTask.IngestionMode.REPLACE && !(tombstoneIntervals = (tombstoneHelper = new TombstoneHelper(toolbox.getTaskActionClient())).computeTombstoneIntervals(newSegments, this.ingestionSchema.getDataSchema())).isEmpty()) {
            HashMap<Interval, SegmentIdWithShardSpec> tombstonesAnShards = new HashMap<Interval, SegmentIdWithShardSpec>();
            for (Interval interval : tombstoneIntervals) {
                SegmentIdWithShardSpec segmentIdWithShardSpec = this.allocateNewSegmentForTombstone(this.ingestionSchema, interval.getStart());
                tombstonesAnShards.put(interval, segmentIdWithShardSpec);
            }
            tombStones = tombstoneHelper.computeTombstones(this.ingestionSchema.getDataSchema(), tombstonesAnShards);
            newSegments.addAll(tombStones);
            LOG.debugSegments(tombStones, "To publish tombstones");
        }
        TaskLockType taskLockType = this.getTaskLockHelper().getLockTypeToUse();
        TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit(this.buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType));
        boolean bl = published = newSegments.isEmpty() || publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess();
        if (!published) {
            throw new ISE("Failed to publish segments", new Object[0]);
        }
        LOG.info("Published [%d] segments", new Object[]{newSegments.size()});
        this.emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", tombStones.size());
        this.emitMetric(toolbox.getEmitter(), "ingest/segments/count", newSegments.size());
    }

    private TaskStatus runSequential(TaskToolbox toolbox) throws Exception {
        IndexTask sequentialIndexTask = new IndexTask(this.getId(), this.getGroupId(), this.getTaskResource(), this.getDataSource(), this.baseSubtaskSpecName, new IndexTask.IndexIngestionSpec(this.getIngestionSchema().getDataSchema(), this.getIngestionSchema().getIOConfig(), ParallelIndexSupervisorTask.convertToIndexTuningConfig(this.getIngestionSchema().getTuningConfig())), this.getContext(), this.getIngestionSchema().getTuningConfig().getMaxAllowedLockCount(), false);
        if (this.currentSubTaskHolder.setTask(sequentialIndexTask) && sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
            return sequentialIndexTask.run(toolbox);
        }
        String msg = "Task was asked to stop. Finish as failed";
        LOG.info(msg, new Object[0]);
        return TaskStatus.failure((String)this.getId(), (String)msg);
    }

    private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus, boolean segmentAvailabilityConfirmed) {
        Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparseableEvents = this.doGetRowStatsAndUnparseableEvents("true", true);
        return TaskReport.buildTaskReports(new IngestionStatsAndErrorsTaskReport(this.getId(), new IngestionStatsAndErrorsTaskReportData(IngestionState.COMPLETED, (Map)rowStatsAndUnparseableEvents.rhs, (Map)rowStatsAndUnparseableEvents.lhs, taskStatus.getErrorMsg(), segmentAvailabilityConfirmed, this.segmentAvailabilityWaitTimeMs)));
    }

    private static IndexTask.IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig) {
        return new IndexTask.IndexTuningConfig(null, null, tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), tuningConfig.getMaxBytesInMemory(), tuningConfig.isSkipBytesInMemoryOverheadCheck(), null, null, null, null, tuningConfig.getPartitionsSpec(), tuningConfig.getIndexSpec(), tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getMaxPendingPersists(), tuningConfig.isForceGuaranteedRollup(), tuningConfig.isReportParseExceptions(), null, tuningConfig.getPushTimeout(), tuningConfig.getSegmentWriteOutMediumFactory(), tuningConfig.isLogParseExceptions(), tuningConfig.getMaxParseExceptions(), tuningConfig.getMaxSavedParseExceptions(), tuningConfig.getMaxColumnsToMerge(), tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis());
    }

    @POST
    @Path(value="/segment/allocate")
    @Produces(value={"application/x-jackson-smile"})
    @Consumes(value={"application/x-jackson-smile"})
    public Response allocateSegment(Object param, @Context HttpServletRequest req) {
        ChatHandlers.authorizationCheck((HttpServletRequest)req, (Action)Action.READ, (String)this.getDataSource(), (AuthorizerMapper)this.authorizerMapper);
        if (this.toolbox == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner)Preconditions.checkNotNull((Object)this.getCurrentRunner(), (Object)"runner");
        if (!(runner instanceof SinglePhaseParallelIndexTaskRunner)) {
            throw new ISE("Expected [%s], but [%s] is in use", new Object[]{SinglePhaseParallelIndexTaskRunner.class.getName(), runner.getClass().getName()});
        }
        boolean useLineageBasedSegmentAllocation = (Boolean)Preconditions.checkNotNull((Object)((Boolean)this.getContextValue("useLineageBasedSegmentAllocation")), (Object)"useLineageBasedSegmentAllocation in taskContext");
        try {
            SegmentIdWithShardSpec segmentIdentifier;
            if (useLineageBasedSegmentAllocation) {
                SegmentAllocationRequest request = (SegmentAllocationRequest)this.toolbox.getJsonMapper().convertValue(param, SegmentAllocationRequest.class);
                segmentIdentifier = ((SinglePhaseParallelIndexTaskRunner)runner).allocateNewSegment(this.getDataSource(), request.getTimestamp(), request.getSequenceName(), request.getPrevSegmentId());
            } else {
                DateTime timestamp = (DateTime)this.toolbox.getJsonMapper().convertValue(param, DateTime.class);
                segmentIdentifier = ((SinglePhaseParallelIndexTaskRunner)runner).allocateNewSegment(this.getDataSource(), timestamp);
            }
            return Response.ok((Object)this.toolbox.getJsonMapper().writeValueAsBytes((Object)segmentIdentifier)).build();
        }
        catch (MaxAllowedLocksExceededException malee) {
            this.getCurrentRunner().stopGracefully(malee.getMessage());
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)malee.getMessage()).build();
        }
        catch (IOException | IllegalStateException e) {
            return Response.serverError().entity((Object)Throwables.getStackTraceAsString((Throwable)e)).build();
        }
        catch (IllegalArgumentException e) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)Throwables.getStackTraceAsString((Throwable)e)).build();
        }
    }

    static InputFormat getInputFormat(ParallelIndexIngestionSpec ingestionSchema) {
        return ingestionSchema.getIOConfig().getNonNullInputFormat();
    }

    @POST
    @Path(value="/report")
    @Consumes(value={"application/x-jackson-smile"})
    public Response report(SubTaskReport report, @Context HttpServletRequest req) {
        ChatHandlers.authorizationCheck((HttpServletRequest)req, (Action)Action.WRITE, (String)this.getDataSource(), (AuthorizerMapper)this.authorizerMapper);
        if (this.currentSubTaskHolder == null || this.currentSubTaskHolder.getTask() == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner)this.currentSubTaskHolder.getTask();
        runner.collectReport(report);
        return Response.ok().build();
    }

    @GET
    @Path(value="/mode")
    @Produces(value={"application/json"})
    public Response getMode(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        return Response.ok((Object)(this.isParallelMode() ? "parallel" : "sequential")).build();
    }

    @GET
    @Path(value="/phase")
    @Produces(value={"application/json"})
    public Response getPhaseName(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.isParallelMode()) {
            ParallelIndexTaskRunner runner = this.getCurrentRunner();
            if (runner == null) {
                return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running").build();
            }
            return Response.ok((Object)runner.getName()).build();
        }
        return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)"task is running in the sequential mode").build();
    }

    @GET
    @Path(value="/progress")
    @Produces(value={"application/json"})
    public Response getProgress(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        ParallelIndexTaskRunner currentRunner = this.getCurrentRunner();
        if (currentRunner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok((Object)currentRunner.getProgress()).build();
    }

    @GET
    @Path(value="/subtasks/running")
    @Produces(value={"application/json"})
    public Response getRunningTasks(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        ParallelIndexTaskRunner currentRunner = this.getCurrentRunner();
        if (currentRunner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok(currentRunner.getRunningTaskIds()).build();
    }

    @GET
    @Path(value="/subtaskspecs")
    @Produces(value={"application/json"})
    public Response getSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        ParallelIndexTaskRunner currentRunner = this.getCurrentRunner();
        if (currentRunner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok(currentRunner.getSubTaskSpecs()).build();
    }

    @GET
    @Path(value="/subtaskspecs/running")
    @Produces(value={"application/json"})
    public Response getRunningSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        ParallelIndexTaskRunner currentRunner = this.getCurrentRunner();
        if (currentRunner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok(currentRunner.getRunningSubTaskSpecs()).build();
    }

    @GET
    @Path(value="/subtaskspecs/complete")
    @Produces(value={"application/json"})
    public Response getCompleteSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        ParallelIndexTaskRunner currentRunner = this.getCurrentRunner();
        if (currentRunner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok(currentRunner.getCompleteSubTaskSpecs()).build();
    }

    @GET
    @Path(value="/subtaskspec/{id}")
    @Produces(value={"application/json"})
    public Response getSubTaskSpec(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        ParallelIndexTaskRunner currentRunner = this.getCurrentRunner();
        if (currentRunner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        SubTaskSpec subTaskSpec = currentRunner.getSubTaskSpec(id);
        if (subTaskSpec == null) {
            return Response.status((Response.Status)Response.Status.NOT_FOUND).build();
        }
        return Response.ok(subTaskSpec).build();
    }

    @GET
    @Path(value="/subtaskspec/{id}/state")
    @Produces(value={"application/json"})
    public Response getSubTaskState(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        ParallelIndexTaskRunner currentRunner = this.getCurrentRunner();
        if (currentRunner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        ParallelIndexTaskRunner.SubTaskSpecStatus subTaskSpecStatus = currentRunner.getSubTaskState(id);
        if (subTaskSpecStatus == null) {
            return Response.status((Response.Status)Response.Status.NOT_FOUND).build();
        }
        return Response.ok((Object)subTaskSpecStatus).build();
    }

    @GET
    @Path(value="/subtaskspec/{id}/history")
    @Produces(value={"application/json"})
    public Response getCompleteSubTaskSpecAttemptHistory(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        ParallelIndexTaskRunner currentRunner = this.getCurrentRunner();
        if (currentRunner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        TaskHistory taskHistory = currentRunner.getCompleteSubTaskSpecAttemptHistory(id);
        if (taskHistory == null) {
            return Response.status((Response.Status)Response.Status.NOT_FOUND).build();
        }
        return Response.ok(taskHistory.getAttemptHistory()).build();
    }

    private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats) {
        if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
            return (RowIngestionMetersTotals)buildSegmentsRowStats;
        }
        if (buildSegmentsRowStats instanceof Map) {
            Map buildSegmentsRowStatsMap = (Map)buildSegmentsRowStats;
            return new RowIngestionMetersTotals(((Number)buildSegmentsRowStatsMap.get("processed")).longValue(), ((Number)buildSegmentsRowStatsMap.get("processedBytes")).longValue(), ((Number)buildSegmentsRowStatsMap.get("processedWithError")).longValue(), ((Number)buildSegmentsRowStatsMap.get("thrownAway")).longValue(), ((Number)buildSegmentsRowStatsMap.get("unparseable")).longValue());
        }
        throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
    }

    private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner, boolean includeUnparseable) {
        SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
        ArrayList<ParseExceptionReport> unparseableEvents = new ArrayList<ParseExceptionReport>();
        Map completedSubtaskReports = parallelSinglePhaseRunner.getReports();
        for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
            Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
            if (taskReport == null || taskReport.isEmpty()) {
                LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId(), new Object[0]);
                continue;
            }
            RowIngestionMetersTotals rowIngestionMetersTotals = this.getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
            buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
        }
        RowIngestionMetersTotals rowStatsForRunningTasks = this.getRowStatsAndUnparseableEventsForRunningTasks(parallelSinglePhaseRunner.getRunningTaskIds(), unparseableEvents, includeUnparseable);
        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
        return this.createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
    }

    private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(ParallelIndexTaskRunner<?, ?> currentRunner, boolean includeUnparseable) {
        if (this.indexGenerateRowStats != null) {
            return Pair.of((Object)((Map)this.indexGenerateRowStats.lhs), (Object)(includeUnparseable ? (Map)this.indexGenerateRowStats.rhs : ImmutableMap.of()));
        }
        if (!currentRunner.getName().equals("partial segment generation")) {
            return Pair.of((Object)ImmutableMap.of(), (Object)ImmutableMap.of());
        }
        Map<String, ?> completedSubtaskReports = currentRunner.getReports();
        SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
        ArrayList<ParseExceptionReport> unparseableEvents = new ArrayList<ParseExceptionReport>();
        for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
            Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
            if (taskReport == null || taskReport.isEmpty()) {
                LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId(), new Object[0]);
                continue;
            }
            RowIngestionMetersTotals rowStatsForCompletedTask = this.getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
            buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
        }
        RowIngestionMetersTotals rowStatsForRunningTasks = this.getRowStatsAndUnparseableEventsForRunningTasks(currentRunner.getRunningTaskIds(), unparseableEvents, includeUnparseable);
        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
        return this.createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
    }

    private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(Set<String> runningTaskIds, List<ParseExceptionReport> unparseableEvents, boolean includeUnparseable) {
        SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
        for (String runningTaskId : runningTaskIds) {
            try {
                Map<String, Object> report = ParallelIndexSupervisorTask.getTaskReport(this.toolbox.getOverlordClient(), runningTaskId);
                if (report == null || report.isEmpty()) continue;
                Map ingestionStatsAndErrors = (Map)report.get("ingestionStatsAndErrors");
                Map payload = (Map)ingestionStatsAndErrors.get("payload");
                Map rowStats = (Map)payload.get("rowStats");
                Map totals = (Map)rowStats.get("totals");
                Map buildSegments = (Map)totals.get("buildSegments");
                if (includeUnparseable) {
                    Map taskUnparseableEvents = (Map)payload.get("unparseableEvents");
                    List buildSegmentsUnparseableEvents = (List)taskUnparseableEvents.get("buildSegments");
                    unparseableEvents.addAll(buildSegmentsUnparseableEvents);
                }
                buildSegmentsRowStats.addRowIngestionMetersTotals(this.getTotalsFromBuildSegmentsRowStats(buildSegments));
            }
            catch (Exception e) {
                LOG.warn((Throwable)e, "Encountered exception when getting live subtask report for task: " + runningTaskId, new Object[0]);
            }
        }
        return buildSegmentsRowStats.getTotals();
    }

    private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(RowIngestionMetersTotals rowStats, List<ParseExceptionReport> unparseableEvents) {
        HashMap rowStatsMap = new HashMap();
        HashMap<String, RowIngestionMetersTotals> totalsMap = new HashMap<String, RowIngestionMetersTotals>();
        totalsMap.put("buildSegments", rowStats);
        rowStatsMap.put("totals", totalsMap);
        return Pair.of(rowStatsMap, (Object)ImmutableMap.of((Object)"buildSegments", unparseableEvents));
    }

    private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(Map<String, TaskReport> taskReport, boolean includeUnparseable, List<ParseExceptionReport> unparseableEvents) {
        IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport)taskReport.get("ingestionStatsAndErrors");
        IngestionStatsAndErrorsTaskReportData reportData = (IngestionStatsAndErrorsTaskReportData)ingestionStatsAndErrorsReport.getPayload();
        RowIngestionMetersTotals totals = this.getTotalsFromBuildSegmentsRowStats(reportData.getRowStats().get("buildSegments"));
        if (includeUnparseable) {
            List taskUnparsebleEvents = (List)reportData.getUnparseableEvents().get("buildSegments");
            unparseableEvents.addAll(taskUnparsebleEvents);
        }
        return totals;
    }

    private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable) {
        if (this.currentSubTaskHolder == null) {
            return Pair.of((Object)ImmutableMap.of(), (Object)ImmutableMap.of());
        }
        Object currentRunner = this.currentSubTaskHolder.getTask();
        if (currentRunner == null) {
            return Pair.of((Object)ImmutableMap.of(), (Object)ImmutableMap.of());
        }
        if (this.isParallelMode()) {
            if (ParallelIndexSupervisorTask.isGuaranteedRollup(this.getIngestionMode(), this.ingestionSchema.getTuningConfig())) {
                return this.doGetRowStatsAndUnparseableEventsParallelMultiPhase((ParallelIndexTaskRunner)currentRunner, includeUnparseable);
            }
            return this.doGetRowStatsAndUnparseableEventsParallelSinglePhase((SinglePhaseParallelIndexTaskRunner)currentRunner, includeUnparseable);
        }
        IndexTask currentSequentialTask = (IndexTask)currentRunner;
        return Pair.of(currentSequentialTask.doGetRowStats(full), currentSequentialTask.doGetUnparseableEvents(full));
    }

    @GET
    @Path(value="/rowStats")
    @Produces(value={"application/json"})
    public Response getRowStats(@Context HttpServletRequest req, @QueryParam(value="full") String full) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        return Response.ok((Object)this.doGetRowStatsAndUnparseableEvents((String)full, (boolean)false).lhs).build();
    }

    @VisibleForTesting
    public Map<String, Object> doGetLiveReports(String full) {
        IndexTask currentSequentialTask;
        HashMap<String, Object> returnMap = new HashMap<String, Object>();
        HashMap<String, Object> ingestionStatsAndErrors = new HashMap<String, Object>();
        HashMap<String, Object> payload = new HashMap<String, Object>();
        Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents = this.doGetRowStatsAndUnparseableEvents(full, true);
        IngestionState ingestionStateForReport = this.isParallelMode() ? this.ingestionState : ((currentSequentialTask = (IndexTask)this.currentSubTaskHolder.getTask()) == null ? this.ingestionState : currentSequentialTask.getIngestionState());
        payload.put("ingestionState", ingestionStateForReport);
        payload.put("unparseableEvents", rowStatsAndUnparsebleEvents.rhs);
        payload.put("rowStats", rowStatsAndUnparsebleEvents.lhs);
        ingestionStatsAndErrors.put("taskId", this.getId());
        ingestionStatsAndErrors.put("payload", payload);
        ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
        returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
        return returnMap;
    }

    @GET
    @Path(value="/liveReports")
    @Produces(value={"application/json"})
    public Response getLiveReports(@Context HttpServletRequest req, @QueryParam(value="full") String full) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        return Response.ok(this.doGetLiveReports(full)).build();
    }

    @Nullable
    @VisibleForTesting
    static Map<String, Object> getTaskReport(OverlordClient overlordClient, String taskId) throws InterruptedException, ExecutionException {
        try {
            return (Map)FutureUtils.get((ListenableFuture)overlordClient.taskReportAsMap(taskId), (boolean)true);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof HttpResponseException && ((HttpResponseException)e.getCause()).getResponse().getStatus().equals((Object)HttpResponseStatus.NOT_FOUND)) {
                return null;
            }
            throw e;
        }
    }

    private /* synthetic */ ParallelIndexTaskRunner lambda$runRangePartitionMultiPhaseParallel$8(List ioConfigs, ParallelIndexIngestionSpec segmentMergeIngestionSpec, TaskToolbox tb) {
        return this.createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec);
    }

    private /* synthetic */ ParallelIndexTaskRunner lambda$runHashPartitionMultiPhaseParallel$6(List ioConfigs, ParallelIndexIngestionSpec segmentMergeIngestionSpec, TaskToolbox tb) {
        return this.createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec);
    }

    private static class PartitionReport {
        private final PartitionStat partitionStat;
        private final String subTaskId;

        PartitionReport(String subTaskId, PartitionStat partitionStat) {
            this.subTaskId = subTaskId;
            this.partitionStat = partitionStat;
        }

        String getSubTaskId() {
            return this.subTaskId;
        }

        PartitionStat getPartitionStat() {
            return this.partitionStat;
        }
    }

    static class Partition {
        final Interval interval;
        final int bucketId;

        private static Partition fromStat(PartitionStat partitionStat) {
            return new Partition(partitionStat.getInterval(), partitionStat.getBucketId());
        }

        Partition(Interval interval, int bucketId) {
            this.interval = interval;
            this.bucketId = bucketId;
        }

        public int getBucketId() {
            return this.bucketId;
        }

        public Interval getInterval() {
            return this.interval;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Partition that = (Partition)o;
            return this.getBucketId() == that.getBucketId() && Objects.equals(this.getInterval(), that.getInterval());
        }

        public int hashCode() {
            return Objects.hash(this.getInterval(), this.getBucketId());
        }

        public String toString() {
            return "Partition{interval=" + this.interval + ", bucketId=" + this.bucketId + '}';
        }
    }
}

