/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.NoopInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskLockHelper;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexingPhaseProgress;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelIndexSupervisorTaskTest {
    /** Sub task count this test is built around. */
    private static final int NUM_SUB_TASKS = 10;
    /** Spec id -> spec, recorded by {@code TestRunner.newTaskSpec} for every spec the runner creates. */
    private final ConcurrentMap<String, SinglePhaseSubTaskSpec> subTaskSpecs = new ConcurrentHashMap<>();
    /** Spec id -> RUNNING status of its current attempt; added in {@code newSubTask}, removed in {@code setState}. */
    private final ConcurrentMap<String, TaskStatusPlus> runningSpecs = new ConcurrentHashMap<>();
    /** Spec id -> terminal statuses of finished attempts, appended by {@code TestSubTask.setState}. */
    private final ConcurrentHashMap<String, List<TaskStatusPlus>> taskHistories = new ConcurrentHashMap<>();
    /** Sub task id -> spec that created it, recorded in {@code TestSubTaskSpec.newSubTask}. */
    private final ConcurrentMap<String, SinglePhaseSubTaskSpec> taskIdToSpec = new ConcurrentHashMap<>();
    /** Sub tasks that have been created and not yet moved to a terminal state. */
    private final CopyOnWriteArrayList<TestSubTask> runningTasks = new CopyOnWriteArrayList<>();
    /** Supervisor task under test; assigned at the start of {@code testAPIs}. */
    private TestSupervisorTask task;

    /**
     * Creates the test fixture, passing {@code 0.0} for both arguments of the parent constructor.
     * NOTE(review): the meaning of the two values is defined by AbstractParallelIndexSupervisorTaskTest and is
     * not visible here - presumably failure-injection rates; confirm.
     */
    public ParallelIndexSupervisorTaskResourceTest() {
        super(0.0, 0.0);
    }

    /** Runs after each test and calls {@code delete()} on the inherited {@code temporaryFolder}. */
    @After
    public void teardown() {
        temporaryFolder.delete();
    }

    /**
     * Drives a supervisor task with {@code NUM_SUB_TASKS} input splits through a scripted sequence of sub task
     * successes and failures and, after every step, verifies the supervisor's resource methods through
     * {@code checkState}.
     *
     * <p>Sequence: wait for all sub tasks to run, finish 2 as SUCCESS, finish 3 as FAILED and wait for the
     * running set to refill, finish all but one of the running tasks as SUCCESS, fail the last one once, then
     * let it succeed. Finally the supervisor task itself must finish with SUCCESS.
     *
     * <p>All waits are unbounded polls; the {@code @Test} timeout is what bounds them.
     *
     * @throws Exception if a poll is interrupted or any call on the task or client throws
     */
    @Test(timeout=20000L)
    public void testAPIs() throws Exception {
        task = newTask(
            Intervals.of("2017/2018"),
            new ParallelIndexIOConfig(
                null,
                new TestInputSource(IntStream.range(0, NUM_SUB_TASKS).boxed().collect(Collectors.toList())),
                new NoopInputFormat(),
                false,
                null
            )
        );
        getIndexingServiceClient().runTask(task.getId(), task);
        // Fixed pause before the runner is looked up; the assertion below fails if it is still null afterwards.
        Thread.sleep(1000L);

        final SinglePhaseParallelIndexTaskRunner runner = (SinglePhaseParallelIndexTaskRunner) task.getCurrentRunner();
        Assert.assertNotNull("runner is null", runner);

        // getMode
        Response response = task.getMode(newRequest());
        Assert.assertEquals(200, response.getStatus());
        Assert.assertEquals("parallel", response.getEntity());

        // getProgress: estimated number of sub tasks expected to succeed
        response = task.getProgress(newRequest());
        Assert.assertEquals(200, response.getStatus());
        Assert.assertEquals(
            NUM_SUB_TASKS,
            ((ParallelIndexingPhaseProgress) response.getEntity()).getEstimatedExpectedSucceeded()
        );

        // Step 1: every sub task is running, nothing has completed yet.
        awaitSubTaskCount(ParallelIndexingPhaseProgress::getRunning, NUM_SUB_TASKS);
        int succeededTasks = 0;
        int failedTasks = 0;
        checkState(succeededTasks, failedTasks, buildStateMap());

        // Step 2: two sub tasks succeed.
        succeededTasks += 2;
        finishRunningTasks(2, TaskState.SUCCESS);
        awaitSubTaskCount(ParallelIndexingPhaseProgress::getSucceeded, succeededTasks);
        checkState(succeededTasks, failedTasks, buildStateMap());

        // Step 3: three sub tasks fail; wait until the failures are counted and the running set has grown back
        // to one task per spec that has not succeeded yet.
        failedTasks += 3;
        finishRunningTasks(3, TaskState.FAILED);
        while (getNumSubTasks(ParallelIndexingPhaseProgress::getFailed) < failedTasks
               || runningTasks.size() < NUM_SUB_TASKS - succeededTasks) {
            Thread.sleep(100L);
        }
        checkState(succeededTasks, failedTasks, buildStateMap());

        // Step 4: finish all but one of the running sub tasks so that exactly one spec stays running.
        final int allButOne = NUM_SUB_TASKS - succeededTasks - 1;
        succeededTasks += allButOne;
        finishRunningTasks(allButOne, TaskState.SUCCESS);
        awaitSubTaskCount(ParallelIndexingPhaseProgress::getSucceeded, succeededTasks);
        checkState(succeededTasks, failedTasks, buildStateMap());

        Assert.assertEquals(1, runningSpecs.size());
        final String lastRunningSpecId = runningSpecs.keySet().iterator().next();
        // The remaining spec is expected to have exactly one finished attempt recorded so far.
        final List<TaskStatusPlus> taskHistory = taskHistories.get(lastRunningSpecId);
        Assert.assertEquals(1, taskHistory.size());

        // Step 5: the last running sub task fails once more and a new attempt shows up as running.
        finishRunningTasks(1, TaskState.FAILED);
        failedTasks++;
        awaitSubTaskCount(ParallelIndexingPhaseProgress::getFailed, failedTasks);
        awaitSubTaskCount(ParallelIndexingPhaseProgress::getRunning, 1);
        checkState(succeededTasks, failedTasks, buildStateMap());
        // taskHistory is the live list held in taskHistories, so the new failure is visible through it.
        Assert.assertEquals(2, taskHistory.size());

        // Step 6: the final attempt succeeds and the supervisor task completes.
        finishRunningTasks(1, TaskState.SUCCESS);
        succeededTasks++;
        awaitSubTaskCount(ParallelIndexingPhaseProgress::getSucceeded, succeededTasks);

        Assert.assertEquals(
            TaskState.SUCCESS,
            getIndexingServiceClient().waitToFinish(task, 1000L, TimeUnit.MILLISECONDS).getStatusCode()
        );
    }

    /** Polls the progress resource every 100 ms until the selected counter is at least {@code expected}. */
    private void awaitSubTaskCount(Function<ParallelIndexingPhaseProgress, Integer> counter, int expected) throws InterruptedException {
        while (getNumSubTasks(counter) < expected) {
            Thread.sleep(100L);
        }
    }

    /** Moves the first {@code count} entries of {@code runningTasks} to the given terminal state. */
    private void finishRunningTasks(int count, TaskState state) {
        for (int i = 0; i < count; i++) {
            // setState() removes the task from runningTasks, so index 0 is always the next unfinished task.
            runningTasks.get(0).setState(state);
        }
    }

    /**
     * Fetches the current progress from the supervisor task and extracts one counter from it.
     *
     * @param func selects the counter (for example running, succeeded or failed) from the progress entity
     * @return the selected counter value
     */
    private int getNumSubTasks(Function<ParallelIndexingPhaseProgress, Integer> func) {
        final Response response = task.getProgress(newRequest());
        Assert.assertEquals(200, response.getStatus());
        // Pass the entity with its real type; an Object-typed argument does not match the Function's input type.
        final ParallelIndexingPhaseProgress progress = (ParallelIndexingPhaseProgress) response.getEntity();
        return func.apply(progress);
    }

    /**
     * Builds the expected per-spec state from the bookkeeping maps of this test.
     *
     * @return a new map from spec id to a status made of the spec, its entry in {@code runningSpecs}
     *         (null when absent) and its list in {@code taskHistories} (an empty list when absent)
     */
    private Map<String, ParallelIndexTaskRunner.SubTaskSpecStatus> buildStateMap() {
        final HashMap<String, ParallelIndexTaskRunner.SubTaskSpecStatus> expected = new HashMap<>();
        for (Map.Entry<String, SinglePhaseSubTaskSpec> entry : subTaskSpecs.entrySet()) {
            final String specId = entry.getKey();
            List<TaskStatusPlus> history = taskHistories.get(specId);
            if (history == null) {
                history = Collections.emptyList();
            }
            final TaskStatusPlus current = runningSpecs.get(specId);
            expected.put(specId, new ParallelIndexTaskRunner.SubTaskSpecStatus(entry.getValue(), current, history));
        }
        return expected;
    }

    /**
     * Queries every resource method of the supervisor task and compares the answers with the bookkeeping kept
     * by this test.
     *
     * @param expectedSucceededTasks number of sub tasks expected to be reported as succeeded
     * @param expectedFailedTask number of sub tasks expected to be reported as failed
     * @param expectedSubTaskStateResponses expected state per spec id, as produced by {@code buildStateMap}
     */
    private void checkState(int expectedSucceededTasks, int expectedFailedTask, Map<String, ParallelIndexTaskRunner.SubTaskSpecStatus> expectedSubTaskStateResponses) {
        // progress counters
        Response response = task.getProgress(newRequest());
        Assert.assertEquals(200, response.getStatus());
        final ParallelIndexingPhaseProgress monitorStatus = (ParallelIndexingPhaseProgress) response.getEntity();
        Assert.assertEquals(runningTasks.size(), monitorStatus.getRunning());
        Assert.assertEquals(expectedSucceededTasks, monitorStatus.getSucceeded());
        Assert.assertEquals(expectedFailedTask, monitorStatus.getFailed());
        Assert.assertEquals(expectedSucceededTasks + expectedFailedTask, monitorStatus.getComplete());
        Assert.assertEquals(runningTasks.size() + expectedSucceededTasks + expectedFailedTask, monitorStatus.getTotal());

        // ids of running sub tasks
        response = task.getRunningTasks(newRequest());
        Assert.assertEquals(200, response.getStatus());
        final Collection<?> actualRunningIds = (Collection<?>) response.getEntity();
        Assert.assertEquals(
            runningTasks.stream().map(AbstractTask::getId).collect(Collectors.toSet()),
            new HashSet<Object>(actualRunningIds)
        );

        // all sub task specs
        response = task.getSubTaskSpecs(newRequest());
        Assert.assertEquals(200, response.getStatus());
        @SuppressWarnings("unchecked")
        final List<SubTaskSpec<?>> allSpecs = (List<SubTaskSpec<?>>) response.getEntity();
        Assert.assertEquals(
            subTaskSpecs.keySet(),
            allSpecs.stream().map(SubTaskSpec::getId).collect(Collectors.toSet())
        );

        // running sub task specs
        response = task.getRunningSubTaskSpecs(newRequest());
        Assert.assertEquals(200, response.getStatus());
        @SuppressWarnings("unchecked")
        final List<SubTaskSpec<?>> runningSpecList = (List<SubTaskSpec<?>>) response.getEntity();
        Assert.assertEquals(
            runningSpecs.keySet(),
            runningSpecList.stream().map(SubTaskSpec::getId).collect(Collectors.toSet())
        );

        // complete sub task specs: everything expected that is not currently running
        final List<?> completeSubTaskSpecs = expectedSubTaskStateResponses
            .entrySet()
            .stream()
            .filter(entry -> !runningSpecs.containsKey(entry.getKey()))
            .map(entry -> entry.getValue().getSpec())
            .collect(Collectors.toList());
        response = task.getCompleteSubTaskSpecs(newRequest());
        Assert.assertEquals(200, response.getStatus());
        Assert.assertEquals(completeSubTaskSpecs, response.getEntity());

        // single spec lookup, using any spec that is currently running
        Assert.assertFalse("expected at least one running sub task spec", runningSpecs.isEmpty());
        final String subTaskId = runningSpecs.keySet().iterator().next();
        response = task.getSubTaskSpec(subTaskId, newRequest());
        Assert.assertEquals(200, response.getStatus());
        final SubTaskSpec<?> subTaskSpec = (SubTaskSpec<?>) response.getEntity();
        Assert.assertEquals(subTaskId, subTaskSpec.getId());

        // state of that spec
        response = task.getSubTaskState(subTaskId, newRequest());
        Assert.assertEquals(200, response.getStatus());
        final ParallelIndexTaskRunner.SubTaskSpecStatus expectedResponse = Preconditions.checkNotNull(
            expectedSubTaskStateResponses.get(subTaskId),
            "response for task[%s]",
            subTaskId
        );
        final ParallelIndexTaskRunner.SubTaskSpecStatus actualResponse =
            (ParallelIndexTaskRunner.SubTaskSpecStatus) response.getEntity();
        Assert.assertEquals(expectedResponse.getSpec().getId(), actualResponse.getSpec().getId());
        Assert.assertEquals(expectedResponse.getCurrentStatus(), actualResponse.getCurrentStatus());
        Assert.assertEquals(expectedResponse.getTaskHistory(), actualResponse.getTaskHistory());

        // attempt history of a completed spec
        // NOTE(review): the expected map built by buildStateMap() takes its current status from runningSpecs,
        // whose entries are created with TaskState.RUNNING, so this filter looks like it never matches and the
        // assertions below are presumably never executed - confirm and select completed specs differently if so.
        final String completeSubTaskSpecId = expectedSubTaskStateResponses
            .entrySet()
            .stream()
            .filter(entry -> {
                final TaskStatusPlus currentStatus = entry.getValue().getCurrentStatus();
                return currentStatus != null
                       && (currentStatus.getStatusCode() == TaskState.SUCCESS
                           || currentStatus.getStatusCode() == TaskState.FAILED);
            })
            .map(Map.Entry::getKey)
            .findFirst()
            .orElse(null);
        if (completeSubTaskSpecId != null) {
            response = task.getCompleteSubTaskSpecAttemptHistory(completeSubTaskSpecId, newRequest());
            Assert.assertEquals(200, response.getStatus());
            Assert.assertEquals(
                expectedSubTaskStateResponses.get(completeSubTaskSpecId).getTaskHistory(),
                response.getEntity()
            );
        }
    }

    /**
     * Creates a replayed nice mock of {@link HttpServletRequest} with two attribute expectations recorded:
     * {@code Druid-Authorization-Checked} returns null and {@code Druid-Authentication-Result} returns an
     * {@link AuthenticationResult} built from the string "test".
     *
     * @return the mock request, already in replay state
     */
    private static HttpServletRequest newRequest() {
        final HttpServletRequest mockRequest = EasyMock.niceMock(HttpServletRequest.class);
        final AuthenticationResult authResult = new AuthenticationResult("test", "test", "test", Collections.emptyMap());
        EasyMock.expect(mockRequest.getAttribute("Druid-Authorization-Checked")).andReturn(null);
        EasyMock.expect(mockRequest.getAttribute("Druid-Authentication-Result")).andReturn(authResult);
        EasyMock.replay(mockRequest);
        return mockRequest;
    }

    /**
     * Builds the supervisor task under test for data source {@code "dataSource"} with a single long-sum
     * aggregator over {@code "val"} and DAY / MINUTE uniform granularity.
     *
     * @param interval input interval for the granularity spec, or null for no explicit interval
     * @param ioConfig IO config carrying the input source and format
     * @return a new {@code TestSupervisorTask} whose context contains {@code disableInject=true}
     */
    private TestSupervisorTask newTask(Interval interval, ParallelIndexIOConfig ioConfig) {
        final GranularitySpec granularitySpec = new UniformGranularitySpec(
            Granularities.DAY,
            Granularities.MINUTE,
            interval == null ? null : Collections.singletonList(interval)
        );
        final DataSchema dataSchema = new DataSchema(
            "dataSource",
            DEFAULT_TIMESTAMP_SPEC,
            DEFAULT_DIMENSIONS_SPEC,
            new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
            granularitySpec,
            null
        );
        // Only the 19th constructor argument is set; all others stay null.
        // NOTE(review): that argument is presumably the maximum number of concurrently running sub tasks, which
        // is why it is tied to NUM_SUB_TASKS - confirm against ParallelIndexTuningConfig.
        final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
            null, null, null, null, null, null,
            null, null, null, null, null, null,
            null, null, null, null, null, null,
            NUM_SUB_TASKS,
            null, null, null, null, null, null,
            null, null, null, null, null, null
        );
        final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
        return new TestSupervisorTask(null, null, ingestionSpec, Collections.singletonMap("disableInject", true));
    }

    private class TestSubTask
    extends SinglePhaseSubTask {
        private volatile TaskState state;

        TestSubTask(String groupId, String supervisorTaskId, String subtaskSpecId, int numAttempts, ParallelIndexIngestionSpec ingestionSchema, Map<String, Object> context) {
            super(null, groupId, null, supervisorTaskId, subtaskSpecId, numAttempts, ingestionSchema, context);
            this.state = TaskState.RUNNING;
        }

        public TaskStatus run(TaskToolbox toolbox) throws Exception {
            while (this.state == TaskState.RUNNING) {
                Thread.sleep(100L);
            }
            ParallelIndexSupervisorTaskClient taskClient = (ParallelIndexSupervisorTaskClient)toolbox.getSupervisorTaskClientFactory().build(null, this.getId(), 0, null, 0L);
            DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec)this.getIngestionSchema().getTuningConfig().getGivenOrDefaultPartitionsSpec();
            SegmentAllocatorForBatch segmentAllocator = SegmentAllocators.forLinearPartitioning((TaskToolbox)toolbox, (String)this.getSubtaskSpecId(), (SupervisorTaskAccess)new SupervisorTaskAccess(this.getSupervisorTaskId(), taskClient), (DataSchema)this.getIngestionSchema().getDataSchema(), (TaskLockHelper)this.getTaskLockHelper(), (boolean)this.getIngestionSchema().getIOConfig().isAppendToExisting(), (PartitionsSpec)partitionsSpec, (Boolean)true);
            SegmentIdWithShardSpec segmentIdentifier = segmentAllocator.allocate((InputRow)new MapBasedInputRow(DateTimes.of((String)"2017-01-01"), Collections.emptyList(), Collections.emptyMap()), this.getSubtaskSpecId(), null, false);
            DataSegment segment = new DataSegment(segmentIdentifier.getDataSource(), segmentIdentifier.getInterval(), segmentIdentifier.getVersion(), null, null, null, segmentIdentifier.getShardSpec(), Integer.valueOf(0), 1L);
            taskClient.report(this.getSupervisorTaskId(), (SubTaskReport)new PushedSegmentsReport(this.getId(), Collections.emptySet(), Collections.singleton(segment), (Map)ImmutableMap.of()));
            return TaskStatus.fromCode((String)this.getId(), (TaskState)this.state);
        }

        void setState(TaskState state) {
            Preconditions.checkArgument((state == TaskState.SUCCESS || state == TaskState.FAILED ? 1 : 0) != 0, (String)"state[%s] should be SUCCESS of FAILED", (Object[])new Object[]{state});
            this.state = state;
            int taskIndex = IntStream.range(0, ParallelIndexSupervisorTaskResourceTest.this.runningTasks.size()).filter(i -> ((TestSubTask)((Object)((Object)ParallelIndexSupervisorTaskResourceTest.this.runningTasks.get(i)))).getId().equals(this.getId())).findAny().orElse(-1);
            if (taskIndex == -1) {
                throw new ISE("Can't find an index for task[%s]", new Object[]{this.getId()});
            }
            ParallelIndexSupervisorTaskResourceTest.this.runningTasks.remove(taskIndex);
            String specId = ((SinglePhaseSubTaskSpec)Preconditions.checkNotNull(ParallelIndexSupervisorTaskResourceTest.this.taskIdToSpec.get(this.getId()), (String)"spec for task[%s]", (Object[])new Object[]{this.getId()})).getId();
            ParallelIndexSupervisorTaskResourceTest.this.runningSpecs.remove(specId);
            ParallelIndexSupervisorTaskResourceTest.this.taskHistories.computeIfAbsent(specId, k -> new ArrayList()).add(new TaskStatusPlus(this.getId(), this.getGroupId(), this.getType(), DateTimes.EPOCH, DateTimes.EPOCH, state, RunnerTaskState.NONE, Long.valueOf(-1L), TaskLocation.unknown(), null, null));
        }
    }

    private class TestSubTaskSpec
    extends SinglePhaseSubTaskSpec {
        TestSubTaskSpec(String id, String groupId, ParallelIndexSupervisorTask supervisorTask, ParallelIndexIngestionSpec ingestionSpec, Map<String, Object> context, InputSplit inputSplit) {
            super(id, groupId, supervisorTask.getId(), ingestionSpec, context, inputSplit);
        }

        public SinglePhaseSubTask newSubTask(int numAttempts) {
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            TestSubTask subTask = new TestSubTask(this.getGroupId(), this.getSupervisorTaskId(), this.getId(), numAttempts, this.getIngestionSpec(), this.getContext());
            TestInputSource inputSource = (TestInputSource)this.getIngestionSpec().getIOConfig().getInputSource();
            InputSplit split = inputSource.createSplits(this.getIngestionSpec().getIOConfig().getInputFormat(), null).findFirst().orElse(null);
            if (split == null) {
                throw new ISE("Split is null", new Object[0]);
            }
            ParallelIndexSupervisorTaskResourceTest.this.runningTasks.add(subTask);
            ParallelIndexSupervisorTaskResourceTest.this.taskIdToSpec.put(subTask.getId(), this);
            ParallelIndexSupervisorTaskResourceTest.this.runningSpecs.put(this.getId(), new TaskStatusPlus(subTask.getId(), subTask.getGroupId(), subTask.getType(), DateTimes.EPOCH, DateTimes.EPOCH, TaskState.RUNNING, RunnerTaskState.RUNNING, Long.valueOf(-1L), TaskLocation.unknown(), null, null));
            return subTask;
        }
    }

    private class TestRunner
    extends SinglePhaseParallelIndexTaskRunner {
        private final ParallelIndexSupervisorTask supervisorTask;

        TestRunner(TaskToolbox toolbox, ParallelIndexSupervisorTask supervisorTask) {
            super(toolbox, supervisorTask.getId(), supervisorTask.getGroupId(), supervisorTask.getIngestionSchema(), supervisorTask.getContext());
            this.supervisorTask = supervisorTask;
        }

        SinglePhaseSubTaskSpec newTaskSpec(InputSplit split) {
            TestInputSource baseInputSource = (TestInputSource)this.getIngestionSchema().getIOConfig().getInputSource();
            TestSubTaskSpec spec = new TestSubTaskSpec(this.supervisorTask.getId() + "_" + this.getAndIncrementNextSpecId(), this.supervisorTask.getGroupId(), this.supervisorTask, new ParallelIndexIngestionSpec(this.getIngestionSchema().getDataSchema(), new ParallelIndexIOConfig(null, baseInputSource.withSplit((InputSplit<Integer>)split), this.getIngestionSchema().getIOConfig().getInputFormat(), Boolean.valueOf(this.getIngestionSchema().getIOConfig().isAppendToExisting()), null), this.getIngestionSchema().getTuningConfig()), this.supervisorTask.getContext(), split);
            ParallelIndexSupervisorTaskResourceTest.this.subTaskSpecs.put(spec.getId(), spec);
            return spec;
        }
    }

    private class TestSupervisorTask
    extends AbstractParallelIndexSupervisorTaskTest.TestParallelIndexSupervisorTask {
        TestSupervisorTask(String id, TaskResource taskResource, ParallelIndexIngestionSpec ingestionSchema, Map<String, Object> context) {
            super(id, taskResource, ingestionSchema, context);
        }

        SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolbox) {
            return new TestRunner(toolbox, this);
        }
    }

    private static class TestInputSource
    extends AbstractInputSource
    implements SplittableInputSource<Integer> {
        private final List<Integer> ids;

        TestInputSource(List<Integer> ids) {
            this.ids = ids;
        }

        public Stream<InputSplit<Integer>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) {
            return this.ids.stream().map(InputSplit::new);
        }

        public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) {
            return this.ids.size();
        }

        public SplittableInputSource<Integer> withSplit(InputSplit<Integer> split) {
            return new TestInputSource(Collections.singletonList(split.get()));
        }

        public boolean needsFormat() {
            return false;
        }
    }
}

