/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTaskSpec;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SinglePhaseParallelIndexingTest
extends AbstractParallelIndexSupervisorTaskTest {
    // JUnit rule; the max-lock tests use it to declare the exception they expect.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    // Interval handed to newTask() by testIsReady.
    private static final Interval INTERVAL_TO_INDEX = Intervals.of((String)"2017-12/P1M");
    // Default file filter handed to the input source; setup() names its main input files "test_<i>".
    private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
    // Test parameters, supplied per run by constructorFeeder().
    private final LockGranularity lockGranularity;
    private final boolean useInputFormatApi;
    // Folder created in setup() that holds the generated input files.
    private File inputDir;

    /**
     * Supplies the constructor arguments for each parameterized run.
     *
     * @return rows of {@code {lockGranularity, useInputFormatApi}}: TIME_CHUNK with both API modes
     *         and SEGMENT with the input-format API only
     */
    @Parameterized.Parameters(name="{0}, useInputFormatApi={1}")
    public static Iterable<Object[]> constructorFeeder() {
        // Rows are passed as Object[] (no Object cast) so the list element type is Object[].
        return ImmutableList.of(
            new Object[]{LockGranularity.TIME_CHUNK, false},
            new Object[]{LockGranularity.TIME_CHUNK, true},
            new Object[]{LockGranularity.SEGMENT, true}
        );
    }

    /**
     * Creates one parameterized instance of the test.
     *
     * @param lockGranularity   lock granularity the tests in this run are exercised with
     * @param useInputFormatApi whether tasks are built with the input source / input format API
     *                          (otherwise the firehose-based spec is used)
     */
    public SinglePhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi) {
        // Both superclass arguments are 0.2; their meaning is defined by the parent test class.
        super(0.2, 0.2);
        this.useInputFormatApi = useInputFormatApi;
        this.lockGranularity = lockGranularity;
    }

    /**
     * Creates the input folder and writes the fixture files used by the tests.
     *
     * <p>Five {@code test_<i>} files receive two rows each (days {@code 24 + i} and {@code 25 + i}
     * of 2017-12). The first file additionally gets a row dated 2012, a row with {@code badval} as
     * a third field, and a row whose timestamp is {@code 2017unparseable}. Five {@code filtered_<i>}
     * files receive a single row each. Finally the test input source is registered as a subtype on
     * the object mapper.
     *
     * @throws IOException if the folder or any file cannot be created or written
     */
    @Before
    public void setup() throws IOException {
        this.inputDir = this.temporaryFolder.newFolder("data");
        for (int i = 0; i < 5; ++i) {
            // try-with-resources closes the writer and records suppressed close failures.
            try (BufferedWriter writer = Files.newBufferedWriter(new File(this.inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
                writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 + i, i));
                writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
                if (i == 0) {
                    // Extra rows that only the first file carries.
                    writer.write(StringUtils.format("2012-12-%d,%d th test file\n", 25 + i, i));
                    writer.write(StringUtils.format("2017-12-%d,%d th test file,badval\n", 25 + i, i));
                    writer.write(StringUtils.format("2017unparseable\n"));
                }
            }
        }
        for (int i = 0; i < 5; ++i) {
            try (BufferedWriter writer = Files.newBufferedWriter(new File(this.inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) {
                writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
            }
        }
        this.getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class);
    }

    /** Deletes the temporary folder (and with it the input files written by setup()). */
    @After
    public void teardown() {
        temporaryFolder.delete();
    }

    /**
     * Checks that the supervisor task reports ready, and that every sub task built from the specs
     * of its single-phase runner reports ready as well.
     */
    @Test
    public void testIsReady() throws Exception {
        final ParallelIndexSupervisorTask supervisorTask = newTask(INTERVAL_TO_INDEX, false, true);
        final IngestionTestBase.TestLocalTaskActionClient supervisorClient = createActionClient(supervisorTask);
        final TaskToolbox toolbox = createTaskToolbox(supervisorTask, supervisorClient);
        prepareTaskForLocking(supervisorTask);
        Assert.assertTrue(supervisorTask.isReady(supervisorClient));

        final SinglePhaseParallelIndexTaskRunner runner = supervisorTask.createSinglePhaseTaskRunner(toolbox);
        for (Iterator<?> specs = runner.subTaskSpecIterator(); specs.hasNext(); ) {
            final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) specs.next();
            // Build the sub task directly from the values carried by its spec.
            final SinglePhaseSubTask subTask = new SinglePhaseSubTask(
                null,
                spec.getGroupId(),
                null,
                spec.getSupervisorTaskId(),
                spec.getId(),
                0,
                spec.getIngestionSpec(),
                spec.getContext()
            );
            final IngestionTestBase.TestLocalTaskActionClient subTaskClient = createActionClient(subTask);
            prepareTaskForLocking(subTask);
            Assert.assertTrue(subTask.isReady(subTaskClient));
        }
    }

    /**
     * Runs a splittable-input task to completion, asserts it succeeded and checks the shard specs
     * of the segments it published.
     *
     * @param interval                 interval to index, or {@code null} for none
     * @param segmentGranularity       segment granularity of the task
     * @param appendToExisting         whether the task appends to existing segments
     * @param originalSegmentsIfAppend segments that existed before the run; used by the shard spec check
     * @return the task instance held by the indexing service client's task container
     */
    private ParallelIndexSupervisorTask runTestTask(@Nullable Interval interval, Granularity segmentGranularity, boolean appendToExisting, Collection<DataSegment> originalSegmentsIfAppend) {
        ParallelIndexSupervisorTask task = this.newTask(interval, segmentGranularity, appendToExisting, true);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
        // Without an interval the shard specs are checked as for a time-chunk lock.
        LockGranularity granularityToAssert = interval == null ? LockGranularity.TIME_CHUNK : this.lockGranularity;
        this.assertShardSpec(task, granularityToAssert, appendToExisting, originalSegmentsIfAppend);
        AbstractParallelIndexSupervisorTaskTest.TaskContainer taskContainer = this.getIndexingServiceClient().getTaskContainer(task.getId());
        return (ParallelIndexSupervisorTask) taskContainer.getTask();
    }

    /**
     * Runs a non-appending, splittable-input task to completion, asserts it succeeded and checks
     * the shard specs expected after an overwrite.
     *
     * @param interval              interval to index, or {@code null} for none
     * @param segmentGranularity    segment granularity of the task
     * @param actualLockGranularity lock granularity the shard spec check should assume
     * @return the task instance held by the indexing service client's task container
     */
    private ParallelIndexSupervisorTask runOverwriteTask(@Nullable Interval interval, Granularity segmentGranularity, LockGranularity actualLockGranularity) {
        ParallelIndexSupervisorTask task = this.newTask(interval, segmentGranularity, false, true);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
        this.assertShardSpecAfterOverwrite(task, actualLockGranularity);
        AbstractParallelIndexSupervisorTaskTest.TaskContainer taskContainer = this.getIndexingServiceClient().getTaskContainer(task.getId());
        return (ParallelIndexSupervisorTask) taskContainer.getTask();
    }

    /**
     * Ingests once with DAY granularity, then overwrites with {@code secondSegmentGranularity}, and
     * asserts that the segments visible afterwards are exactly the non-overshadowed ones in the
     * timeline built from the segments seen before and after the overwrite.
     *
     * @param inputInterval            interval to index and query, or {@code null} to use all segments
     * @param secondSegmentGranularity segment granularity of the overwriting task
     */
    private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity secondSegmentGranularity) {
        this.runTestTask(inputInterval, Granularities.DAY, false, Collections.emptyList());
        Collection<DataSegment> segmentsBeforeOverwrite = inputInterval == null
            ? this.getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
            : this.getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE);
        Set<DataSegment> allSegments = new HashSet<>(segmentsBeforeOverwrite);

        // The parameterized lock granularity is only expected when an interval is given and the
        // overwrite keeps DAY granularity; otherwise TIME_CHUNK is expected.
        LockGranularity actualLockGranularity;
        if (inputInterval != null && secondSegmentGranularity.equals(Granularities.DAY)) {
            actualLockGranularity = this.lockGranularity;
        } else {
            actualLockGranularity = LockGranularity.TIME_CHUNK;
        }
        this.runOverwriteTask(inputInterval, secondSegmentGranularity, actualLockGranularity);

        Collection<DataSegment> newSegments = inputInterval == null
            ? this.getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
            : this.getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE);
        Assert.assertFalse(newSegments.isEmpty());
        allSegments.addAll(newSegments);

        SegmentTimeline timeline = SegmentTimeline.forSegments(allSegments);
        Interval timelineInterval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
        Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(timelineInterval, Partitions.ONLY_COMPLETE);
        Assert.assertEquals(new HashSet<>(newSegments), visibles);
    }

    /**
     * Asserts that every segment published by {@code task} has a {@link NumberedShardSpec} with the
     * expected number of core partitions.
     *
     * <p>For a non-appending run under a time-chunk lock, the expected count is the number of
     * published segments in the same interval. Otherwise it is the core partition count of the
     * first original segment in the same interval, or 0 when there is none.
     *
     * @param task                     task whose published segments are checked
     * @param actualLockGranularity    lock granularity the task is assumed to have run with
     * @param appendToExisting         whether the task appended to existing segments
     * @param originalSegmentsIfAppend segments that existed before the run
     */
    private void assertShardSpec(ParallelIndexSupervisorTask task, LockGranularity actualLockGranularity, boolean appendToExisting, Collection<DataSegment> originalSegmentsIfAppend) {
        Set<DataSegment> segments = this.getIndexingServiceClient().getPublishedSegments(task);
        if (!appendToExisting && actualLockGranularity == LockGranularity.TIME_CHUNK) {
            Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
            for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
                for (DataSegment segment : segmentsPerInterval) {
                    Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
                    NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
                    Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
                }
            }
        } else {
            Map<Interval, List<DataSegment>> intervalToOriginalSegments = SegmentUtils.groupSegmentsByInterval(originalSegmentsIfAppend);
            for (DataSegment segment : segments) {
                Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
                NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
                List<DataSegment> originalSegmentsInInterval = intervalToOriginalSegments.get(segment.getInterval());
                int expectedNumCorePartitions;
                if (originalSegmentsInInterval == null || originalSegmentsInInterval.isEmpty()) {
                    expectedNumCorePartitions = 0;
                } else {
                    expectedNumCorePartitions = originalSegmentsInInterval.get(0).getShardSpec().getNumCorePartitions();
                }
                Assert.assertEquals(expectedNumCorePartitions, shardSpec.getNumCorePartitions());
            }
        }
    }

    /**
     * Asserts the shard specs of the segments published by an overwriting task.
     *
     * <p>Under a segment lock each segment must carry a {@link NumberedOverwriteShardSpec} whose
     * atomic update group size equals the number of segments in its interval. Otherwise each
     * segment must carry a {@link NumberedShardSpec} whose core partition count equals that number.
     *
     * @param task                  task whose published segments are checked
     * @param actualLockGranularity lock granularity the task is assumed to have run with
     */
    private void assertShardSpecAfterOverwrite(ParallelIndexSupervisorTask task, LockGranularity actualLockGranularity) {
        Set<DataSegment> segments = this.getIndexingServiceClient().getPublishedSegments(task);
        Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
        boolean segmentLock = actualLockGranularity == LockGranularity.SEGMENT;
        for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
            for (DataSegment segment : segmentsPerInterval) {
                if (segmentLock) {
                    Assert.assertSame(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass());
                    NumberedOverwriteShardSpec shardSpec = (NumberedOverwriteShardSpec) segment.getShardSpec();
                    Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getAtomicUpdateGroupSize());
                } else {
                    Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
                    NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
                    Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
                }
            }
        }
    }

    /** Runs the ingest-then-overwrite scenario with no interval and DAY granularity for the overwrite. */
    @Test
    public void testWithoutInterval() {
        final Interval noInterval = null;
        testRunAndOverwrite(noInterval, Granularities.DAY);
    }

    /** Runs the ingest-then-overwrite scenario over 2017-12 with DAY granularity for the overwrite. */
    @Test
    public void testRunInParallel() {
        final Interval december2017 = Intervals.of("2017-12/P1M");
        testRunAndOverwrite(december2017, Granularities.DAY);
    }

    /**
     * Ingests with an explicit dimension list that includes {@code unknownDim} and asserts that
     * every published segment lists the dimensions in the declared order. Does nothing when the
     * input-format API is not in use.
     */
    @Test
    public void testRunInParallelIngestNullColumn() {
        if (!this.useInputFormatApi) {
            return;
        }
        List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim", "dim"));
        GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.MINUTE, Collections.singletonList(Intervals.of("2017-12/P1M")));
        DataSchema dataSchema = new DataSchema("dataSource", DEFAULT_TIMESTAMP_SPEC, DEFAULT_DIMENSIONS_SPEC.withDimensions(dimensionSchemas), new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")}, granularitySpec, null);
        InputSource inputSource = new SettableSplittableLocalInputSource(this.inputDir, VALID_INPUT_SOURCE_FILTER, true);
        ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(null, inputSource, DEFAULT_INPUT_FORMAT, Boolean.valueOf(false), null);
        ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(null, null, null, new ParallelIndexIngestionSpec(dataSchema, ioConfig, DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING), null);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
        Set<DataSegment> segments = this.getIndexingServiceClient().getPublishedSegments(task);
        for (DataSegment segment : segments) {
            for (int i = 0; i < dimensionSchemas.size(); ++i) {
                Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i));
            }
        }
    }

    /**
     * Same ingestion as the null-column test but with the {@code storeEmptyColumns} context flag
     * set to false; asserts that no published segment lists {@code unknownDim}. Does nothing when
     * the input-format API is not in use.
     */
    @Test
    public void testRunInParallelIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() {
        if (!this.useInputFormatApi) {
            return;
        }
        List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim", "dim"));
        GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.MINUTE, Collections.singletonList(Intervals.of("2017-12/P1M")));
        DataSchema dataSchema = new DataSchema("dataSource", DEFAULT_TIMESTAMP_SPEC, DEFAULT_DIMENSIONS_SPEC.withDimensions(dimensionSchemas), new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")}, granularitySpec, null);
        InputSource inputSource = new SettableSplittableLocalInputSource(this.inputDir, VALID_INPUT_SOURCE_FILTER, true);
        ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(null, inputSource, DEFAULT_INPUT_FORMAT, Boolean.valueOf(false), null);
        ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(null, null, null, new ParallelIndexIngestionSpec(dataSchema, ioConfig, DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING), null);
        task.addToContext("storeEmptyColumns", false);
        // Boolean, like storeEmptyColumns above, rather than an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
        Set<DataSegment> segments = this.getIndexingServiceClient().getPublishedSegments(task);
        for (DataSegment segment : segments) {
            Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
        }
    }

    /**
     * Runs a parallel ingestion over 2017-12 and compares the task's live "full" reports with the
     * expected parallel report: one unparseable row, one row processed with an error, and the
     * expected row totals.
     */
    @Test
    public void testRunInParallelTaskReports() {
        ParallelIndexSupervisorTask task = this.runTestTask(Intervals.of("2017-12/P1M"), Granularities.DAY, false, Collections.emptyList());
        Map<String, Object> actualReports = task.doGetLiveReports("full");
        // Elements are passed without Object casts so the list is typed as ParseExceptionReport.
        List<ParseExceptionReport> expectedParseExceptions = ImmutableList.of(
            new ParseExceptionReport("{ts=2017unparseable}", "unparseable", ImmutableList.of(this.getErrorMessageForUnparseableTimestamp()), 1L),
            new ParseExceptionReport("{ts=2017-12-25, dim=0 th test file, val=badval}", "processedWithError", ImmutableList.of("Unable to parse value[badval] for field[val]"), 1L)
        );
        Map<String, Object> expectedReports = this.buildExpectedTaskReportParallel(task.getId(), expectedParseExceptions, new RowIngestionMetersTotals(10L, 1L, 1L, 1L));
        this.compareTaskReports(expectedReports, actualReports);
    }

    /** Runs the ingest-then-overwrite scenario with no interval and MONTH granularity for the overwrite. */
    @Test
    public void testWithoutIntervalWithDifferentSegmentGranularity() {
        final Interval noInterval = null;
        testRunAndOverwrite(noInterval, Granularities.MONTH);
    }

    /** Runs the ingest-then-overwrite scenario over 2017-12 with MONTH granularity for the overwrite. */
    @Test
    public void testRunInParallelWithDifferentSegmentGranularity() {
        final Interval december2017 = Intervals.of("2017-12/P1M");
        testRunAndOverwrite(december2017, Granularities.MONTH);
    }

    /**
     * Runs a task over a non-splittable input source, asserts success and shard specs, and compares
     * its live "full" reports with the expected ones. The sequential report shape is expected when
     * the input-format API is used, the parallel shape otherwise.
     */
    @Test
    public void testRunInSequential() {
        Interval interval = Intervals.of("2017-12/P1M");
        boolean appendToExisting = false;
        ParallelIndexSupervisorTask task = this.newTask(interval, appendToExisting, false);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
        this.assertShardSpec(task, this.lockGranularity, appendToExisting, Collections.emptyList());
        AbstractParallelIndexSupervisorTaskTest.TaskContainer taskContainer = this.getIndexingServiceClient().getTaskContainer(task.getId());
        ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask();
        Map<String, Object> actualReports = executedTask.doGetLiveReports("full");
        RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10L, 1L, 1L, 1L);
        List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of(
            new ParseExceptionReport("{ts=2017unparseable}", "unparseable", ImmutableList.of(this.getErrorMessageForUnparseableTimestamp()), 1L),
            new ParseExceptionReport("{ts=2017-12-25, dim=0 th test file, val=badval}", "processedWithError", ImmutableList.of("Unable to parse value[badval] for field[val]"), 1L)
        );
        Map<String, Object> expectedReports;
        if (this.useInputFormatApi) {
            expectedReports = this.buildExpectedTaskReportSequential(task.getId(), expectedUnparseableEvents, new RowIngestionMetersTotals(0L, 0L, 0L, 0L), expectedTotals);
        } else {
            expectedReports = this.buildExpectedTaskReportParallel(task.getId(), expectedUnparseableEvents, expectedTotals);
        }
        this.compareTaskReports(expectedReports, actualReports);
        System.out.println(actualReports);
    }

    /**
     * Runs a task over 2020-12, an interval for which setup() writes no rows, and asserts that the
     * task still finishes successfully.
     */
    @Test
    public void testPublishEmptySegments() {
        ParallelIndexSupervisorTask task = this.newTask(Intervals.of("2020-12/P1M"), false, true);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
    }

    /**
     * Runs a task whose tuning config carries the value 1 in the position that, per the assertion
     * message below, selects sequential mode; asserts success, that no runner was created, and the
     * published shard specs.
     */
    @Test
    public void testWith1MaxNumConcurrentSubTasks() {
        Interval interval = Intervals.of("2017-12/P1M");
        boolean appendToExisting = false;
        // All tuning options are left null except the single Integer 1.
        ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, Integer.valueOf(1), null, null, null, null, null, null, null, null, null, null, null, null);
        ParallelIndexSupervisorTask task = this.newTask(interval, Granularities.DAY, appendToExisting, true, tuningConfig, VALID_INPUT_SOURCE_FILTER);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
        Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
        this.assertShardSpec(task, this.lockGranularity, appendToExisting, Collections.emptyList());
    }

    /**
     * Runs two appending tasks over the same interval and asserts that the segments of the first
     * run are still present after the second, and that every segment retrieved afterwards is
     * non-overshadowed in the resulting timeline.
     */
    @Test
    public void testAppendToExisting() {
        Interval interval = Intervals.of("2017-12/P1M");
        this.runTestTask(interval, Granularities.DAY, true, Collections.emptyList());
        Collection<DataSegment> oldSegments = this.getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
        this.runTestTask(interval, Granularities.DAY, true, oldSegments);
        Collection<DataSegment> newSegments = this.getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
        Assert.assertTrue(newSegments.containsAll(oldSegments));
        SegmentTimeline timeline = SegmentTimeline.forSegments(newSegments);
        Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
        Assert.assertEquals(new HashSet<>(newSegments), visibles);
    }

    /**
     * Submits two appending tasks without an interval, both with the {@code forceTimeChunkLock}
     * and {@code useSharedLock} context flags set to true, and asserts that both finish
     * successfully.
     */
    @Test
    public void testMultipleAppends() {
        final Interval noInterval = null;
        final ParallelIndexSupervisorTask firstAppend = newTask(noInterval, Granularities.DAY, true, true);
        final ParallelIndexSupervisorTask secondAppend = newTask(noInterval, Granularities.DAY, true, true);
        final List<ParallelIndexSupervisorTask> appendTasks = Arrays.asList(firstAppend, secondAppend);

        for (ParallelIndexSupervisorTask appendTask : appendTasks) {
            appendTask.addToContext("forceTimeChunkLock", true);
            appendTask.addToContext("useSharedLock", true);
        }
        // Submit both tasks before waiting on either of them.
        for (ParallelIndexSupervisorTask appendTask : appendTasks) {
            getIndexingServiceClient().runTask(appendTask.getId(), appendTask);
        }
        for (ParallelIndexSupervisorTask appendTask : appendTasks) {
            Assert.assertEquals(
                TaskState.SUCCESS,
                getIndexingServiceClient().waitToFinish(appendTask, 1L, TimeUnit.DAYS).getStatusCode()
            );
        }
    }

    /**
     * Runs an appending task whose input filter is {@code non_existing_file_filter}, a name that
     * none of the files written by setup() carries, and asserts that the task still succeeds.
     */
    @Test
    public void testRunParallelWithNoInputSplitToProcess() {
        ParallelIndexSupervisorTask task = this.newTask(Intervals.of("2017-12/P1M"), Granularities.DAY, true, true, AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING, "non_existing_file_filter");
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
    }

    /**
     * Runs the ingest-then-overwrite scenario, then appends once more, and asserts that the
     * pre-append segments are still present and that every segment retrieved afterwards is
     * non-overshadowed in the resulting timeline.
     */
    @Test
    public void testOverwriteAndAppend() {
        Interval interval = Intervals.of("2017-12/P1M");
        this.testRunAndOverwrite(interval, Granularities.DAY);
        Collection<DataSegment> beforeAppendSegments = this.getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
        this.runTestTask(interval, Granularities.DAY, true, beforeAppendSegments);
        Collection<DataSegment> afterAppendSegments = this.getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
        Assert.assertTrue(afterAppendSegments.containsAll(beforeAppendSegments));
        SegmentTimeline timeline = SegmentTimeline.forSegments(afterAppendSegments);
        Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
        Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles);
    }

    /**
     * Runs a task whose tuning config ends with the value 0 (presumably the maximum allowed lock
     * count, judging by the expected error message). With a time-chunk lock the run is expected to
     * throw; with a segment lock it is expected to succeed in sequential mode.
     */
    @Test
    public void testMaxLocksWith1MaxNumConcurrentSubTasks() {
        Interval interval = Intervals.of("2017-12/P1M");
        boolean appendToExisting = false;
        // All tuning options are left null except the Integer 1 and the trailing Integer 0.
        ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, Integer.valueOf(1), null, null, null, null, null, null, null, null, null, null, null, Integer.valueOf(0));
        ParallelIndexSupervisorTask task = this.newTask(interval, Granularities.DAY, appendToExisting, true, tuningConfig, VALID_INPUT_SOURCE_FILTER);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        if (this.lockGranularity.equals(LockGranularity.TIME_CHUNK)) {
            this.expectedException.expect(RuntimeException.class);
            this.expectedException.expectMessage("Number of locks exceeded maxAllowedLockCount [0]");
            this.getIndexingServiceClient().runAndWait(task);
        } else {
            Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
            Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
            this.assertShardSpec(task, this.lockGranularity, appendToExisting, Collections.emptyList());
        }
    }

    /**
     * Same scenario as the 1-sub-task max-locks test, but with the value 2 in the tuning config.
     * With a time-chunk lock the run is expected to throw; with a segment lock it is expected to
     * succeed and leave no current runner.
     */
    @Test
    public void testMaxLocksWith2MaxNumConcurrentSubTasks() {
        Interval interval = Intervals.of("2017-12/P1M");
        boolean appendToExisting = false;
        // All tuning options are left null except the Integer 2 and the trailing Integer 0.
        ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, Integer.valueOf(2), null, null, null, null, null, null, null, null, null, null, null, Integer.valueOf(0));
        ParallelIndexSupervisorTask task = this.newTask(interval, Granularities.DAY, appendToExisting, true, tuningConfig, VALID_INPUT_SOURCE_FILTER);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        if (this.lockGranularity.equals(LockGranularity.TIME_CHUNK)) {
            this.expectedException.expect(RuntimeException.class);
            this.expectedException.expectMessage("Number of locks exceeded maxAllowedLockCount [0]");
            this.getIndexingServiceClient().runAndWait(task);
        } else {
            Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
            Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
            this.assertShardSpec(task, this.lockGranularity, appendToExisting, Collections.emptyList());
        }
    }

    /**
     * Writes five JSON files carrying {@code ts}, {@code implicitDim} and {@code explicitDim},
     * ingests them with a dimensions spec that declares only {@code ts} and {@code explicitDim}
     * but includes all dimensions, and asserts that every published segment lists
     * {@code ts, explicitDim, implicitDim} in that order.
     *
     * @throws IOException if an input file cannot be written
     */
    @Test
    public void testIngestBothExplicitAndImplicitDims() throws IOException {
        Interval interval = Intervals.of("2017-12/P1M");
        for (int i = 0; i < 5; ++i) {
            // try-with-resources closes the writer and records suppressed close failures.
            try (BufferedWriter writer = Files.newBufferedWriter(new File(this.inputDir, "test_" + i + ".json").toPath(), StandardCharsets.UTF_8)) {
                // The two JSON objects are written back to back, with no separator between them.
                writer.write(this.getObjectMapper().writeValueAsString(ImmutableMap.of("ts", StringUtils.format("2017-12-%d", 24 + i), "implicitDim", "implicit_" + i, "explicitDim", "explicit_" + i)));
                writer.write(this.getObjectMapper().writeValueAsString(ImmutableMap.of("ts", StringUtils.format("2017-12-%d", 25 + i), "implicitDim", "implicit_" + i, "explicitDim", "explicit_" + i)));
            }
        }
        DimensionsSpec dimensionsSpec = DimensionsSpec.builder().setDefaultSchemaDimensions(ImmutableList.of("ts", "explicitDim")).setIncludeAllDimensions(true).build();
        GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.MINUTE, Collections.singletonList(interval));
        DataSchema dataSchema = new DataSchema("dataSource", DEFAULT_TIMESTAMP_SPEC, dimensionsSpec, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, granularitySpec, null);
        InputSource inputSource = new SettableSplittableLocalInputSource(this.inputDir, "*.json", true);
        InputFormat inputFormat = new JsonInputFormat(new JSONPathSpec(Boolean.valueOf(true), null), null, null, null, null);
        ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(null, inputSource, inputFormat, Boolean.valueOf(false), null);
        ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(null, null, null, new ParallelIndexIngestionSpec(dataSchema, ioConfig, AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING), null);
        // The context flag is a Boolean (as in testMultipleAppends), not an Integer 1/0.
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, this.getIndexingServiceClient().runAndWait(task).getStatusCode());
        Set<DataSegment> segments = this.getIndexingServiceClient().getPublishedSegments(task);
        for (DataSegment segment : segments) {
            Assert.assertEquals(ImmutableList.of("ts", "explicitDim", "implicitDim"), segment.getDimensions());
        }
    }

    /**
     * Convenience overload that creates a task using {@code Granularities.DAY} as the
     * segment granularity and delegates everything else to the four-argument overload.
     *
     * @param interval              interval to ingest, or {@code null} for none
     * @param appendToExisting      forwarded unchanged to the next overload
     * @param splittableInputSource forwarded unchanged to the next overload
     * @return the task built by the four-argument overload
     */
    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, boolean appendToExisting, boolean splittableInputSource) {
        final Granularity defaultSegmentGranularity = Granularities.DAY;
        return newTask(interval, defaultSegmentGranularity, appendToExisting, splittableInputSource);
    }

    /**
     * Convenience overload that delegates to the five-argument overload with
     * {@code isReplace} fixed to {@code false}.
     *
     * @param interval              interval to ingest, or {@code null} for none
     * @param segmentGranularity    segment granularity, forwarded unchanged
     * @param appendToExisting      forwarded unchanged to the next overload
     * @param splittableInputSource forwarded unchanged to the next overload
     * @return the task built by the five-argument overload
     */
    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, Granularity segmentGranularity, boolean appendToExisting, boolean splittableInputSource) {
        final boolean isReplace = false;
        return newTask(interval, segmentGranularity, appendToExisting, splittableInputSource, isReplace);
    }

    /**
     * Convenience overload that delegates to the six-argument overload using
     * {@code DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING} as the tuning config and
     * {@code VALID_INPUT_SOURCE_FILTER} as the input file filter.
     *
     * @param interval              interval to ingest, or {@code null} for none
     * @param segmentGranularity    segment granularity, forwarded unchanged
     * @param appendToExisting      forwarded unchanged to the next overload
     * @param splittableInputSource forwarded unchanged to the next overload
     * @param isReplace             currently ignored by this method (see note in the body)
     * @return the task built by the six-argument overload
     */
    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, Granularity segmentGranularity, boolean appendToExisting, boolean splittableInputSource, boolean isReplace) {
        // NOTE(review): isReplace is accepted but never forwarded, so passing true has no
        // effect on the task that is built. Confirm whether that is intentional.
        final ParallelIndexTuningConfig defaultTuningConfig = AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING;
        final String defaultFilter = VALID_INPUT_SOURCE_FILTER;
        return newTask(interval, segmentGranularity, appendToExisting, splittableInputSource, defaultTuningConfig, defaultFilter);
    }

    /**
     * Builds a {@link ParallelIndexSupervisorTask} for the datasource {@code "dataSource"} that
     * reads files under {@code inputDir} matching {@code inputSourceFilter}.
     *
     * <p>When {@code useInputFormatApi} is set, the spec is built from a
     * {@link SettableSplittableLocalInputSource} and {@code DEFAULT_INPUT_FORMAT}; otherwise it is
     * built from a {@code LocalFirehoseFactory} and a parser map derived from
     * {@code DEFAULT_PARSE_SPEC}. Both variants use a single long-sum metric {@code "val"} and a
     * uniform granularity spec with MINUTE query granularity.
     *
     * @param interval              interval for the granularity spec; {@code null} means no
     *                              intervals are passed to it
     * @param segmentGranularity    segment granularity of the granularity spec
     * @param appendToExisting      passed to the IO config
     * @param splittableInputSource whether the input source reports itself as splittable; only
     *                              used on the input-format path
     * @param tuningConfig          tuning config of the ingestion spec
     * @param inputSourceFilter     file name filter for the input directory
     * @return a new task with null id/group/resource and an empty context map
     */
    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, Granularity segmentGranularity, boolean appendToExisting, boolean splittableInputSource, ParallelIndexTuningConfig tuningConfig, String inputSourceFilter) {
        // Pieces shared by both flavours of the ingestion spec.
        final AggregatorFactory[] aggregators = new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")};
        final List<Interval> inputIntervals = interval == null ? null : Collections.singletonList(interval);
        final GranularitySpec granularitySpec = new UniformGranularitySpec(segmentGranularity, Granularities.MINUTE, inputIntervals);

        final ParallelIndexIngestionSpec ingestionSpec;
        if (this.useInputFormatApi) {
            // InputSource / InputFormat based spec.
            final DataSchema dataSchema = new DataSchema("dataSource", DEFAULT_TIMESTAMP_SPEC, DEFAULT_DIMENSIONS_SPEC, aggregators, granularitySpec, null);
            final InputSource inputSource = new SettableSplittableLocalInputSource(this.inputDir, inputSourceFilter, splittableInputSource);
            final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(null, inputSource, DEFAULT_INPUT_FORMAT, Boolean.valueOf(appendToExisting), null);
            ingestionSpec = new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
        } else {
            // Firehose based spec: the parser is handed over as a generic map.
            final Map parserMap = (Map)this.getObjectMapper().convertValue((Object)new StringInputRowParser(DEFAULT_PARSE_SPEC, null), Map.class);
            final DataSchema dataSchema = new DataSchema("dataSource", parserMap, aggregators, granularitySpec, null, this.getObjectMapper());
            final FirehoseFactory firehoseFactory = new LocalFirehoseFactory(this.inputDir, inputSourceFilter, null);
            final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(firehoseFactory, Boolean.valueOf(appendToExisting));
            ingestionSpec = new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
        }
        return new ParallelIndexSupervisorTask(null, null, null, ingestionSpec, Collections.emptyMap());
    }

    /**
     * Returns the error message expected for the unparseable timestamp
     * {@code 2017unparseable}.
     *
     * <p>When {@code useInputFormatApi} is set, the message additionally carries the URI of the
     * file {@code test_0} under {@code inputDir} plus fixed record/line numbers; otherwise only
     * the bare message is returned.
     *
     * @return the expected error message for the current API mode
     */
    private String getErrorMessageForUnparseableTimestamp() {
        if (!this.useInputFormatApi) {
            return "Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable}";
        }
        final File offendingFile = new File(this.inputDir, "test_0");
        return StringUtils.format("Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable} (Path: %s, Record: 5, Line: 5)", offendingFile.toURI());
    }

    /**
     * A {@link LocalInputSource} whose {@code isSplittable()} answer is fixed at construction
     * time through the {@code splittableInputSource} flag, which is also exposed as a JSON
     * property.
     */
    private static class SettableSplittableLocalInputSource
    extends LocalInputSource {
        /** Value reported by both {@link #isSplittableInputSource()} and {@link #isSplittable()}. */
        private final boolean splittableInputSource;

        /**
         * @param baseDir               passed to the {@link LocalInputSource} constructor
         * @param filter                passed to the {@link LocalInputSource} constructor
         * @param splittableInputSource value later returned by {@link #isSplittable()}
         */
        @JsonCreator
        private SettableSplittableLocalInputSource(@JsonProperty(value="baseDir") File baseDir, @JsonProperty(value="filter") String filter, @JsonProperty(value="splittableInputSource") boolean splittableInputSource) {
            super(baseDir, filter);
            this.splittableInputSource = splittableInputSource;
        }

        /** JSON-visible accessor for the configured flag. */
        @JsonProperty
        public boolean isSplittableInputSource() {
            return splittableInputSource;
        }

        /** Reports the configured flag as the splittability of this input source. */
        public boolean isSplittable() {
            return isSplittableInputSource();
        }
    }
}

