/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractMultiPhaseParallelIndexingTest;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPhaseParallelIndexingTest {
    private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
    private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")));
    private static final ParseSpec PARSE_SPEC = new CSVParseSpec(TIMESTAMP_SPEC, DIMENSIONS_SPEC, null, Arrays.asList("ts", "dim1", "dim2", "val"), false, 0);
    private static final InputFormat INPUT_FORMAT = new CsvInputFormat(Arrays.asList("ts", "dim1", "dim2", "val"), null, Boolean.valueOf(false), Boolean.valueOf(false), 0);
    private static final Interval INTERVAL_TO_INDEX = Intervals.of((String)"2017-12/P1M");
    private static final String INPUT_FILTER = "test_*";
    private final int maxNumConcurrentSubTasks;
    @Nullable
    private final Interval intervalToIndex;
    @Nullable
    private final Integer numShards;
    private File inputDir;
    private List<Interval> inputIntervals;

    @Parameterized.Parameters(name="lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, intervalToIndex={3}, numShards={4}")
    public static Iterable<Object[]> constructorFeeder() {
        return ImmutableList.of((Object)new Object[]{LockGranularity.TIME_CHUNK, false, 2, INTERVAL_TO_INDEX, 2}, (Object)new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX, 2}, (Object)new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, 2}, (Object)new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX, 2}, (Object)new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, 2}, (Object)new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX, null}, (Object)new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, null}, (Object)new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX, null}, (Object)new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, null});
    }

    public HashPartitionMultiPhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi, int maxNumConcurrentSubTasks, @Nullable Interval intervalToIndex, @Nullable Integer numShards) {
        super(lockGranularity, useInputFormatApi, 0.2, 0.2);
        this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
        this.intervalToIndex = intervalToIndex;
        this.numShards = numShards;
    }

    @Before
    public void setup() throws IOException {
        Throwable throwable;
        BufferedWriter writer;
        int i;
        this.inputDir = this.temporaryFolder.newFolder("data");
        HashSet<Interval> intervals = new HashSet<Interval>();
        for (i = 0; i < 10; ++i) {
            writer = Files.newBufferedWriter(new File(this.inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
            throwable = null;
            try {
                for (int j = 0; j < 10; ++j) {
                    writer.write(StringUtils.format((String)"2017-12-%d,%d,%d th test file\n", (Object[])new Object[]{j + 1, i + 10, i}));
                    writer.write(StringUtils.format((String)"2017-12-%d,%d,%d th test file\n", (Object[])new Object[]{j + 2, i + 11, i}));
                    intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of((String)StringUtils.format((String)"2017-12-%d", (Object[])new Object[]{j + 1}))));
                    intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of((String)StringUtils.format((String)"2017-12-%d", (Object[])new Object[]{j + 2}))));
                }
                continue;
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (writer != null) {
                    if (throwable != null) {
                        try {
                            ((Writer)writer).close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                    } else {
                        ((Writer)writer).close();
                    }
                }
            }
        }
        for (i = 0; i < 5; ++i) {
            writer = Files.newBufferedWriter(new File(this.inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
            throwable = null;
            try {
                writer.write(StringUtils.format((String)"2017-12-%d,%d,%d th test file\n", (Object[])new Object[]{i + 1, i + 10, i}));
                continue;
            }
            catch (Throwable throwable4) {
                throwable = throwable4;
                throw throwable4;
            }
            finally {
                if (writer != null) {
                    if (throwable != null) {
                        try {
                            ((Writer)writer).close();
                        }
                        catch (Throwable throwable5) {
                            throwable.addSuppressed(throwable5);
                        }
                    } else {
                        ((Writer)writer).close();
                    }
                }
            }
        }
        this.inputIntervals = new ArrayList<Interval>(intervals);
        this.inputIntervals.sort(Comparators.intervalsByStartThenEnd());
    }

    @Test
    public void testRun() throws Exception {
        Integer maxRowsPerSegment = this.numShards == null ? Integer.valueOf(10) : null;
        Set<DataSegment> publishedSegments = this.runTask((Task)this.createTask((PartitionsSpec)new HashedPartitionsSpec(maxRowsPerSegment, this.numShards, (List)ImmutableList.of((Object)"dim1", (Object)"dim2")), this.inputDir, false, false), TaskState.SUCCESS);
        Map<Interval, Integer> expectedIntervalToNumSegments = this.computeExpectedIntervalToNumSegments(maxRowsPerSegment, this.numShards);
        this.assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
        if (this.intervalToIndex == null) {
            return;
        }
        Set<DataSegment> publishedSegmentsAfterReplace = this.runTask((Task)this.createTask((PartitionsSpec)new HashedPartitionsSpec(maxRowsPerSegment, this.numShards, (List)ImmutableList.of((Object)"dim1", (Object)"dim2")), this.newInputDirForReplace(), false, true), TaskState.SUCCESS);
        Map<Interval, Integer> expectedIntervalToNumSegmentsAfterReplace = this.computeExpectedIntervalToNumSegments(maxRowsPerSegment, this.numShards);
        int tombstones = 0;
        for (DataSegment ds : publishedSegmentsAfterReplace) {
            if (ds.isTombstone()) {
                expectedIntervalToNumSegmentsAfterReplace.put(ds.getInterval(), 1);
                ++tombstones;
                continue;
            }
            if (this.numShards != null) continue;
            expectedIntervalToNumSegmentsAfterReplace.put(ds.getInterval(), 1);
        }
        Assert.assertEquals((long)5L, (long)tombstones);
        int expectedSegments = 12;
        if (this.numShards == null) {
            expectedSegments = 6;
        }
        Assert.assertEquals((long)expectedSegments, (long)(publishedSegmentsAfterReplace.size() - tombstones));
        this.assertHashedPartition(publishedSegmentsAfterReplace, expectedIntervalToNumSegmentsAfterReplace);
    }

    @Test
    public void testRunWithHashPartitionFunction() throws Exception {
        Integer maxRowsPerSegment = this.numShards == null ? Integer.valueOf(10) : null;
        Set<DataSegment> publishedSegments = this.runTask((Task)this.createTask((PartitionsSpec)new HashedPartitionsSpec(maxRowsPerSegment, this.numShards, (List)ImmutableList.of((Object)"dim1", (Object)"dim2"), HashPartitionFunction.MURMUR3_32_ABS), this.inputDir, false, false), TaskState.SUCCESS);
        Map<Interval, Integer> expectedIntervalToNumSegments = this.computeExpectedIntervalToNumSegments(maxRowsPerSegment, this.numShards);
        this.assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
    }

    private Map<Interval, Integer> computeExpectedIntervalToNumSegments(@Nullable Integer maxRowsPerSegment, @Nullable Integer numShards) {
        HashMap<Interval, Integer> expectedIntervalToNumSegments = new HashMap<Interval, Integer>();
        for (int i = 0; i < this.inputIntervals.size(); ++i) {
            if (numShards == null) {
                if (i == 0 || i == this.inputIntervals.size() - 1) {
                    expectedIntervalToNumSegments.put(this.inputIntervals.get(i), Math.round(10.0f / (float)maxRowsPerSegment.intValue()));
                    continue;
                }
                expectedIntervalToNumSegments.put(this.inputIntervals.get(i), Math.round(20.0f / (float)maxRowsPerSegment.intValue()));
                continue;
            }
            expectedIntervalToNumSegments.put(this.inputIntervals.get(i), numShards);
        }
        return expectedIntervalToNumSegments;
    }

    @Test
    public void testAppendLinearlyPartitionedSegmensToHashPartitionedDatasourceSuccessfullyAppend() {
        HashSet<DataSegment> publishedSegments = new HashSet<DataSegment>();
        publishedSegments.addAll(this.runTask((Task)this.createTask((PartitionsSpec)new HashedPartitionsSpec(null, this.numShards, (List)ImmutableList.of((Object)"dim1", (Object)"dim2")), this.inputDir, false, false), TaskState.SUCCESS));
        publishedSegments.addAll(this.runTask((Task)this.createTask((PartitionsSpec)new DynamicPartitionsSpec(Integer.valueOf(5), null), this.inputDir, true, false), TaskState.SUCCESS));
        publishedSegments.addAll(this.runTask((Task)this.createTask((PartitionsSpec)new DynamicPartitionsSpec(Integer.valueOf(10), null), this.inputDir, true, false), TaskState.SUCCESS));
        HashMap intervalToSegments = new HashMap();
        publishedSegments.forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList()).add(segment));
        for (Map.Entry entry : intervalToSegments.entrySet()) {
            List segments = (List)entry.getValue();
            List hashedSegments = segments.stream().filter(segment -> segment.getShardSpec().getClass() == HashBasedNumberedShardSpec.class).collect(Collectors.toList());
            List linearSegments = segments.stream().filter(segment -> segment.getShardSpec().getClass() == NumberedShardSpec.class).collect(Collectors.toList());
            for (DataSegment hashedSegment : hashedSegments) {
                HashBasedNumberedShardSpec hashShardSpec = (HashBasedNumberedShardSpec)hashedSegment.getShardSpec();
                for (DataSegment linearSegment : linearSegments) {
                    Assert.assertEquals((Object)hashedSegment.getInterval(), (Object)linearSegment.getInterval());
                    Assert.assertEquals((Object)hashedSegment.getVersion(), (Object)linearSegment.getVersion());
                    NumberedShardSpec numberedShardSpec = (NumberedShardSpec)linearSegment.getShardSpec();
                    Assert.assertEquals((long)hashShardSpec.getNumCorePartitions(), (long)numberedShardSpec.getNumCorePartitions());
                    Assert.assertTrue((hashShardSpec.getPartitionNum() < numberedShardSpec.getPartitionNum() ? 1 : 0) != 0);
                }
            }
        }
    }

    private ParallelIndexSupervisorTask createTask(PartitionsSpec partitionsSpec, File inputDirectory, boolean appendToExisting, boolean dropExisting) {
        if (this.isUseInputFormatApi()) {
            return this.createTask(TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, null, this.intervalToIndex, inputDirectory, INPUT_FILTER, partitionsSpec, this.maxNumConcurrentSubTasks, appendToExisting, dropExisting);
        }
        return this.createTask(null, null, null, PARSE_SPEC, this.intervalToIndex, inputDirectory, INPUT_FILTER, partitionsSpec, this.maxNumConcurrentSubTasks, appendToExisting, dropExisting);
    }

    private void assertHashedPartition(Set<DataSegment> publishedSegments, Map<Interval, Integer> expectedIntervalToNumSegments) throws IOException {
        HashMap intervalToSegments = new HashMap();
        publishedSegments.forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList()).add(segment));
        Assert.assertEquals(new HashSet<Interval>(this.inputIntervals), intervalToSegments.keySet());
        File tempSegmentDir = this.temporaryFolder.newFolder();
        for (Map.Entry entry : intervalToSegments.entrySet()) {
            Interval interval = (Interval)entry.getKey();
            List segmentsInInterval = (List)entry.getValue();
            Assert.assertEquals((long)expectedIntervalToNumSegments.get(interval).intValue(), (long)segmentsInInterval.size());
            for (DataSegment segment2 : segmentsInInterval) {
                HashBasedNumberedShardSpec shardSpec = null;
                if (segment2.isTombstone()) {
                    Assert.assertSame(TombstoneShardSpec.class, segment2.getShardSpec().getClass());
                } else {
                    Assert.assertSame(HashBasedNumberedShardSpec.class, segment2.getShardSpec().getClass());
                    shardSpec = (HashBasedNumberedShardSpec)segment2.getShardSpec();
                    Assert.assertEquals((Object)HashPartitionFunction.MURMUR3_32_ABS, (Object)shardSpec.getPartitionFunction());
                }
                List<ScanResultValue> results = this.querySegment(segment2, (List<String>)ImmutableList.of((Object)"dim1", (Object)"dim2"), tempSegmentDir);
                if (segment2.isTombstone()) {
                    Assert.assertTrue((boolean)results.isEmpty());
                    continue;
                }
                int hash = shardSpec.getPartitionFunction().hash(HashBasedNumberedShardSpec.serializeGroupKey((ObjectMapper)this.getObjectMapper(), (List)((List)results.get(0).getEvents())), shardSpec.getNumBuckets());
                for (ScanResultValue value : results) {
                    Assert.assertEquals((long)hash, (long)shardSpec.getPartitionFunction().hash(HashBasedNumberedShardSpec.serializeGroupKey((ObjectMapper)this.getObjectMapper(), (List)((List)value.getEvents())), shardSpec.getNumBuckets()));
                }
            }
        }
    }

    private File newInputDirForReplace() throws IOException {
        File inputDirectory = this.temporaryFolder.newFolder("dataReplace");
        HashSet<Integer> fileIds = new HashSet<Integer>();
        fileIds.add(3);
        fileIds.add(7);
        fileIds.add(9);
        for (Integer i : fileIds) {
            BufferedWriter writer = Files.newBufferedWriter(new File(inputDirectory, "test_" + i).toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
            Throwable throwable = null;
            try {
                for (int j = 0; j < 10; ++j) {
                    writer.write(StringUtils.format((String)"2017-12-%d,%d,%d th test file\n", (Object[])new Object[]{i + 1, j + 10, i}));
                    writer.write(StringUtils.format((String)"2017-12-%d,%d,%d th test file\n", (Object[])new Object[]{i + 2, j + 11, i}));
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (writer == null) continue;
                if (throwable != null) {
                    try {
                        ((Writer)writer).close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                ((Writer)writer).close();
            }
        }
        return inputDirectory;
    }
}

