/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.incremental;

import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
import com.carrotsearch.junitbenchmarks.Clock;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexRow;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class OnheapIncrementalIndexBenchmark
extends AbstractBenchmark {
    private static AggregatorFactory[] factories;
    static final int DIMENSION_COUNT = 5;
    private final Class<? extends OnheapIncrementalIndex> incrementalIndex;

    @Parameterized.Parameters
    public static Collection<Object[]> getParameters() {
        return ImmutableList.of((Object)new Object[]{OnheapIncrementalIndex.class}, (Object)new Object[]{MapIncrementalIndex.class});
    }

    public OnheapIncrementalIndexBenchmark(Class<? extends OnheapIncrementalIndex> incrementalIndex) {
        this.incrementalIndex = incrementalIndex;
    }

    private static MapBasedInputRow getLongRow(long timestamp, int rowID, int dimensionCount) {
        ArrayList<String> dimensionList = new ArrayList<String>(dimensionCount);
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (int i = 0; i < dimensionCount; ++i) {
            String dimName = StringUtils.format((String)"Dim_%d", (Object[])new Object[]{i});
            dimensionList.add(dimName);
            builder.put((Object)dimName, (Object)new Integer(rowID).longValue());
        }
        return new MapBasedInputRow(timestamp, dimensionList, (Map)builder.build());
    }

    @Ignore
    @Test
    @BenchmarkOptions(callgc=true, clock=Clock.REAL_TIME, warmupRounds=10, benchmarkRounds=20)
    public void testConcurrentAddRead() throws InterruptedException, ExecutionException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
        int taskCount = 30;
        int concurrentThreads = 3;
        int elementsPerThread = 32768;
        final IncrementalIndex incrementalIndex = (IncrementalIndex)this.incrementalIndex.getConstructor(IncrementalIndexSchema.class, Boolean.TYPE, Boolean.TYPE, Boolean.TYPE, Boolean.TYPE, Integer.TYPE).newInstance(new IncrementalIndexSchema.Builder().withMetrics(factories).build(), true, true, false, true, 983040);
        ArrayList<Object> queryAggregatorFactories = new ArrayList<Object>(6);
        queryAggregatorFactories.add(new CountAggregatorFactory("rows"));
        for (int i = 0; i < 5; ++i) {
            queryAggregatorFactories.add(new LongSumAggregatorFactory(StringUtils.format((String)"sumResult%s", (Object[])new Object[]{i}), StringUtils.format((String)"sumResult%s", (Object[])new Object[]{i})));
            queryAggregatorFactories.add(new DoubleSumAggregatorFactory(StringUtils.format((String)"doubleSumResult%s", (Object[])new Object[]{i}), StringUtils.format((String)"doubleSumResult%s", (Object[])new Object[]{i})));
        }
        ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setDaemon(false).setNameFormat("index-executor-%d").setPriority(1).build()));
        ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setDaemon(false).setNameFormat("query-executor-%d").build()));
        final long timestamp = System.currentTimeMillis();
        Interval queryInterval = Intervals.of((String)"1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
        ArrayList<ListenableFuture> indexFutures = new ArrayList<ListenableFuture>();
        ArrayList<ListenableFuture> queryFutures = new ArrayList<ListenableFuture>();
        IncrementalIndexSegment incrementalIndexSegment = new IncrementalIndexSegment(incrementalIndex, null);
        TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(new TimeseriesQueryQueryToolChest(), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER);
        final AtomicInteger currentlyRunning = new AtomicInteger(0);
        AtomicBoolean concurrentlyRan = new AtomicBoolean(false);
        final AtomicBoolean someoneRan = new AtomicBoolean(false);
        for (int j = 0; j < 30; ++j) {
            indexFutures.add(indexExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    currentlyRunning.incrementAndGet();
                    try {
                        for (int i = 0; i < 32768; ++i) {
                            incrementalIndex.add((InputRow)OnheapIncrementalIndexBenchmark.getLongRow(timestamp + (long)i, 1, 5));
                        }
                    }
                    catch (IndexSizeExceededException e) {
                        throw new RuntimeException(e);
                    }
                    currentlyRunning.decrementAndGet();
                    someoneRan.set(true);
                }
            }));
            queryFutures.add(queryExecutor.submit(new Runnable((QueryRunnerFactory)factory, (Segment)incrementalIndexSegment, queryInterval, queryAggregatorFactories, someoneRan, currentlyRunning, concurrentlyRan){
                final /* synthetic */ QueryRunnerFactory val$factory;
                final /* synthetic */ Segment val$incrementalIndexSegment;
                final /* synthetic */ Interval val$queryInterval;
                final /* synthetic */ ArrayList val$queryAggregatorFactories;
                final /* synthetic */ AtomicBoolean val$someoneRan;
                final /* synthetic */ AtomicInteger val$currentlyRunning;
                final /* synthetic */ AtomicBoolean val$concurrentlyRan;
                {
                    this.val$factory = queryRunnerFactory;
                    this.val$incrementalIndexSegment = segment;
                    this.val$queryInterval = interval;
                    this.val$queryAggregatorFactories = arrayList;
                    this.val$someoneRan = atomicBoolean;
                    this.val$currentlyRunning = atomicInteger;
                    this.val$concurrentlyRan = atomicBoolean2;
                }

                @Override
                public void run() {
                    FinalizeResultsQueryRunner runner = new FinalizeResultsQueryRunner(this.val$factory.createRunner(this.val$incrementalIndexSegment), this.val$factory.getToolchest());
                    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder().dataSource("xxx").granularity(Granularities.ALL).intervals((List)ImmutableList.of((Object)this.val$queryInterval)).aggregators((List)this.val$queryAggregatorFactories).build();
                    List results = runner.run(QueryPlus.wrap((Query)query)).toList();
                    for (Result result : results) {
                        if (!this.val$someoneRan.get()) continue;
                        Assert.assertTrue((((TimeseriesResultValue)result.getValue()).getDoubleMetric("doubleSumResult0") > 0.0 ? 1 : 0) != 0);
                    }
                    if (this.val$currentlyRunning.get() > 0) {
                        this.val$concurrentlyRan.set(true);
                    }
                }
            }));
        }
        ArrayList<ListenableFuture> allFutures = new ArrayList<ListenableFuture>(queryFutures.size() + indexFutures.size());
        allFutures.addAll(queryFutures);
        allFutures.addAll(indexFutures);
        Futures.allAsList(allFutures).get();
        queryExecutor.shutdown();
        indexExecutor.shutdown();
        FinalizeResultsQueryRunner runner = new FinalizeResultsQueryRunner(factory.createRunner((Segment)incrementalIndexSegment), factory.getToolchest());
        TimeseriesQuery query = Druids.newTimeseriesQueryBuilder().dataSource("xxx").granularity(Granularities.ALL).intervals((List)ImmutableList.of((Object)queryInterval)).aggregators(queryAggregatorFactories).build();
        List results = runner.run(QueryPlus.wrap((Query)query)).toList();
        int expectedVal = 983040;
        for (Result result : results) {
            Assert.assertEquals((long)32768L, (long)((TimeseriesResultValue)result.getValue()).getLongMetric("rows").intValue());
            for (int i = 0; i < 5; ++i) {
                Assert.assertEquals((String)StringUtils.format((String)"Failed long sum on dimension %d", (Object[])new Object[]{i}), (long)983040L, (long)((TimeseriesResultValue)result.getValue()).getLongMetric(StringUtils.format((String)"sumResult%s", (Object[])new Object[]{i})).intValue());
                Assert.assertEquals((String)StringUtils.format((String)"Failed double sum on dimension %d", (Object[])new Object[]{i}), (long)983040L, (long)((TimeseriesResultValue)result.getValue()).getDoubleMetric(StringUtils.format((String)"doubleSumResult%s", (Object[])new Object[]{i})).intValue());
            }
        }
    }

    static {
        ArrayList<Object> ingestAggregatorFactories = new ArrayList<Object>(6);
        ingestAggregatorFactories.add(new CountAggregatorFactory("rows"));
        for (int i = 0; i < 5; ++i) {
            ingestAggregatorFactories.add(new LongSumAggregatorFactory(StringUtils.format((String)"sumResult%s", (Object[])new Object[]{i}), StringUtils.format((String)"Dim_%s", (Object[])new Object[]{i})));
            ingestAggregatorFactories.add(new DoubleSumAggregatorFactory(StringUtils.format((String)"doubleSumResult%s", (Object[])new Object[]{i}), StringUtils.format((String)"Dim_%s", (Object[])new Object[]{i})));
        }
        factories = ingestAggregatorFactories.toArray(new AggregatorFactory[0]);
    }

    private static final class MapIncrementalIndex
    extends OnheapIncrementalIndex {
        private final AtomicInteger indexIncrement = new AtomicInteger(0);
        ConcurrentHashMap<Integer, Aggregator[]> indexedMap = new ConcurrentHashMap();

        public MapIncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, boolean concurrentEventAdd, boolean sortFacts, int maxRowCount, long maxBytesInMemory) {
            super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, sortFacts, maxRowCount, maxBytesInMemory);
        }

        public MapIncrementalIndex(long minTimestamp, Granularity gran, AggregatorFactory[] metrics, int maxRowCount, long maxBytesInMemory) {
            super(new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp).withQueryGranularity(gran).withMetrics(metrics).build(), true, false, true, maxRowCount, maxBytesInMemory);
        }

        protected Aggregator[] concurrentGet(int offset) {
            return this.indexedMap.get(offset);
        }

        protected void concurrentSet(int offset, Aggregator[] value) {
            this.indexedMap.put(offset, value);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected IncrementalIndex.AddToFactsResult addToFacts(InputRow row, IncrementalIndexRow key, ThreadLocal<InputRow> rowContainer, Supplier<InputRow> rowSupplier, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException {
            Aggregator[] aggs;
            Integer priorIdex = this.getFacts().getPriorIndex(key);
            AggregatorFactory[] metrics = this.getMetrics();
            AtomicInteger numEntries = this.getNumEntries();
            AtomicLong sizeInBytes = this.getBytesInMemory();
            if (null != priorIdex) {
                aggs = this.indexedMap.get(priorIdex);
            } else {
                Integer rowIndex;
                aggs = new Aggregator[metrics.length];
                for (int i = 0; i < metrics.length; ++i) {
                    AggregatorFactory agg = metrics[i];
                    aggs[i] = agg.factorize(this.makeColumnSelectorFactory(agg, rowSupplier, this.getDeserializeComplexMetrics()));
                }
                while (null != this.indexedMap.putIfAbsent(rowIndex = Integer.valueOf(this.indexIncrement.incrementAndGet()), aggs)) {
                }
                if ((numEntries.get() >= this.maxRowCount || sizeInBytes.get() >= this.maxBytesInMemory) && this.getFacts().getPriorIndex(key) == -1) {
                    throw new IndexSizeExceededException("Maximum number of rows or max bytes reached", new Object[0]);
                }
                int prev = this.getFacts().putIfAbsent(key, rowIndex.intValue());
                if (-1 == prev) {
                    numEntries.incrementAndGet();
                    sizeInBytes.incrementAndGet();
                } else {
                    aggs = this.indexedMap.get(prev);
                    this.indexedMap.remove(rowIndex);
                }
            }
            rowContainer.set(row);
            Aggregator[] aggregatorArray = aggs;
            int n = aggregatorArray.length;
            for (int i = 0; i < n; ++i) {
                Aggregator agg;
                Aggregator aggregator = agg = aggregatorArray[i];
                synchronized (aggregator) {
                    agg.aggregate();
                    continue;
                }
            }
            rowContainer.set(null);
            return new IncrementalIndex.AddToFactsResult(numEntries.get(), sizeInBytes.get(), new ArrayList());
        }

        public int getLastRowIndex() {
            return this.indexIncrement.get() - 1;
        }
    }
}

