/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.client;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.primitives.Bytes;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.RandomUtils;
import org.apache.druid.client.CacheUtil;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.BackgroundCachePopulator;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.CacheStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNResultValue;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests for {@link CachingQueryRunner}, run once per value returned by
 * {@link #constructorFeeder()}. A positive thread count selects a
 * {@link BackgroundCachePopulator}; zero selects a {@link ForegroundCachePopulator}.
 *
 * <p>Many locals and helper parameters are intentionally raw-typed
 * ({@code Query}, {@code QueryToolChest}, {@code CacheStrategy}, {@code Sequence}) so the same
 * helpers can drive both the TopN and the timeseries tool chests.
 */
@RunWith(Parameterized.class)
public class CachingQueryRunnerTest {
    private static final List<AggregatorFactory> AGGS = Arrays.asList(
        new CountAggregatorFactory("rows"),
        new LongSumAggregatorFactory("imps", "imps"),
        new LongSumAggregatorFactory("impers", "imps")
    );

    /**
     * Flat fixture consumed by {@link #makeTopNResults(boolean, Object...)}: each
     * {@link DateTime} starts a new result, followed by (dimension value, rows, imps) triples.
     */
    private static final Object[] OBJECTS = new Object[]{
        DateTimes.of("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
        DateTimes.of("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
        DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
        DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
        DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
    };

    private static final SegmentDescriptor SEGMENT_DESCRIPTOR =
        new SegmentDescriptor(Intervals.of("2011/2012"), "version", 0);
    private static final String CACHE_ID = "segment";

    private final ObjectMapper objectMapper = new DefaultObjectMapper();
    private final CachePopulator cachePopulator;

    /** Executor backing the background populator; {@code null} when populating in the foreground. */
    private final ExecutorService backgroundExecutor;

    /** Supplies the {@code numBackgroundThreads} values (5, 1 and 0) the suite is run with. */
    @Parameterized.Parameters(name = "numBackgroundThreads={0}")
    public static Iterable<Object[]> constructorFeeder() {
        return QueryRunnerTestHelper.cartesian(Arrays.asList(5, 1, 0));
    }

    /**
     * Chooses the cache populator under test.
     *
     * @param numBackgroundThreads size of the background pool; a value of zero (or less) selects
     *                             the foreground populator
     */
    public CachingQueryRunnerTest(int numBackgroundThreads) {
        // NOTE(review): -1L is passed as the populators' last argument; presumably "no max
        // entry size" -- confirm against the CachePopulator implementations.
        if (numBackgroundThreads > 0) {
            this.backgroundExecutor = Execs.multiThreaded(numBackgroundThreads, "CachingQueryRunnerTest-%d");
            this.cachePopulator = new BackgroundCachePopulator(
                this.backgroundExecutor,
                this.objectMapper,
                new CachePopulatorStats(),
                -1L
            );
        } else {
            this.backgroundExecutor = null;
            this.cachePopulator = new ForegroundCachePopulator(this.objectMapper, new CachePopulatorStats(), -1L);
        }
    }

    /**
     * Shuts down the background executor, if one was created, so that pool threads do not
     * accumulate across the test instances JUnit creates for each test method and parameter.
     */
    @After
    public void tearDown() {
        if (backgroundExecutor != null) {
            // shutdown() rather than shutdownNow(): let any in-flight populate task finish.
            backgroundExecutor.shutdown();
        }
    }

    /** Runs the populate and use-cache scenarios against a TopN query. */
    @Test
    public void testCloseAndPopulate() throws Exception {
        List<Result> expectedRes = makeTopNResults(false, OBJECTS);
        List<Result> expectedCacheRes = makeTopNResults(true, OBJECTS);

        TopNQueryBuilder builder = new TopNQueryBuilder()
            .dataSource("ds")
            .dimension("top_dim")
            .metric("imps")
            .threshold(3)
            .intervals("2011-01-05/2011-01-10")
            .aggregators(AGGS)
            .granularity(Granularities.ALL);

        TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());

        testCloseAndPopulate(expectedRes, expectedCacheRes, builder.build(), toolchest);
        testUseCache(expectedCacheRes, builder.build(), toolchest);
    }

    /** Runs the populate and use-cache scenarios against ascending and descending timeseries queries. */
    @Test
    public void testTimeseries() throws Exception {
        for (boolean descending : new boolean[]{false, true}) {
            TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
                .dataSource("testing")
                .granularity(QueryRunnerTestHelper.DAY_GRAN)
                .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
                .aggregators(
                    Arrays.asList(
                        QueryRunnerTestHelper.ROWS_COUNT,
                        new LongSumAggregatorFactory("idx", "index"),
                        QueryRunnerTestHelper.QUALITY_UNIQUES
                    )
                )
                .descending(descending)
                .build();

            Result row1 = new Result<>(
                DateTimes.of("2011-04-01"),
                new TimeseriesResultValue(
                    ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L, "uniques", 9.019833517963864)
                )
            );
            Result row2 = new Result<>(
                DateTimes.of("2011-04-02"),
                new TimeseriesResultValue(
                    ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L, "uniques", 9.019833517963864)
                )
            );

            // Expected order follows the query's sort direction.
            List<Result> expectedResults;
            if (descending) {
                expectedResults = Lists.newArrayList(row2, row1);
            } else {
                expectedResults = Lists.newArrayList(row1, row2);
            }

            TimeseriesQueryQueryToolChest toolChest = new TimeseriesQueryQueryToolChest();

            testCloseAndPopulate(expectedResults, expectedResults, query, toolChest);
            testUseCache(expectedResults, query, toolChest);
        }
    }

    /**
     * A runner built without a cache key prefix must report that it can neither populate nor use
     * the cache, and running a query must not make unexpected calls on the (expectation-free)
     * cache mock.
     */
    @Test
    public void testNullCacheKeyPrefix() {
        TopNQuery query = new TopNQueryBuilder()
            .dataSource("ds")
            .dimension("top_dim")
            .metric("imps")
            .threshold(3)
            .intervals("2011-01-05/2011-01-10")
            .aggregators(AGGS)
            .granularity(Granularities.ALL)
            .build();
        TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());

        Cache cache = EasyMock.mock(Cache.class);
        EasyMock.replay(cache);

        CachingQueryRunner queryRunner = makeCachingQueryRunner(null, cache, toolchest, Sequences.empty());
        Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query)));
        Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query)));
        queryRunner.run(QueryPlus.wrap(query));
        EasyMock.verifyUnexpectedCalls(cache);
    }

    /**
     * When the tool chest returns a {@code null} cache strategy, the runner must report that it
     * can neither populate nor use the cache, and running a query must not make unexpected calls
     * on the (expectation-free) cache mock.
     */
    @Test
    public void testNullStrategy() {
        TopNQuery query = new TopNQueryBuilder()
            .dataSource("ds")
            .dimension("top_dim")
            .metric("imps")
            .threshold(3)
            .intervals("2011-01-05/2011-01-10")
            .aggregators(AGGS)
            .granularity(Granularities.ALL)
            .build();

        QueryToolChest toolchest = EasyMock.mock(QueryToolChest.class);
        Cache cache = EasyMock.mock(Cache.class);
        EasyMock.expect(toolchest.getCacheStrategy(query)).andReturn(null);
        EasyMock.replay(cache, toolchest);

        CachingQueryRunner queryRunner = makeCachingQueryRunner(new byte[0], cache, toolchest, Sequences.empty());
        Assert.assertFalse(queryRunner.canPopulateCache(query, null));
        Assert.assertFalse(queryRunner.canUseCache(query, null));
        queryRunner.run(QueryPlus.wrap(query));
        EasyMock.verifyUnexpectedCalls(cache);
    }

    /**
     * Runs {@code query} through a caching runner backed by an initially empty in-memory cache
     * and checks that:
     * <ul>
     *   <li>before the returned sequence is consumed, the base sequence is not closed and the
     *       cache holds nothing under the expected key;</li>
     *   <li>after consumption the base sequence has been closed and the results match
     *       {@code expectedRes};</li>
     *   <li>the cache is written within 10 seconds and the stored bytes decode to
     *       {@code expectedCacheRes}.</li>
     * </ul>
     * Result lists are compared through their {@code toString()} forms.
     *
     * @param expectedRes      results the base runner produces and the caching runner must return
     * @param expectedCacheRes results expected after reading the cached value back
     * @param query            query to run
     * @param toolchest        tool chest supplying the cache strategy for {@code query}
     */
    private void testCloseAndPopulate(
        List<Result> expectedRes,
        List<Result> expectedCacheRes,
        Query query,
        QueryToolChest toolchest
    ) throws Exception {
        final AssertingClosable closable = new AssertingClosable();
        // Kept raw: the element type is Result, but makeCachingQueryRunner takes Sequence<Object>.
        final Sequence resultSeq = Sequences.wrap(
            Sequences.simple(expectedRes),
            new SequenceWrapper() {
                @Override
                public void before() {
                    Assert.assertFalse(closable.isClosed());
                }

                @Override
                public void after(boolean isDone, Throwable thrown) {
                    closable.close();
                }
            }
        );

        final CountDownLatch cacheMustBePutOnce = new CountDownLatch(1);
        // Minimal map-backed cache that signals the latch on every put.
        Cache cache = new Cache() {
            private final ConcurrentMap<Cache.NamedKey, byte[]> baseMap = new ConcurrentHashMap<>();

            @Override
            public byte[] get(Cache.NamedKey key) {
                return baseMap.get(key);
            }

            @Override
            public void put(Cache.NamedKey key, byte[] value) {
                baseMap.put(key, value);
                cacheMustBePutOnce.countDown();
            }

            @Override
            public Map<Cache.NamedKey, byte[]> getBulk(Iterable<Cache.NamedKey> keys) {
                return null;
            }

            @Override
            public void close(String namespace) {
            }

            @Override
            public void close() {
            }

            @Override
            public CacheStats getStats() {
                return null;
            }

            @Override
            public boolean isLocal() {
                return true;
            }

            @Override
            public void doMonitor(ServiceEmitter emitter) {
            }
        };

        byte[] keyPrefix = RandomUtils.nextBytes(10);
        CachingQueryRunner runner = makeCachingQueryRunner(keyPrefix, cache, toolchest, resultSeq);

        CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
        Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
            CACHE_ID,
            SEGMENT_DESCRIPTOR,
            Bytes.concat(keyPrefix, cacheStrategy.computeCacheKey(query))
        );

        Assert.assertTrue(runner.canPopulateCache(query, cacheStrategy));
        Sequence res = runner.run(QueryPlus.wrap(query));
        // Nothing has been consumed yet, so the base sequence is still open and the cache empty.
        Assert.assertFalse("sequence must not be closed", closable.isClosed());
        Assert.assertNull("cache must be empty", cache.get(cacheKey));

        List results = res.toList();
        Assert.assertTrue(closable.isClosed());
        Assert.assertEquals(expectedRes.toString(), results.toString());

        // Population may happen on a background thread; bound the wait so a failure cannot hang.
        Assert.assertTrue("cache must be populated", cacheMustBePutOnce.await(10L, TimeUnit.SECONDS));
        byte[] cacheValue = cache.get(cacheKey);
        Assert.assertNotNull(cacheValue);

        Function fn = cacheStrategy.pullFromSegmentLevelCache();
        Iterator cachedObjects = objectMapper.readValues(
            objectMapper.getFactory().createParser(cacheValue),
            cacheStrategy.getCacheObjectClazz()
        );
        List<Result> cacheResults = Lists.newArrayList(Iterators.transform(cachedObjects, fn));
        Assert.assertEquals(expectedCacheRes.toString(), cacheResults.toString());
    }

    /**
     * Pre-populates a {@link MapCache} with {@code expectedResults} (converted with the strategy's
     * {@code prepareForSegmentLevelCache()} function) and checks that a caching runner whose base
     * runner yields an empty sequence still returns those results.
     *
     * @param expectedResults results to store in, and expect back from, the cache
     * @param query           query to run
     * @param toolchest       tool chest supplying the cache strategy for {@code query}
     */
    private void testUseCache(List<Result> expectedResults, Query query, QueryToolChest toolchest) throws IOException {
        byte[] cacheKeyPrefix = RandomUtils.nextBytes(10);
        CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
        Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
            CACHE_ID,
            SEGMENT_DESCRIPTOR,
            Bytes.concat(cacheKeyPrefix, cacheStrategy.computeCacheKey(query))
        );

        Cache cache = MapCache.create(1024L * 1024L);
        Function toCacheForm = cacheStrategy.prepareForSegmentLevelCache();
        cache.put(cacheKey, toByteArray(Iterables.transform(expectedResults, toCacheForm)));

        // The base runner returns nothing, so any results must have come from the cache.
        CachingQueryRunner runner = makeCachingQueryRunner(cacheKeyPrefix, cache, toolchest, Sequences.empty());
        Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query)));

        List results = runner.run(QueryPlus.wrap(query)).toList();
        Assert.assertEquals(expectedResults.toString(), results.toString());
    }

    /**
     * Builds a {@link CachingQueryRunner} whose base runner always returns {@code results} and
     * whose {@link CacheConfig} reports both populate-cache and use-cache as enabled.
     *
     * @param cacheKeyPrefix key prefix, wrapped with {@link Optional#ofNullable}; may be {@code null}
     * @param cache          cache handed to the runner
     * @param toolchest      tool chest handed to the runner
     * @param results        sequence the base runner returns for every query
     */
    private CachingQueryRunner makeCachingQueryRunner(
        byte[] cacheKeyPrefix,
        Cache cache,
        QueryToolChest toolchest,
        final Sequence<Object> results
    ) {
        return new CachingQueryRunner(
            CACHE_ID,
            Optional.ofNullable(cacheKeyPrefix),
            SEGMENT_DESCRIPTOR,
            SEGMENT_DESCRIPTOR.getInterval(),
            objectMapper,
            cache,
            toolchest,
            new QueryRunner() {
                @Override
                public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) {
                    return results;
                }
            },
            cachePopulator,
            new CacheConfig() {
                @Override
                public boolean isPopulateCache() {
                    return true;
                }

                @Override
                public boolean isUseCache() {
                    return true;
                }
            }
        );
    }

    /**
     * Converts a flat fixture array into TopN results. Every {@link DateTime} starts a new
     * result; the entries up to the next {@link DateTime} are read as
     * (dimension value, rows, imps) triples.
     *
     * @param cachedResults when {@code true}, the {@code avg_imps_per_row} entry is left out of
     *                      each row; otherwise it is included as {@code imps / rows}
     * @param objects       flat fixture as described above
     * @return one {@link Result} per timestamp, in input order
     * @throws ISE if fewer than three values remain where a triple is expected
     */
    private List<Result> makeTopNResults(boolean cachedResults, Object... objects) {
        List<Result> retVal = new ArrayList<>();
        int index = 0;
        while (index < objects.length) {
            DateTime timestamp = (DateTime) objects[index++];

            List<Map<String, Object>> values = new ArrayList<>();
            while (index < objects.length && !(objects[index] instanceof DateTime)) {
                if (objects.length - index < 3) {
                    throw new ISE(
                        "expect 3 values for each entry in the top list, had %d values left.",
                        objects.length - index
                    );
                }
                final double imps = ((Number) objects[index + 2]).doubleValue();
                final double rows = ((Number) objects[index + 1]).doubleValue();

                if (cachedResults) {
                    values.add(
                        ImmutableMap.<String, Object>of(
                            "top_dim", objects[index],
                            "rows", rows,
                            "imps", imps,
                            "impers", imps
                        )
                    );
                } else {
                    values.add(
                        ImmutableMap.<String, Object>of(
                            "top_dim", objects[index],
                            "rows", rows,
                            "imps", imps,
                            "impers", imps,
                            "avg_imps_per_row", imps / rows
                        )
                    );
                }
                index += 3;
            }

            Result<TopNResultValue> entry = new Result<>(timestamp, new TopNResultValue(values));
            retVal.add(entry);
        }
        return retVal;
    }

    /**
     * Serializes every element of {@code results}, one after another, into a single byte array
     * using this test's {@link ObjectMapper}.
     */
    private <T> byte[] toByteArray(Iterable<T> results) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (JsonGenerator gen = objectMapper.getFactory().createGenerator((OutputStream) bytes)) {
            SerializerProvider serializers = objectMapper.getSerializerProviderInstance();
            for (T result : results) {
                JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, result);
            }
        }
        return bytes.toByteArray();
    }

    /** A {@link Closeable} that records being closed and fails the test if closed twice. */
    private static class AssertingClosable implements Closeable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            Assert.assertFalse(closed.get());
            Assert.assertTrue(closed.compareAndSet(false, true));
        }

        public boolean isClosed() {
            return closed.get();
        }
    }
}

