/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.firehose;

import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Module;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexing.common.ReingestionTimelineUtils;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class IngestSegmentFirehoseFactoryTest {
    // Shared fixtures. Every blank static final below is assigned in the static
    // initializer at the bottom of this class; the order of those assignments matters.

    // JSON mapper after it has been passed through setupInjectablesInObjectMapper().
    private static final ObjectMapper MAPPER;
    // Used by constructorFeeder() to persist the in-memory index into PERSIST_DIR.
    private static final IndexMergerV9 INDEX_MERGER_V9;
    // Handed to every IngestSegmentFirehoseFactory built by constructorFeeder().
    private static final IndexIO INDEX_IO;
    // Task bookkeeping: TASK is registered with TASK_LOCKBOX, which is built from
    // TASK_STORAGE and the stubbed metadata coordinator MDC.
    private static final TaskStorage TASK_STORAGE;
    private static final IndexerSQLMetadataStorageCoordinator MDC;
    private static final TaskLockbox TASK_LOCKBOX;
    // Its data source name is what the factories under test are created with.
    private static final Task TASK;
    // JUnit rule that supplies the per-test folder created in setup().
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final Logger log;
    // Identity of the synthetic segments produced by buildSegment().
    private static final String DATA_SOURCE_NAME = "testDataSource";
    private static final String DATA_SOURCE_VERSION = "version";
    private static final Integer BINARY_VERSION;
    // Column names and values of the synthetic rows produced by buildRow().
    private static final String DIM_NAME = "testDimName";
    private static final String DIM_VALUE = "testDimValue";
    // Input columns that feed the two sum aggregators (see constructorFeeder()).
    private static final String DIM_LONG_NAME = "testDimLongName";
    private static final String DIM_FLOAT_NAME = "testDimFloatName";
    private static final String METRIC_LONG_NAME = "testLongMetric";
    private static final String METRIC_FLOAT_NAME = "testFloatMetric";
    private static final Long METRIC_LONG_VALUE;
    private static final Float METRIC_FLOAT_VALUE;
    private static final String TIME_COLUMN = "ts";
    // Number of segments registered in SEGMENT_SET by setUpStatic().
    private static final Integer MAX_SHARD_NUMBER;
    // Number of rows added to the persisted index by constructorFeeder().
    private static final Integer MAX_ROWS;
    // Scratch directory for the whole class; removed in tearDownStatic().
    private static final File TMP_DIR;
    // "indexTestMerger" under TMP_DIR: where the index is persisted and where every
    // segment's load spec points.
    private static final File PERSIST_DIR;
    // Filled by setUpStatic(); served by the stubbed coordinator client and MDC.
    private static final List<DataSegment> SEGMENT_SET;
    // Per-parameterization state injected through the constructor.
    private final FirehoseFactory<InputRowParser> factory;
    private final InputRowParser rowParser;
    // Folder obtained from temporaryFolder in setup().
    private File tempDir;
    // Parser that declares DIM_NAME explicitly; also used to build the persisted index.
    private static final InputRowParser<Map<String, Object>> ROW_PARSER;

    /**
     * Persists the shared test index and builds the parameter matrix for the
     * {@link Parameterized} runner.
     *
     * <p>An in-memory index holding {@code MAX_ROWS} rows (timestamps
     * {@code 0..MAX_ROWS-1}) is persisted into {@code PERSIST_DIR}. One parameter
     * set is then produced for every combination of:
     * <ul>
     *   <li>row parser: {@code ROW_PARSER} or a JSON-spec parser without explicit dimensions,</li>
     *   <li>dimension list: {@code null} or {@code [DIM_NAME]},</li>
     *   <li>metric list: {@code null} or both metric names,</li>
     *   <li>whether the factory is wrapped in a {@link CombiningFirehoseFactory}.</li>
     * </ul>
     *
     * @return one {@code {testName, firehoseFactory, rowParser}} array per combination
     * @throws IOException if the index cannot be persisted
     */
    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> constructorFeeder() throws IOException {
        IndexSpec indexSpec = new IndexSpec();
        IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
            // Same value as the literal -4611686018427387904L.
            .withMinTimestamp(Long.MIN_VALUE / 2)
            .withDimensionsSpec(ROW_PARSER)
            .withMetrics(new AggregatorFactory[]{
                new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME),
                new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME)
            })
            .build();
        IncrementalIndex index = new OnheapIncrementalIndex.Builder()
            .setIndexSchema(schema)
            .setMaxRowCount(MAX_ROWS * MAX_SHARD_NUMBER)
            .build();

        // Primitive counter: the row timestamp is simply the loop index.
        for (long ts = 0; ts < MAX_ROWS; ts++) {
            index.add(ROW_PARSER.parseBatch(buildRow(ts)).get(0));
        }
        FileUtils.mkdirp(PERSIST_DIR);
        INDEX_MERGER_V9.persist(index, PERSIST_DIR, indexSpec, null);

        // Stub coordinator: always answers with the synthetic segments, read lazily
        // because SEGMENT_SET is only filled later by setUpStatic().
        CoordinatorClient cc = new CoordinatorClient(null, null){

            public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(String dataSource, List<Interval> intervals) {
                return ImmutableSet.copyOf(SEGMENT_SET);
            }
        };
        SegmentCacheManagerFactory slf = new SegmentCacheManagerFactory(MAPPER);
        RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());

        InputRowParser<Map<String, Object>> parserWithoutDims = new MapInputRowParser(
            new JSONParseSpec(
                new TimestampSpec(TIME_COLUMN, "auto", null),
                DimensionsSpec.builder()
                    .setDimensionExclusions(ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME))
                    .build(),
                null,
                null,
                null
            )
        );

        List<Object[]> values = new ArrayList<>();
        for (InputRowParser parser : Arrays.asList(ROW_PARSER, parserWithoutDims)) {
            for (List<String> dimNames : Arrays.<List<String>>asList(null, ImmutableList.of(DIM_NAME))) {
                for (List<String> metricNames : Arrays.<List<String>>asList(null, ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME))) {
                    for (boolean wrapInCombining : new boolean[]{false, true}) {
                        IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory(
                            TASK.getDataSource(),
                            Intervals.ETERNITY,
                            null,
                            new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
                            dimNames,
                            metricNames,
                            null,
                            INDEX_IO,
                            cc,
                            slf,
                            retryPolicyFactory
                        );
                        // Declared as the common FirehoseFactory type: the combining wrapper
                        // is not an IngestSegmentFirehoseFactory.
                        FirehoseFactory factory = wrapInCombining
                            ? new CombiningFirehoseFactory((List)ImmutableList.of(isfFactory))
                            : isfFactory;
                        String testName = StringUtils.format(
                            "DimNames[%s]MetricNames[%s]ParserDimNames[%s]WrapInCombining[%s]",
                            new Object[]{
                                dimNames == null ? "null" : "dims",
                                metricNames == null ? "null" : "metrics",
                                // Identity check on purpose: only ROW_PARSER declares dimensions.
                                parser == ROW_PARSER ? "dims" : "null",
                                wrapInCombining
                            }
                        );
                        values.add(new Object[]{testName, factory, parser});
                    }
                }
            }
        }
        return values;
    }

    /**
     * Configures {@code objectMapper} for this test and returns the same instance.
     *
     * <p>Three things are applied, in this order: a "testModule" registering
     * {@link LocalLoadSpec} as a subtype, a {@link GuiceAnnotationIntrospector}
     * paired in front of the mapper's existing serialization and deserialization
     * introspectors, and Guice-backed injectable values whose injector binds
     * {@link LocalDataSegmentPuller} and {@link TestExprMacroTable#INSTANCE}.
     *
     * @param objectMapper the mapper to configure; it is mutated in place
     * @return {@code objectMapper}
     */
    public static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) {
        SimpleModule testModule = new SimpleModule("testModule");
        testModule.registerSubtypes(new Class[]{LocalLoadSpec.class});
        objectMapper.registerModule(testModule);

        AnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
        AnnotationIntrospector forSerialization = new AnnotationIntrospectorPair(
            guiceIntrospector,
            objectMapper.getSerializationConfig().getAnnotationIntrospector()
        );
        AnnotationIntrospector forDeserialization = new AnnotationIntrospectorPair(
            guiceIntrospector,
            objectMapper.getDeserializationConfig().getAnnotationIntrospector()
        );
        objectMapper.setAnnotationIntrospectors(forSerialization, forDeserialization);

        Module testBindings = binder -> {
            binder.bind(LocalDataSegmentPuller.class);
            binder.bind(ExprMacroTable.class).toInstance(TestExprMacroTable.INSTANCE);
        };
        InjectableValues injectables = new GuiceInjectableValues(
            GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(testBindings))
        );
        objectMapper.setInjectableValues(injectables);
        return objectMapper;
    }

    /**
     * Receives one parameter set produced by {@code constructorFeeder()}.
     *
     * @param testName  display name of the parameterization; not stored
     * @param factory   the firehose factory under test
     * @param rowParser the parser to read with; stored after being decorated with
     *                  {@link TransformSpec#NONE}
     */
    public IngestSegmentFirehoseFactoryTest(String testName, FirehoseFactory factory, InputRowParser rowParser) {
        this.rowParser = TransformSpec.NONE.decorate(rowParser);
        this.factory = factory;
    }

    /**
     * Builds one synthetic input row.
     *
     * @param ts value stored under {@code TIME_COLUMN}
     * @return an immutable map holding the timestamp, the string dimension, and the
     *         float and long input columns, in that order
     */
    private static Map<String, Object> buildRow(Long ts) {
        return ImmutableMap.<String, Object>builder()
            .put(TIME_COLUMN, ts)
            .put(DIM_NAME, DIM_VALUE)
            .put(DIM_FLOAT_NAME, METRIC_FLOAT_VALUE)
            .put(DIM_LONG_NAME, METRIC_LONG_VALUE)
            .build();
    }

    /**
     * Creates the descriptor of one synthetic segment.
     *
     * <p>Every segment covers {@link Intervals#ETERNITY}, shares the same version and
     * has a "local" load spec pointing at {@code PERSIST_DIR}; only the partition
     * number of its {@link NumberedShardSpec} differs.
     *
     * @param shardNumber partition number, must satisfy {@code 0 <= shardNumber < MAX_SHARD_NUMBER}
     * @return the segment descriptor
     * @throws IllegalArgumentException if {@code shardNumber} is out of range
     */
    private static DataSegment buildSegment(Integer shardNumber) {
        Preconditions.checkArgument(shardNumber < MAX_SHARD_NUMBER);
        Preconditions.checkArgument(shardNumber >= 0);

        Map<String, Object> loadSpec = ImmutableMap.of("type", "local", "path", PERSIST_DIR.getAbsolutePath());
        List<String> dimensions = ImmutableList.of(DIM_NAME);
        List<String> metrics = ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME);
        ShardSpec shardSpec = new NumberedShardSpec(shardNumber.intValue(), MAX_SHARD_NUMBER.intValue());

        return new DataSegment(
            DATA_SOURCE_NAME,
            Intervals.ETERNITY,
            DATA_SOURCE_VERSION,
            loadSpec,
            dimensions,
            metrics,
            shardSpec,
            BINARY_VERSION,
            0L
        );
    }

    /**
     * Fills {@code SEGMENT_SET} with one segment per shard number in
     * {@code [0, MAX_SHARD_NUMBER)}, in ascending order.
     */
    @BeforeClass
    public static void setUpStatic() {
        IntStream.range(0, MAX_SHARD_NUMBER)
            .mapToObj(IngestSegmentFirehoseFactoryTest::buildSegment)
            .forEachOrdered(SEGMENT_SET::add);
    }

    /**
     * Cleans up the class-wide scratch directory by handing {@code TMP_DIR} to
     * {@code recursivelyDelete}.
     */
    @AfterClass
    public static void tearDownStatic() {
        recursivelyDelete(TMP_DIR);
    }

    /**
     * Deletes {@code dir} and, when it is a directory, everything beneath it.
     *
     * <p>Deletion is best effort: a failure to remove an entry is logged as a
     * warning and the traversal continues.
     *
     * @param dir the file or directory to remove; {@code null} is ignored
     */
    private static void recursivelyDelete(File dir) {
        if (dir == null) {
            return;
        }
        if (dir.isDirectory()) {
            File[] children = dir.listFiles();
            // listFiles() returns null when the directory cannot be listed.
            if (children != null) {
                for (File child : children) {
                    recursivelyDelete(child);
                }
            }
        }
        // Remove the entry itself last: a directory can only be deleted once it is
        // empty. Previously directories were emptied but never deleted.
        if (!dir.delete()) {
            log.warn("Could not delete file at [%s]", new Object[]{dir.getAbsolutePath()});
        }
    }

    /**
     * Creates a fresh folder under the {@code temporaryFolder} rule before each test.
     *
     * @throws IOException if the folder cannot be created
     */
    @Before
    public void setup() throws IOException {
        tempDir = temporaryFolder.newFolder();
    }

    /**
     * Attempts to delete the per-test folder after each test. The result of
     * {@link File#delete()} is deliberately not checked.
     */
    @After
    public void teardown() {
        tempDir.delete();
    }

    /**
     * Checks that an unwrapped {@link IngestSegmentFirehoseFactory} reports the data
     * source, interval, dimensions and metrics it was constructed with. Dimensions
     * and metrics are only checked when they are non-null. Parameterizations that
     * use a {@link CombiningFirehoseFactory} are not checked.
     */
    @Test
    public void sanityTest() {
        if (!(this.factory instanceof CombiningFirehoseFactory)) {
            IngestSegmentFirehoseFactory isfFactory = (IngestSegmentFirehoseFactory) this.factory;

            Assert.assertEquals(TASK.getDataSource(), isfFactory.getDataSource());
            if (isfFactory.getDimensions() != null) {
                Object[] actualDimensions = isfFactory.getDimensions().toArray();
                Assert.assertArrayEquals(new String[]{DIM_NAME}, actualDimensions);
            }
            Assert.assertEquals(Intervals.ETERNITY, isfFactory.getInterval());
            if (isfFactory.getMetrics() != null) {
                // Compared as sets, so the order of the metric names does not matter.
                Assert.assertEquals(
                    ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME),
                    ImmutableSet.copyOf(isfFactory.getMetrics())
                );
            }
        }
    }

    /**
     * Reads every row through the firehose and verifies its dimension and metric
     * values, then checks that {@code MAX_SHARD_NUMBER * MAX_ROWS} rows were read.
     *
     * @throws IOException if the firehose cannot be opened or closed
     */
    @Test
    public void simpleFirehoseReadingTest() throws IOException {
        Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), SEGMENT_SET.size());

        final String[] expectedDimensions = new String[]{DIM_NAME};
        final String[] expectedDimValues = new String[]{DIM_VALUE};
        final float expectedFloat = METRIC_FLOAT_VALUE.floatValue();
        // The tolerance scales with the expected float value.
        final double floatTolerance = (double) expectedFloat * 1.0E-4;

        int rowsRead = 0;
        try (Firehose firehose = this.factory.connect(this.rowParser, TMP_DIR)) {
            while (firehose.hasMore()) {
                InputRow row = firehose.nextRow();
                Assert.assertArrayEquals(expectedDimensions, row.getDimensions().toArray());
                Assert.assertArrayEquals(expectedDimValues, row.getDimension(DIM_NAME).toArray());
                Assert.assertEquals((Object) METRIC_LONG_VALUE, (Object) row.getMetric(METRIC_LONG_NAME));
                Assert.assertEquals(
                    (double) expectedFloat,
                    (double) row.getMetric(METRIC_FLOAT_NAME).floatValue(),
                    floatTolerance
                );
                rowsRead++;
            }
        }
        Assert.assertEquals((long) (MAX_SHARD_NUMBER * MAX_ROWS), (long) rowsRead);
    }

    /**
     * Reads through a {@link TransformSpec} that keeps only rows whose
     * {@code __time} equals 1 and multiplies the float metric by 10.
     *
     * <p>Rows rejected by the filter are returned as {@code null} and counted as
     * skipped. Every shard is backed by the same persisted index, whose rows carry
     * the timestamps {@code 0..MAX_ROWS-1}, so exactly one row per shard is kept
     * and {@code MAX_ROWS - 1} rows per shard are skipped.
     *
     * @throws IOException if the firehose cannot be opened or closed
     */
    @Test
    public void testTransformSpec() throws IOException {
        Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), SEGMENT_SET.size());

        // Derived from the fixture sizes rather than hard-coded: the earlier checks
        // (90 skipped, MAX_ROWS kept) only held because MAX_ROWS == MAX_SHARD_NUMBER.
        final int expectedKept = MAX_SHARD_NUMBER;
        final int expectedSkipped = MAX_SHARD_NUMBER * (MAX_ROWS - 1);

        TransformSpec transformSpec = new TransformSpec(
            new SelectorDimFilter("__time", "1", null),
            ImmutableList.of(new ExpressionTransform(METRIC_FLOAT_NAME, "testFloatMetric * 10", ExprMacroTable.nil()))
        );

        int kept = 0;
        int skipped = 0;
        try (Firehose firehose = this.factory.connect(transformSpec.decorate(this.rowParser), TMP_DIR)) {
            while (firehose.hasMore()) {
                InputRow row = firehose.nextRow();
                if (row == null) {
                    // Filtered out by the transform spec.
                    ++skipped;
                    continue;
                }
                Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
                Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray());
                Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME).longValue());
                Assert.assertEquals(
                    (double) (METRIC_FLOAT_VALUE.floatValue() * 10.0f),
                    (double) row.getMetric(METRIC_FLOAT_NAME).floatValue(),
                    (double) METRIC_FLOAT_VALUE.floatValue() * 1.0E-4
                );
                ++kept;
            }
        }
        Assert.assertEquals((long) expectedSkipped, (long) skipped);
        Assert.assertEquals((long) expectedKept, (long) kept);
    }

    /**
     * Verifies the order in which {@link ReingestionTimelineUtils} reports unique
     * dimensions and metrics.
     *
     * <p>Ten timeline holders are built; holder {@code i} contains five segments
     * that each declare dimensions {@code dim(i)..dim(i+4)} and metrics
     * {@code met(i)..met(i+4)}.
     */
    @Test
    public void testGetUniqueDimensionsAndMetrics() {
        final int numSegmentsPerPartitionChunk = 5;
        final int numPartitionChunksPerTimelineObject = 10;
        final Interval interval = Intervals.of("2017-01-01/2017-01-02");
        final String version = "1";

        List<TimelineObjectHolder> timelineSegments = new ArrayList<>();
        for (int i = 0; i < numPartitionChunksPerTimelineObject; i++) {
            List<NumberedPartitionChunk> chunks = new ArrayList<>();
            for (int j = 0; j < numSegmentsPerPartitionChunk; j++) {
                // Column names depend only on the outer index, not on j.
                List<String> dims = IntStream.range(i, i + numSegmentsPerPartitionChunk)
                    .mapToObj(n -> "dim" + n)
                    .collect(Collectors.toList());
                List<String> metrics = IntStream.range(i, i + numSegmentsPerPartitionChunk)
                    .mapToObj(n -> "met" + n)
                    .collect(Collectors.toList());
                DataSegment segment = new DataSegment(
                    "ds",
                    interval,
                    version,
                    (Map)ImmutableMap.of(),
                    dims,
                    metrics,
                    new NumberedShardSpec(numPartitionChunksPerTimelineObject, i),
                    Integer.valueOf(1),
                    1L
                );
                chunks.add(new NumberedPartitionChunk(i, numPartitionChunksPerTimelineObject, segment));
            }
            timelineSegments.add(new TimelineObjectHolder(interval, version, new PartitionHolder(chunks)));
        }

        List<String> expectedDims = Arrays.asList(
            "dim9", "dim10", "dim11", "dim12", "dim13", "dim8", "dim7",
            "dim6", "dim5", "dim4", "dim3", "dim2", "dim1", "dim0"
        );
        List<String> expectedMetrics = Arrays.asList(
            "met9", "met10", "met11", "met12", "met13", "met8", "met7",
            "met6", "met5", "met4", "met3", "met2", "met1", "met0"
        );
        Assert.assertEquals(expectedDims, (Object)ReingestionTimelineUtils.getUniqueDimensions(timelineSegments, null));
        Assert.assertEquals(expectedMetrics, (Object)ReingestionTimelineUtils.getUniqueMetrics(timelineSegments));
    }

    /**
     * Creates a fresh {@link NoopServiceEmitter}.
     *
     * @return a new emitter instance on every call
     */
    private static ServiceEmitter newMockEmitter() {
        final ServiceEmitter emitter = new NoopServiceEmitter();
        return emitter;
    }

    // Assigns every blank static final of this class. The statement order is
    // significant: later assignments read fields assigned earlier in this block.
    static {
        TestUtils testUtils = new TestUtils();
        MAPPER = IngestSegmentFirehoseFactoryTest.setupInjectablesInObjectMapper(TestHelper.makeJsonMapper());
        INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
        INDEX_IO = testUtils.getTestIndexIO();
        TASK_STORAGE = new HeapMemoryTaskStorage(new TaskStorageConfig(null){});
        // Metadata coordinator stub: constructed with null collaborators and with the
        // methods below overridden to work purely in memory.
        MDC = new IndexerSQLMetadataStorageCoordinator(null, null, null){
            // Segments announced so far through announceHistoricalSegments().
            private final Set<DataSegment> published = new HashSet<DataSegment>();

            // Always answers with a snapshot of SEGMENT_SET; the arguments are ignored.
            public List<DataSegment> retrieveUsedSegmentsForIntervals(String dataSource, List<Interval> interval, Segments visibility) {
                return ImmutableList.copyOf((Collection)SEGMENT_SET);
            }

            // There are never any unused segments.
            public List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval) {
                return ImmutableList.of();
            }

            // Records the segments and returns only those not announced before.
            public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) {
                HashSet<DataSegment> added = new HashSet<DataSegment>();
                for (DataSegment segment : segments) {
                    if (!this.published.add(segment)) continue;
                    added.add(segment);
                }
                return ImmutableSet.copyOf(added);
            }

            // Deliberately a no-op.
            public void deleteSegments(Set<DataSegment> segments) {
            }
        };
        // Needs TASK_STORAGE and MDC, so it must come after both.
        TASK_LOCKBOX = new TaskLockbox(TASK_STORAGE, (IndexerMetadataStorageCoordinator)MDC);
        TASK = NoopTask.create();
        TASK_LOCKBOX.add(TASK);
        log = new Logger(IngestSegmentFirehoseFactoryTest.class);
        BINARY_VERSION = -1;
        METRIC_LONG_VALUE = 1L;
        METRIC_FLOAT_VALUE = Float.valueOf(1.0f);
        MAX_SHARD_NUMBER = 10;
        MAX_ROWS = 10;
        TMP_DIR = FileUtils.createTempDir();
        // Resolved under TMP_DIR, so TMP_DIR must already be assigned.
        PERSIST_DIR = Paths.get(TMP_DIR.getAbsolutePath(), "indexTestMerger").toFile();
        // Starts empty (MAX_SHARD_NUMBER is only the initial capacity); filled by setUpStatic().
        SEGMENT_SET = new ArrayList<DataSegment>(MAX_SHARD_NUMBER);
        // Declares DIM_NAME as the only dimension and excludes the two metric input columns.
        ROW_PARSER = new MapInputRowParser((ParseSpec)new TimeAndDimsParseSpec(new TimestampSpec(TIME_COLUMN, "auto", null), DimensionsSpec.builder().setDimensions(DimensionsSpec.getDefaultSchemas((List)ImmutableList.of((Object)DIM_NAME))).setDimensionExclusions((List)ImmutableList.of((Object)DIM_FLOAT_NAME, (Object)DIM_LONG_NAME)).build()));
    }
}

