/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionLocation;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CompressionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase {
    static final String DISABLE_TASK_INJECT_CONTEXT_KEY = "disableInject";
    static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
    static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")));
    static final AggregatorFactory[] DEFAULT_METRICS_SPEC = new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")};
    static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DEFAULT_DIMENSIONS_SPEC, null, Arrays.asList("ts", "dim", "val"), false, 0);
    static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat(Arrays.asList("ts", "dim", "val"), null, Boolean.valueOf(false), Boolean.valueOf(false), 0);
    public static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = new ParallelIndexTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, Integer.valueOf(2), null, null, null, null, null, null, null, null, Integer.valueOf(5), null, null, null);
    protected static final double DEFAULT_TRANSIENT_TASK_FAILURE_RATE = 0.2;
    protected static final double DEFAULT_TRANSIENT_API_FAILURE_RATE = 0.2;
    private static final Logger LOG = new Logger(AbstractParallelIndexSupervisorTaskTest.class);
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    private final double transientTaskFailureRate;
    private final double transientApiCallFailureRate;
    private File localDeepStorage;
    private SimpleThreadingTaskRunner taskRunner;
    private ObjectMapper objectMapper;
    private LocalOverlordClient indexingServiceClient;
    private IntermediaryDataManager intermediaryDataManager;
    private CoordinatorClient coordinatorClient;
    private ExecutorService remoteApiExecutor;

    protected AbstractParallelIndexSupervisorTaskTest(double transientTaskFailureRate, double transientApiCallFailureRate) {
        this.transientTaskFailureRate = transientTaskFailureRate;
        this.transientApiCallFailureRate = transientApiCallFailureRate;
    }

    @Before
    public void setUpAbstractParallelIndexSupervisorTaskTest() throws IOException {
        this.localDeepStorage = this.temporaryFolder.newFolder("localStorage");
        this.taskRunner = new SimpleThreadingTaskRunner();
        this.objectMapper = this.getObjectMapper();
        this.indexingServiceClient = new LocalOverlordClient(this.objectMapper, this.taskRunner);
        this.intermediaryDataManager = new LocalIntermediaryDataManager(new WorkerConfig(), new TaskConfig(null, null, null, null, null, false, null, null, (List)ImmutableList.of((Object)new StorageLocationConfig(this.temporaryFolder.newFolder(), null, null)), false, false, TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), null), null);
        this.remoteApiExecutor = Execs.singleThreaded((String)"coordinator-api-executor");
        this.coordinatorClient = new LocalCoordinatorClient(this.remoteApiExecutor);
        this.prepareObjectMapper(this.objectMapper, this.getIndexIO());
    }

    @After
    public void tearDownAbstractParallelIndexSupervisorTaskTest() {
        this.remoteApiExecutor.shutdownNow();
        this.taskRunner.shutdown();
        this.temporaryFolder.delete();
    }

    protected ParallelIndexTuningConfig newTuningConfig(PartitionsSpec partitionsSpec, int maxNumConcurrentSubTasks, boolean forceGuaranteedRollup) {
        return new ParallelIndexTuningConfig(null, null, null, null, null, null, null, null, (SplitHintSpec)new MaxSizeSplitHintSpec(null, Integer.valueOf(1)), partitionsSpec, null, null, null, Boolean.valueOf(forceGuaranteedRollup), null, null, null, null, Integer.valueOf(maxNumConcurrentSubTasks), null, null, null, null, null, null, null, null, Integer.valueOf(5), null, null, null);
    }

    protected LocalOverlordClient getIndexingServiceClient() {
        return this.indexingServiceClient;
    }

    protected CoordinatorClient getCoordinatorClient() {
        return this.coordinatorClient;
    }

    public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO) {
        TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false, false, TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), null);
        objectMapper.setInjectableValues((InjectableValues)new InjectableValues.Std().addValue(ExprMacroTable.class, (Object)LookupEnabledTestExprMacroTable.INSTANCE).addValue(IndexIO.class, (Object)indexIO).addValue(ObjectMapper.class, (Object)objectMapper).addValue(ChatHandlerProvider.class, (Object)new NoopChatHandlerProvider()).addValue(AuthConfig.class, (Object)new AuthConfig()).addValue(AuthorizerMapper.class, null).addValue(RowIngestionMetersFactory.class, (Object)new DropwizardRowIngestionMetersFactory()).addValue(DataSegment.PruneSpecsHolder.class, (Object)DataSegment.PruneSpecsHolder.DEFAULT).addValue(AuthorizerMapper.class, (Object)new AuthorizerMapper((Map)ImmutableMap.of())).addValue(AppenderatorsManager.class, (Object)TestUtils.APPENDERATORS_MANAGER).addValue(LocalDataSegmentPuller.class, (Object)new LocalDataSegmentPuller()).addValue(CoordinatorClient.class, (Object)this.coordinatorClient).addValue(SegmentCacheManagerFactory.class, (Object)new SegmentCacheManagerFactory(objectMapper)).addValue(RetryPolicyFactory.class, (Object)new RetryPolicyFactory(new RetryPolicyConfig())).addValue(TaskConfig.class, (Object)taskConfig));
        objectMapper.registerSubtypes(new NamedType[]{new NamedType(ParallelIndexSupervisorTask.class, "index_parallel"), new NamedType(CompactionTask.CompactionTuningConfig.class, "compaction"), new NamedType(SinglePhaseSubTask.class, "single_phase_sub_task"), new NamedType(PartialHashSegmentGenerateTask.class, "partial_index_generate"), new NamedType(PartialRangeSegmentGenerateTask.class, "partial_range_index_generate"), new NamedType(PartialGenericSegmentMergeTask.class, "partial_index_generic_merge"), new NamedType(PartialDimensionDistributionTask.class, "partial_dimension_distribution"), new NamedType(PartialDimensionCardinalityTask.class, "partial_dimension_cardinality")});
    }

    protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException {
        return new TaskToolbox.Builder().config(new TaskConfig(null, null, null, null, null, false, null, null, null, false, false, TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), null)).taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, Integer.valueOf(8091), null, true, false)).taskActionClient(actionClient).segmentPusher((DataSegmentPusher)new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig(){

            public File getStorageDirectory() {
                return AbstractParallelIndexSupervisorTaskTest.this.localDeepStorage;
            }
        })).dataSegmentKiller((DataSegmentKiller)new NoopDataSegmentKiller()).joinableFactory((JoinableFactory)NoopJoinableFactory.INSTANCE).segmentCacheManager(this.newSegmentLoader(this.temporaryFolder.newFolder())).jsonMapper(this.objectMapper).taskWorkDir(this.temporaryFolder.newFolder(task.getId())).indexIO(this.getIndexIO()).indexMergerV9(this.getIndexMergerV9Factory().create(((Boolean)task.getContextValue("storeEmptyColumns", (Object)true)).booleanValue())).taskReportFileWriter((TaskReportFileWriter)new NoopTestTaskReportFileWriter()).intermediaryDataManager(this.intermediaryDataManager).authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER).chatHandlerProvider((ChatHandlerProvider)new NoopChatHandlerProvider()).rowIngestionMetersFactory(new TestUtils().getRowIngestionMetersFactory()).appenderatorsManager((AppenderatorsManager)new TestAppenderatorsManager()).overlordClient((OverlordClient)this.indexingServiceClient).coordinatorClient(this.coordinatorClient).supervisorTaskClientProvider((ParallelIndexSupervisorTaskClientProvider)new LocalParallelIndexTaskClientProvider(this.taskRunner, this.transientApiCallFailureRate)).shuffleClient((ShuffleClient)new LocalShuffleClient(this.intermediaryDataManager)).build();
    }

    protected Map<String, Object> buildExpectedTaskReportSequential(String taskId, List<ParseExceptionReport> expectedUnparseableEvents, RowIngestionMetersTotals expectedDeterminePartitions, RowIngestionMetersTotals expectedTotals) {
        HashMap<String, Object> payload = new HashMap<String, Object>();
        payload.put("ingestionState", IngestionState.COMPLETED);
        payload.put("unparseableEvents", ImmutableMap.of((Object)"determinePartitions", (Object)ImmutableList.of(), (Object)"buildSegments", expectedUnparseableEvents));
        ImmutableMap emptyAverageMinuteMap = ImmutableMap.of((Object)"processed", (Object)0.0, (Object)"unparseable", (Object)0.0, (Object)"thrownAway", (Object)0.0, (Object)"processedWithError", (Object)0.0);
        ImmutableMap emptyAverages = ImmutableMap.of((Object)"1m", (Object)emptyAverageMinuteMap, (Object)"5m", (Object)emptyAverageMinuteMap, (Object)"15m", (Object)emptyAverageMinuteMap);
        payload.put("rowStats", ImmutableMap.of((Object)"movingAverages", (Object)ImmutableMap.of((Object)"determinePartitions", (Object)emptyAverages, (Object)"buildSegments", (Object)emptyAverages), (Object)"totals", (Object)ImmutableMap.of((Object)"determinePartitions", (Object)expectedDeterminePartitions, (Object)"buildSegments", (Object)expectedTotals)));
        HashMap<String, Object> ingestionStatsAndErrors = new HashMap<String, Object>();
        ingestionStatsAndErrors.put("taskId", taskId);
        ingestionStatsAndErrors.put("payload", payload);
        ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
        return Collections.singletonMap("ingestionStatsAndErrors", ingestionStatsAndErrors);
    }

    protected Map<String, Object> buildExpectedTaskReportParallel(String taskId, List<ParseExceptionReport> expectedUnparseableEvents, RowIngestionMetersTotals expectedTotals) {
        HashMap<String, Object> returnMap = new HashMap<String, Object>();
        HashMap<String, Object> ingestionStatsAndErrors = new HashMap<String, Object>();
        HashMap<String, Object> payload = new HashMap<String, Object>();
        payload.put("ingestionState", IngestionState.COMPLETED);
        payload.put("unparseableEvents", ImmutableMap.of((Object)"buildSegments", expectedUnparseableEvents));
        payload.put("rowStats", ImmutableMap.of((Object)"totals", (Object)ImmutableMap.of((Object)"buildSegments", (Object)expectedTotals)));
        ingestionStatsAndErrors.put("taskId", taskId);
        ingestionStatsAndErrors.put("payload", payload);
        ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
        returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
        return returnMap;
    }

    protected void compareTaskReports(Map<String, Object> expectedReports, Map<String, Object> actualReports) {
        expectedReports = (Map)expectedReports.get("ingestionStatsAndErrors");
        actualReports = (Map)actualReports.get("ingestionStatsAndErrors");
        Assert.assertEquals(expectedReports.get("taskId"), actualReports.get("taskId"));
        Assert.assertEquals(expectedReports.get("type"), actualReports.get("type"));
        Map expectedPayload = (Map)expectedReports.get("payload");
        Map actualPayload = (Map)actualReports.get("payload");
        Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState"));
        Assert.assertEquals(expectedPayload.get("rowStats"), actualPayload.get("rowStats"));
        Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState"));
        List expectedParseExceptionReports = (List)((Map)expectedPayload.get("unparseableEvents")).get("buildSegments");
        List actualParseExceptionReports = (List)((Map)actualPayload.get("unparseableEvents")).get("buildSegments");
        List expectedMessages = expectedParseExceptionReports.stream().map(r -> (String)r.getDetails().get(0)).collect(Collectors.toList());
        List actualMessages = actualParseExceptionReports.stream().map(r -> (String)r.getDetails().get(0)).collect(Collectors.toList());
        Assert.assertEquals(expectedMessages, actualMessages);
        List expectedInputs = expectedParseExceptionReports.stream().map(ParseExceptionReport::getInput).collect(Collectors.toList());
        List actualInputs = actualParseExceptionReports.stream().map(ParseExceptionReport::getInput).collect(Collectors.toList());
        Assert.assertEquals(expectedInputs, actualInputs);
    }

    class LocalCoordinatorClient
    extends CoordinatorClient {
        private final ExecutorService exec;

        LocalCoordinatorClient(ExecutorService exec) {
            super(null, null);
            this.exec = exec;
        }

        public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(String dataSource, List<Interval> intervals) {
            try {
                return this.exec.submit(() -> AbstractParallelIndexSupervisorTaskTest.this.getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE)).get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        public DataSegment fetchUsedSegment(String dataSource, String segmentId) {
            ImmutableDruidDataSource druidDataSource;
            try {
                druidDataSource = this.exec.submit(() -> AbstractParallelIndexSupervisorTaskTest.this.getSegmentsMetadataManager().getImmutableDataSourceWithUsedSegments(dataSource)).get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
            if (druidDataSource == null) {
                throw new ISE("Unknown datasource[%s]", new Object[]{dataSource});
            }
            for (SegmentId possibleSegmentId : SegmentId.iteratePossibleParsingsWithDataSource((String)dataSource, (String)segmentId)) {
                DataSegment segment = druidDataSource.getSegment(possibleSegmentId);
                if (segment == null) continue;
                return segment;
            }
            throw new ISE("Can't find segment for id[%s]", new Object[]{segmentId});
        }
    }

    static class LocalParallelIndexSupervisorTaskClient
    implements ParallelIndexSupervisorTaskClient {
        private static final int MAX_TRANSIENT_API_FAILURES = 3;
        private final String supervisorTaskId;
        private final double transientFailureRate;
        private final ConcurrentMap<String, TaskContainer> tasks;

        LocalParallelIndexSupervisorTaskClient(String supervisorTaskId, ConcurrentMap<String, TaskContainer> tasks, double transientFailureRate) {
            this.supervisorTaskId = supervisorTaskId;
            this.tasks = tasks;
            this.transientFailureRate = transientFailureRate;
        }

        public SegmentIdWithShardSpec allocateSegment(DateTime timestamp) throws IOException {
            TaskContainer taskContainer = (TaskContainer)this.tasks.get(this.supervisorTaskId);
            ParallelIndexSupervisorTask supervisorTask = this.findSupervisorTask(taskContainer);
            if (supervisorTask == null) {
                throw new ISE("Cannot find supervisor task for [%s]", new Object[]{this.supervisorTaskId});
            }
            if (!(supervisorTask.getCurrentRunner() instanceof SinglePhaseParallelIndexTaskRunner)) {
                throw new ISE("Only SinglePhaseParallelIndexTaskRunner can call this API", new Object[0]);
            }
            SinglePhaseParallelIndexTaskRunner runner = (SinglePhaseParallelIndexTaskRunner)supervisorTask.getCurrentRunner();
            return runner.allocateNewSegment(supervisorTask.getDataSource(), timestamp);
        }

        public SegmentIdWithShardSpec allocateSegment(DateTime timestamp, String sequenceName, @Nullable String prevSegmentId) throws IOException {
            TaskContainer taskContainer = (TaskContainer)this.tasks.get(this.supervisorTaskId);
            ParallelIndexSupervisorTask supervisorTask = this.findSupervisorTask(taskContainer);
            if (supervisorTask == null) {
                throw new ISE("Cannot find supervisor task for [%s]", new Object[]{this.supervisorTaskId});
            }
            if (!(supervisorTask.getCurrentRunner() instanceof SinglePhaseParallelIndexTaskRunner)) {
                throw new ISE("Only SinglePhaseParallelIndexTaskRunner can call this API", new Object[0]);
            }
            SinglePhaseParallelIndexTaskRunner runner = (SinglePhaseParallelIndexTaskRunner)supervisorTask.getCurrentRunner();
            SegmentIdWithShardSpec newSegmentId = null;
            int i = 0;
            do {
                SegmentIdWithShardSpec allocated = runner.allocateNewSegment(supervisorTask.getDataSource(), timestamp, sequenceName, prevSegmentId);
                if (newSegmentId == null) {
                    newSegmentId = allocated;
                }
                if (newSegmentId.equals((Object)allocated)) continue;
                throw new ISE("Segment allocation is not idempotent. Prev id was [%s] but new id is [%s]", new Object[]{newSegmentId, allocated});
            } while (i++ < 3 && ThreadLocalRandom.current().nextDouble() < this.transientFailureRate);
            return newSegmentId;
        }

        public void report(SubTaskReport report) {
            TaskContainer taskContainer = (TaskContainer)this.tasks.get(this.supervisorTaskId);
            ParallelIndexSupervisorTask supervisorTask = this.findSupervisorTask(taskContainer);
            if (supervisorTask == null) {
                throw new ISE("Cannot find supervisor task for [%s]", new Object[]{this.supervisorTaskId});
            }
            int i = 0;
            do {
                supervisorTask.getCurrentRunner().collectReport(report);
            } while (i++ < 3 && ThreadLocalRandom.current().nextDouble() < this.transientFailureRate);
        }

        @Nullable
        private ParallelIndexSupervisorTask findSupervisorTask(TaskContainer taskContainer) {
            if (taskContainer == null) {
                return null;
            }
            if (taskContainer.task instanceof CompactionTask) {
                Task task = (Task)((CompactionTask)taskContainer.task).getCurrentSubTaskHolder().getTask();
                if (!(task instanceof ParallelIndexSupervisorTask)) {
                    return null;
                }
                return (ParallelIndexSupervisorTask)task;
            }
            if (!(taskContainer.task instanceof ParallelIndexSupervisorTask)) {
                return null;
            }
            return (ParallelIndexSupervisorTask)taskContainer.task;
        }
    }

    static class LocalParallelIndexTaskClientProvider
    implements ParallelIndexSupervisorTaskClientProvider {
        private final ConcurrentMap<String, TaskContainer> tasks;
        private final double transientApiCallFailureRate;

        LocalParallelIndexTaskClientProvider(SimpleThreadingTaskRunner taskRunner, double transientApiCallFailureRate) {
            this.tasks = taskRunner.tasks;
            this.transientApiCallFailureRate = transientApiCallFailureRate;
        }

        public ParallelIndexSupervisorTaskClient build(String supervisorTaskId, Duration httpTimeout, long numRetries) {
            return new LocalParallelIndexSupervisorTaskClient(supervisorTaskId, this.tasks, this.transientApiCallFailureRate);
        }
    }

    static class LocalShuffleClient
    implements ShuffleClient<GenericPartitionLocation> {
        private final IntermediaryDataManager intermediaryDataManager;

        LocalShuffleClient(IntermediaryDataManager intermediaryDataManager) {
            this.intermediaryDataManager = intermediaryDataManager;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public File fetchSegmentFile(File partitionDir, String supervisorTaskId, GenericPartitionLocation location) throws IOException {
            Optional zippedFile = this.intermediaryDataManager.findPartitionFile(supervisorTaskId, location.getSubTaskId(), location.getInterval(), location.getBucketId());
            if (!zippedFile.isPresent()) {
                throw new ISE("Can't find segment file for location[%s] at path[%s]", new Object[]{location});
            }
            File fetchedFile = new File(partitionDir, StringUtils.format((String)"temp_%s", (Object[])new Object[]{location.getSubTaskId()}));
            FileUtils.writeAtomically((File)fetchedFile, out -> ((ByteSource)zippedFile.get()).copyTo(out));
            File unzippedDir = new File(partitionDir, StringUtils.format((String)"unzipped_%s", (Object[])new Object[]{location.getSubTaskId()}));
            try {
                FileUtils.mkdirp((File)unzippedDir);
                CompressionUtils.unzip((File)fetchedFile, (File)unzippedDir);
                if (fetchedFile.delete()) return unzippedDir;
            }
            catch (Throwable throwable) {
                if (fetchedFile.delete()) throw throwable;
                LOG.warn("Failed to delete temp file[%s]", new Object[]{zippedFile});
                throw throwable;
            }
            LOG.warn("Failed to delete temp file[%s]", new Object[]{zippedFile});
            return unzippedDir;
        }
    }

    static class TestParallelIndexSupervisorTask
    extends ParallelIndexSupervisorTask {
        TestParallelIndexSupervisorTask(String id, TaskResource taskResource, ParallelIndexIngestionSpec ingestionSchema, Map<String, Object> context) {
            super(id, null, taskResource, ingestionSchema, context);
        }
    }

    public class LocalOverlordClient
    extends NoopOverlordClient {
        private final ObjectMapper objectMapper;
        private final SimpleThreadingTaskRunner taskRunner;

        public LocalOverlordClient(ObjectMapper objectMapper, SimpleThreadingTaskRunner taskRunner) {
            this.objectMapper = objectMapper;
            this.taskRunner = taskRunner;
        }

        public ListenableFuture<Void> runTask(String taskId, Object taskObject) {
            Task task = (Task)taskObject;
            this.taskRunner.run(this.injectIfNeeded(task));
            return Futures.immediateFuture(null);
        }

        public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId) {
            com.google.common.base.Optional task = AbstractParallelIndexSupervisorTaskTest.this.getTaskStorage().getTask(taskId);
            if (!task.isPresent()) {
                return null;
            }
            return Futures.immediateFuture((Object)((ParallelIndexSupervisorTask)task.get()).doGetLiveReports("full"));
        }

        public TaskContainer getTaskContainer(String taskId) {
            return this.taskRunner.getTaskContainer(taskId);
        }

        public TaskStatus runAndWait(Task task) {
            return this.taskRunner.runAndWait(this.injectIfNeeded(task));
        }

        public TaskStatus waitToFinish(Task task, long timeout, TimeUnit timeUnit) {
            return this.taskRunner.waitToFinish(task, timeout, timeUnit);
        }

        private Task injectIfNeeded(Task task) {
            if (!((Boolean)task.getContextValue(AbstractParallelIndexSupervisorTaskTest.DISABLE_TASK_INJECT_CONTEXT_KEY, (Object)false)).booleanValue()) {
                try {
                    byte[] json = this.objectMapper.writeValueAsBytes((Object)task);
                    return (Task)this.objectMapper.readValue(json, Task.class);
                }
                catch (IOException e) {
                    LOG.error((Throwable)e, "Error while serializing and deserializing task spec", new Object[0]);
                    throw new RuntimeException(e);
                }
            }
            return task;
        }

        public ListenableFuture<Void> cancelTask(String taskId) {
            this.taskRunner.cancel(taskId);
            return Futures.immediateFuture(null);
        }

        public ListenableFuture<TaskStatusResponse> taskStatus(String taskId) {
            com.google.common.base.Optional task = AbstractParallelIndexSupervisorTaskTest.this.getTaskStorage().getTask(taskId);
            String groupId = task.isPresent() ? ((Task)task.get()).getGroupId() : null;
            String taskType = task.isPresent() ? ((Task)task.get()).getType() : null;
            TaskStatus taskStatus = this.taskRunner.getStatus(taskId);
            if (taskStatus != null) {
                TaskStatusResponse retVal = new TaskStatusResponse(taskId, new TaskStatusPlus(taskId, groupId, taskType, DateTimes.EPOCH, DateTimes.EPOCH, taskStatus.getStatusCode(), taskStatus.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING, Long.valueOf(-1L), TaskLocation.unknown(), null, null));
                return Futures.immediateFuture((Object)retVal);
            }
            return Futures.immediateFuture((Object)new TaskStatusResponse(taskId, null));
        }

        public Set<DataSegment> getPublishedSegments(Task task) {
            return this.taskRunner.getPublishedSegments(task.getId());
        }
    }

    public class SimpleThreadingTaskRunner {
        private final ConcurrentMap<String, TaskContainer> tasks = new ConcurrentHashMap<String, TaskContainer>();
        private final ListeningExecutorService service = MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)5, (String)"simple-threading-task-runner-%d"));
        private final ScheduledExecutorService taskKiller = Execs.scheduledSingleThreaded((String)"simple-threading-task-killer");
        private final Set<String> killedSubtaskSpecs = new HashSet<String>();

        SimpleThreadingTaskRunner() {
            this.taskKiller.scheduleAtFixedRate(() -> {
                for (TaskContainer taskContainer : this.tasks.values()) {
                    String subtaskSpecId;
                    boolean kill = ThreadLocalRandom.current().nextDouble() < AbstractParallelIndexSupervisorTaskTest.this.transientTaskFailureRate;
                    if (!kill || taskContainer.statusFuture.isDone() || (subtaskSpecId = taskContainer.task instanceof AbstractBatchSubtask ? ((AbstractBatchSubtask)taskContainer.task).getSubtaskSpecId() : null) == null || this.killedSubtaskSpecs.contains(subtaskSpecId)) continue;
                    this.killedSubtaskSpecs.add(subtaskSpecId);
                    taskContainer.statusFuture.cancel(true);
                    LOG.info("Transient task failure test. Killed task[%s] for spec[%s]", new Object[]{taskContainer.task.getId(), subtaskSpecId});
                }
            }, 100L, 100L, TimeUnit.MILLISECONDS);
        }

        public void shutdown() {
            this.service.shutdownNow();
            this.taskKiller.shutdownNow();
        }

        public String run(Task task) {
            this.runTask(task);
            return task.getId();
        }

        private TaskStatus runAndWait(Task task) {
            try {
                return this.runTask(task).get(20L, TimeUnit.MINUTES);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        }

        private TaskStatus waitToFinish(Task task, long waitTime, TimeUnit timeUnit) {
            TaskContainer taskContainer = (TaskContainer)this.tasks.get(task.getId());
            if (taskContainer == null) {
                throw new IAE("Unknown task[%s]", new Object[]{task.getId()});
            }
            try {
                while (taskContainer.statusFuture == null && !Thread.currentThread().isInterrupted()) {
                    Thread.sleep(10L);
                }
                return (TaskStatus)taskContainer.statusFuture.get(waitTime, timeUnit);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        }

        private TaskContainer getTaskContainer(String taskId) {
            return (TaskContainer)this.tasks.get(taskId);
        }

        private Future<TaskStatus> runTask(Task task) {
            TaskContainer taskContainer = new TaskContainer(task);
            if (this.tasks.put(task.getId(), taskContainer) != null) {
                throw new ISE("Duplicate task ID[%s]", new Object[]{task.getId()});
            }
            try {
                AbstractParallelIndexSupervisorTaskTest.this.prepareTaskForLocking(task);
            }
            catch (EntryExistsException e) {
                throw new RuntimeException(e);
            }
            task.addToContextIfAbsent("useLineageBasedSegmentAllocation", (Object)true);
            ListenableFuture statusFuture = this.service.submit(() -> {
                try {
                    IngestionTestBase.TestLocalTaskActionClient actionClient = AbstractParallelIndexSupervisorTaskTest.this.createActionClient(task);
                    TaskToolbox toolbox = AbstractParallelIndexSupervisorTaskTest.this.createTaskToolbox(task, actionClient);
                    taskContainer.setActionClient(actionClient);
                    if (task.isReady(toolbox.getTaskActionClient())) {
                        return task.run(toolbox);
                    }
                    AbstractParallelIndexSupervisorTaskTest.this.getTaskStorage().setStatus(TaskStatus.failure((String)task.getId(), (String)"Dummy task status failure for testing"));
                    throw new ISE("task[%s] is not ready", new Object[]{task.getId()});
                }
                catch (Exception e) {
                    AbstractParallelIndexSupervisorTaskTest.this.getTaskStorage().setStatus(TaskStatus.failure((String)task.getId(), (String)e.getMessage()));
                    throw new RuntimeException(e);
                }
            });
            taskContainer.setStatusFuture((Future)statusFuture);
            ListenableFuture cleanupFuture = Futures.transform((ListenableFuture)statusFuture, status -> {
                AbstractParallelIndexSupervisorTaskTest.this.shutdownTask(task);
                return status;
            });
            return cleanupFuture;
        }

        @Nullable
        public String cancel(String taskId) {
            TaskContainer taskContainer = (TaskContainer)this.tasks.remove(taskId);
            if (taskContainer != null && taskContainer.statusFuture != null) {
                taskContainer.statusFuture.cancel(true);
                return taskId;
            }
            return null;
        }

        @Nullable
        public TaskStatus getStatus(String taskId) {
            TaskContainer taskContainer = (TaskContainer)this.tasks.get(taskId);
            if (taskContainer != null && taskContainer.statusFuture != null) {
                try {
                    if (taskContainer.statusFuture.isDone()) {
                        return (TaskStatus)taskContainer.statusFuture.get();
                    }
                    return TaskStatus.running((String)taskId);
                }
                catch (InterruptedException | CancellationException | ExecutionException e) {
                    LOG.error((Throwable)e, "Task[%s] failed", new Object[]{taskId});
                    return TaskStatus.failure((String)taskId, (String)e.getMessage());
                }
            }
            return null;
        }

        public Set<DataSegment> getPublishedSegments(String taskId) {
            TaskContainer taskContainer = (TaskContainer)this.tasks.get(taskId);
            if (taskContainer == null || taskContainer.actionClient == null) {
                return Collections.emptySet();
            }
            return taskContainer.actionClient.getPublishedSegments();
        }
    }

    protected static class TaskContainer {
        private final Task task;
        private volatile @MonotonicNonNull Future<TaskStatus> statusFuture;
        private volatile @MonotonicNonNull IngestionTestBase.TestLocalTaskActionClient actionClient;

        private TaskContainer(Task task) {
            this.task = task;
        }

        public Task getTask() {
            return this.task;
        }

        private void setStatusFuture(Future<TaskStatus> statusFuture) {
            this.statusFuture = statusFuture;
        }

        private void setActionClient(IngestionTestBase.TestLocalTaskActionClient actionClient) {
            this.actionClient = actionClient;
        }
    }
}

