/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractMultiPhaseParallelIndexingTest;
import org.apache.druid.indexing.common.task.batch.parallel.DimensionCardinalityReport;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexingPhaseProgress;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class HashPartitionTaskKillTest
extends AbstractMultiPhaseParallelIndexingTest {
    /** Timestamp spec over the {@code ts} column using the {@code "auto"} format. */
    private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);

    /** Default dimension schemas for the {@code ts}, {@code dim1} and {@code dim2} columns. */
    private static final DimensionsSpec DIMENSIONS_SPEC =
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")));

    /** CSV input format declaring the columns {@code ts, dim1, dim2, val}. */
    private static final InputFormat INPUT_FORMAT =
        new CsvInputFormat(Arrays.asList("ts", "dim1", "dim2", "val"), null, false, false, 0);

    /** Interval handed to every test task: one month starting 2017-12. */
    private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");

    /** Directory holding the generated input files; created in {@link #setup()}. */
    private File inputDir;

    /**
     * Configures the base test with time-chunk locking.
     * NOTE(review): the meaning of the remaining arguments ({@code true, 0.0, 0.0}) is defined by
     * the parent class constructor, which is not visible here - confirm against the parent.
     */
    public HashPartitionTaskKillTest() {
        super(LockGranularity.TIME_CHUNK, true, 0.0, 0.0);
    }

    /**
     * Generates the CSV fixture files under a fresh {@code data} temporary folder.
     *
     * <p>Two groups of files are written:
     * <ul>
     *   <li>{@code test_0 .. test_9}: twenty rows each (two rows per inner iteration), the files
     *       selected by the {@code "test_*"} filter the tests in this class pass to the task;</li>
     *   <li>{@code filtered_0 .. filtered_4}: a single row each, whose names do not start with
     *       {@code test_}.</li>
     * </ul>
     *
     * @throws IOException if the folder or any of the files cannot be created or written
     */
    @Before
    public void setup() throws IOException {
        this.inputDir = this.temporaryFolder.newFolder("data");

        for (int fileIdx = 0; fileIdx < 10; ++fileIdx) {
            final File dataFile = new File(this.inputDir, "test_" + fileIdx);
            // try-with-resources closes the writer on every path and keeps close() failures as
            // suppressed exceptions.
            try (Writer writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) {
                for (int day = 0; day < 10; ++day) {
                    writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", day + 1, fileIdx + 10, fileIdx));
                    writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", day + 2, fileIdx + 11, fileIdx));
                }
            }
        }

        for (int fileIdx = 0; fileIdx < 5; ++fileIdx) {
            final File filteredFile = new File(this.inputDir, "filtered_" + fileIdx);
            try (Writer writer = Files.newBufferedWriter(filteredFile.toPath(), StandardCharsets.UTF_8)) {
                writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", fileIdx + 1, fileIdx + 10, fileIdx));
            }
        }
    }

    /**
     * Runs the hash-partitioned multi-phase flow with a partitions spec whose first two arguments
     * are {@code null} and with {@code succeedsBeforeFailing = 0}, then checks that the resulting
     * status is a failure attributed to {@code PHASE-1}.
     */
    @Test(timeout=5000L)
    public void failsInFirstPhase() throws Exception {
        final String expectedError = "Failed in phase[PHASE-1]. See task logs for details.";
        final PartitionsSpec partitionsSpec = new HashedPartitionsSpec(null, null, ImmutableList.of("dim1", "dim2"));
        final ParallelIndexSupervisorTask supervisorTask = this.createTestTask(
            TIMESTAMP_SPEC,
            DIMENSIONS_SPEC,
            INPUT_FORMAT,
            null,
            INTERVAL_TO_INDEX,
            this.inputDir,
            "test_*",
            partitionsSpec,
            2,
            false,
            true,
            0
        );
        final IngestionTestBase.TestLocalTaskActionClient client = this.createActionClient(supervisorTask);
        final TaskToolbox taskToolbox = this.createTaskToolbox(supervisorTask, client);
        this.prepareTaskForLocking(supervisorTask);
        Assert.assertTrue(supervisorTask.isReady(client));

        // A graceful stop is requested before the phases are run.
        supervisorTask.stopGracefully(null);
        final TaskStatus status = supervisorTask.runHashPartitionMultiPhaseParallel(taskToolbox);

        Assert.assertTrue(status.isFailure());
        Assert.assertEquals(expectedError, status.getErrorMsg());
    }

    /**
     * Runs the hash-partitioned multi-phase flow with {@code 3} as the second partitions-spec
     * argument and with {@code succeedsBeforeFailing = 0}, then checks that the resulting status
     * is a failure attributed to {@code PHASE-2}.
     */
    @Test(timeout=5000L)
    public void failsInSecondPhase() throws Exception {
        final String expectedError = "Failed in phase[PHASE-2]. See task logs for details.";
        final PartitionsSpec partitionsSpec = new HashedPartitionsSpec(null, 3, ImmutableList.of("dim1", "dim2"));
        final ParallelIndexSupervisorTask supervisorTask = this.createTestTask(
            TIMESTAMP_SPEC,
            DIMENSIONS_SPEC,
            INPUT_FORMAT,
            null,
            INTERVAL_TO_INDEX,
            this.inputDir,
            "test_*",
            partitionsSpec,
            2,
            false,
            true,
            0
        );
        final IngestionTestBase.TestLocalTaskActionClient client = this.createActionClient(supervisorTask);
        final TaskToolbox taskToolbox = this.createTaskToolbox(supervisorTask, client);
        this.prepareTaskForLocking(supervisorTask);
        Assert.assertTrue(supervisorTask.isReady(client));

        // A graceful stop is requested before the phases are run.
        supervisorTask.stopGracefully(null);
        final TaskStatus status = supervisorTask.runHashPartitionMultiPhaseParallel(taskToolbox);

        Assert.assertTrue(status.isFailure());
        Assert.assertEquals(expectedError, status.getErrorMsg());
    }

    /**
     * Runs the hash-partitioned multi-phase flow with {@code 3} as the second partitions-spec
     * argument and with {@code succeedsBeforeFailing = 1}, so one runner succeeds before the
     * failing one, then checks that the resulting status is a failure attributed to
     * {@code PHASE-3}.
     */
    @Test(timeout=5000L)
    public void failsInThirdPhase() throws Exception {
        final String expectedError = "Failed in phase[PHASE-3]. See task logs for details.";
        final PartitionsSpec partitionsSpec = new HashedPartitionsSpec(null, 3, ImmutableList.of("dim1", "dim2"));
        final ParallelIndexSupervisorTask supervisorTask = this.createTestTask(
            TIMESTAMP_SPEC,
            DIMENSIONS_SPEC,
            INPUT_FORMAT,
            null,
            INTERVAL_TO_INDEX,
            this.inputDir,
            "test_*",
            partitionsSpec,
            2,
            false,
            true,
            1
        );
        final IngestionTestBase.TestLocalTaskActionClient client = this.createActionClient(supervisorTask);
        final TaskToolbox taskToolbox = this.createTaskToolbox(supervisorTask, client);
        this.prepareTaskForLocking(supervisorTask);
        Assert.assertTrue(supervisorTask.isReady(client));

        // A graceful stop is requested before the phases are run.
        supervisorTask.stopGracefully(null);
        final TaskStatus status = supervisorTask.runHashPartitionMultiPhaseParallel(taskToolbox);

        Assert.assertTrue(status.isFailure());
        Assert.assertEquals(expectedError, status.getErrorMsg());
    }

    /**
     * Builds a {@link ParallelIndexSupervisorTaskTest} reading local files from {@code inputDir}.
     *
     * <p>With {@code useInputFormatApi} set, the task is configured through a
     * {@link LocalInputSource} plus {@code inputFormat}, and {@code parseSpec} must be
     * {@code null}. Otherwise it is configured through a {@link LocalFirehoseFactory} plus a
     * parser map derived from {@code parseSpec}, and {@code inputFormat} must be {@code null}.
     *
     * @param timestampSpec            timestamp spec for the input-format path
     * @param dimensionsSpec           dimensions spec for the input-format path
     * @param inputFormat              input format; must be {@code null} on the firehose path
     * @param parseSpec                parse spec; must be {@code null} on the input-format path
     * @param interval                 interval for the granularity spec, or {@code null} for none
     * @param inputDir                 directory containing the input files
     * @param filter                   file-name filter applied within {@code inputDir}
     * @param partitionsSpec           partitions spec forwarded to the tuning config
     * @param maxNumConcurrentSubTasks forwarded to the tuning config
     * @param appendToExisting         forwarded to the IO config; its negation is forwarded to the
     *                                 tuning config
     * @param useInputFormatApi        selects the input-format path over the firehose path
     * @param succeedsBeforeFailing    forwarded to the test task, which uses it to pick the
     *                                 failing phase
     * @return the configured test supervisor task
     * @throws IllegalArgumentException if the spec that must be {@code null} for the chosen path
     *                                  is not
     */
    private ParallelIndexSupervisorTask createTestTask(@Nullable TimestampSpec timestampSpec, @Nullable DimensionsSpec dimensionsSpec, @Nullable InputFormat inputFormat, @Nullable ParseSpec parseSpec, Interval interval, File inputDir, String filter, PartitionsSpec partitionsSpec, int maxNumConcurrentSubTasks, boolean appendToExisting, boolean useInputFormatApi, int succeedsBeforeFailing) {
        final List<Interval> inputIntervals = interval == null ? null : Collections.singletonList(interval);
        final GranularitySpec granularitySpec =
            new UniformGranularitySpec(SEGMENT_GRANULARITY, Granularities.MINUTE, inputIntervals);
        final ParallelIndexTuningConfig tuningConfig =
            this.newTuningConfig(partitionsSpec, maxNumConcurrentSubTasks, !appendToExisting);

        final ParallelIndexIOConfig ioConfig;
        final DataSchema dataSchema;
        if (useInputFormatApi) {
            Preconditions.checkArgument(parseSpec == null);
            ioConfig = new ParallelIndexIOConfig(
                null,
                new LocalInputSource(inputDir, filter),
                inputFormat,
                appendToExisting,
                null
            );
            dataSchema = new DataSchema(
                "dataSource",
                timestampSpec,
                dimensionsSpec,
                new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
                granularitySpec,
                null
            );
        } else {
            Preconditions.checkArgument(inputFormat == null);
            ioConfig = new ParallelIndexIOConfig(new LocalFirehoseFactory(inputDir, filter, null), appendToExisting);
            dataSchema = new DataSchema(
                "dataSource",
                this.getObjectMapper().convertValue(new StringInputRowParser(parseSpec, null), Map.class),
                new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
                granularitySpec,
                null,
                this.getObjectMapper()
            );
        }

        final ParallelIndexIngestionSpec ingestionSpec =
            new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
        return new ParallelIndexSupervisorTaskTest(null, null, null, ingestionSpec, null, Collections.emptyMap(), succeedsBeforeFailing);
    }

    /**
     * Stub {@link ParallelIndexTaskRunner} whose outcome is fixed at construction time.
     *
     * <p>{@link #run()} returns {@link TaskState#SUCCESS} or {@link TaskState#FAILED} according to
     * the {@code succeeds} flag, and {@link #getName()} returns the phase label. Every other
     * method is a no-op or returns {@code null} / an empty map.
     */
    static class TestRunner
    implements ParallelIndexTaskRunner<PartialDimensionCardinalityTask, DimensionCardinalityReport> {
        /** Whether {@link #run()} reports success. */
        private final boolean succeeds;
        /** Label returned by {@link #getName()}. */
        private final String phase;

        /**
         * @param succeeds whether {@link #run()} should report success
         * @param phase    label returned by {@link #getName()}
         */
        TestRunner(boolean succeeds, String phase) {
            this.succeeds = succeeds;
            this.phase = phase;
        }

        /**
         * Returns the phase label exactly as given to the constructor. The label is plain text, so
         * it is returned directly rather than passed through a formatter as a pattern.
         */
        public String getName() {
            return this.phase;
        }

        /** Returns the outcome fixed at construction time without doing any work. */
        public TaskState run() {
            return this.succeeds ? TaskState.SUCCESS : TaskState.FAILED;
        }

        /** No-op: this stub has nothing to stop. */
        public void stopGracefully(String stopReason) {
        }

        /** Always {@code null}: no stop reason is recorded. */
        public String getStopReason() {
            return null;
        }

        /** No-op: reports are discarded. */
        public void collectReport(DimensionCardinalityReport report) {
        }

        /** Always an empty map: no reports are ever collected. */
        public Map<String, DimensionCardinalityReport> getReports() {
            return Collections.emptyMap();
        }

        /** Always {@code null}: progress is not tracked. */
        public ParallelIndexingPhaseProgress getProgress() {
            return null;
        }

        /** Always {@code null}: no subtasks are run. */
        public Set<String> getRunningTaskIds() {
            return null;
        }

        /** Always {@code null}: no subtask specs exist. */
        public List<SubTaskSpec<PartialDimensionCardinalityTask>> getSubTaskSpecs() {
            return null;
        }

        /** Always {@code null}: no subtask specs exist. */
        public List<SubTaskSpec<PartialDimensionCardinalityTask>> getRunningSubTaskSpecs() {
            return null;
        }

        /** Always {@code null}: no subtask specs exist. */
        public List<SubTaskSpec<PartialDimensionCardinalityTask>> getCompleteSubTaskSpecs() {
            return null;
        }

        /** Always {@code null}, whatever the id. */
        @Nullable
        public SubTaskSpec<PartialDimensionCardinalityTask> getSubTaskSpec(String subTaskSpecId) {
            return null;
        }

        /** Always {@code null}, whatever the id. */
        @Nullable
        public ParallelIndexTaskRunner.SubTaskSpecStatus getSubTaskState(String subTaskSpecId) {
            return null;
        }

        /** Always {@code null}, whatever the id. */
        @Nullable
        public TaskHistory<PartialDimensionCardinalityTask> getCompleteSubTaskSpecAttemptHistory(String subTaskSpecId) {
            return null;
        }
    }

    /**
     * {@link ParallelIndexSupervisorTask} whose {@code createRunner} ignores the real runner
     * factory and hands back {@link TestRunner} stubs, so a test can force a failure in a chosen
     * phase.
     */
    static class ParallelIndexSupervisorTaskTest
    extends ParallelIndexSupervisorTask {
        /** Number of successful runners to hand out before a failing one (0 or 1 are meaningful). */
        private final int succeedsBeforeFailing;
        /** Count of successful runners handed out so far. */
        private int numRuns = 0;

        /**
         * @param succedsBeforeFailing number of successful runners to hand out before a failing one;
         *                             the remaining arguments are passed to the superclass unchanged
         */
        public ParallelIndexSupervisorTaskTest(String id, @Nullable String groupId, TaskResource taskResource, ParallelIndexIngestionSpec ingestionSchema, @Nullable String baseSubtaskSpecName, Map<String, Object> context, int succedsBeforeFailing) {
            super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName, context);
            this.succeedsBeforeFailing = succedsBeforeFailing;
        }

        /**
         * Returns a stub runner instead of invoking {@code runnerCreator}; both parameters are
         * ignored.
         *
         * <p>Selection rules:
         * <ul>
         *   <li>{@code succeedsBeforeFailing == 0} and the tuning config has no numShards:
         *       failing runner named {@code PHASE-1};</li>
         *   <li>{@code succeedsBeforeFailing == 0} and numShards is set: failing runner named
         *       {@code PHASE-2};</li>
         *   <li>{@code succeedsBeforeFailing == 1}, one successful runner already handed out, and
         *       numShards is set: failing runner named {@code PHASE-3};</li>
         *   <li>otherwise: a successful runner, and the success counter is incremented.</li>
         * </ul>
         *
         * @param toolbox       unused
         * @param runnerCreator unused
         * @return a {@link TestRunner} viewed as the requested runner type
         */
        // The stub's type arguments are fixed while T and R are chosen by the caller, so the
        // conversion needs an explicit unchecked cast; a bare return is not assignable.
        @SuppressWarnings("unchecked")
        <T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(TaskToolbox toolbox, Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> runnerCreator) {
            final TestRunner runner;
            if (this.succeedsBeforeFailing == 0) {
                final boolean numShardsUnset = this.getIngestionSchema().getTuningConfig().getNumShards() == null;
                runner = new TestRunner(false, numShardsUnset ? "PHASE-1" : "PHASE-2");
            } else if (this.succeedsBeforeFailing == 1 && this.numRuns == 1 && this.getIngestionSchema().getTuningConfig().getNumShards() != null) {
                runner = new TestRunner(false, "PHASE-3");
            } else {
                ++this.numRuns;
                runner = new TestRunner(true, "SUCCESFUL-PHASE");
            }
            return (ParallelIndexTaskRunner<T, R>) runner;
        }
    }
}

