/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.query.groupby;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.collections.CloseableDefaultBlockingPool;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.TestHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Failure-path tests for group-by query execution, run once per {@link QueryRunner} produced by
 * {@code QueryRunnerTestHelper.makeQueryRunners(FACTORY, true)}.
 *
 * <p>The tests cover two families of failures: queries that ask for more merge buffers than the
 * shared {@link #MERGE_BUFFER_POOL} is configured with, and queries that exceed either their
 * overall {@code timeout} or their {@code perSegmentTimeout} context value.
 *
 * <p>{@link #setUp()} and {@link #tearDown()} both assert that the merge buffer pool reports its
 * maximum size, so a test that does not hand back every buffer it took is reported as a failure.
 */
@RunWith(Parameterized.class)
public class GroupByQueryRunnerFailureTest {
    /** Processing settings shared by every factory built here: 10 MiB buffers, 1 merge buffer, 2 threads. */
    private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() {
        @Override
        public String getFormatString() {
            return null;
        }

        @Override
        public int intermediateComputeSizeBytes() {
            return 10 * 1024 * 1024;
        }

        @Override
        public int getNumMergeBuffers() {
            return 1;
        }

        @Override
        public int getNumThreads() {
            return 2;
        }
    };

    private static final CloseableStupidPool<ByteBuffer> BUFFER_POOL = new CloseableStupidPool<>(
        "GroupByQueryEngine-bufferPool",
        () -> ByteBuffer.allocate(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()));

    private static final CloseableDefaultBlockingPool<ByteBuffer> MERGE_BUFFER_POOL = new CloseableDefaultBlockingPool<>(
        () -> ByteBuffer.allocate(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()),
        DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers());

    /** Factory used to build the parameterized runners; must be declared after the pools it depends on. */
    private static final GroupByQueryRunnerFactory FACTORY =
        makeQueryRunnerFactory(GroupByQueryRunnerTest.DEFAULT_MAPPER, new GroupByQueryConfig(){});

    @Rule
    public final ExpectedException expectedException = ExpectedException.none();

    /** Pool handed to {@code mergeRunners} by the per-segment-timeout tests; recreated for every test. */
    private QueryProcessingPool processingPool;

    /** The parameterized runner, already wrapped by {@code FACTORY.mergeRunners} on a direct executor. */
    private final QueryRunner<ResultRow> runner;

    /**
     * Creates the test instance for one parameterized runner.
     *
     * @param runner runner supplied by {@link #constructorFeeder()}
     */
    public GroupByQueryRunnerFailureTest(QueryRunner<ResultRow> runner) {
        this.runner = FACTORY.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));
    }

    /**
     * Supplies one constructor argument array per runner returned by
     * {@code QueryRunnerTestHelper.makeQueryRunners(FACTORY, true)}.
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> constructorFeeder() {
        List<Object[]> args = new ArrayList<>();
        for (QueryRunner<?> queryRunner : QueryRunnerTestHelper.makeQueryRunners(FACTORY, true)) {
            args.add(new Object[]{queryRunner});
        }
        return args;
    }

    /**
     * Builds a runner factory that draws merge buffers from {@link #MERGE_BUFFER_POOL} and
     * intermediate buffers from {@link #BUFFER_POOL}.
     *
     * @param mapper object mapper passed to the {@link GroupingEngine}
     * @param config group-by configuration used by both the engine and the reservation pool
     * @return a factory wired with a new engine, tool chest and reservation pool
     */
    private static GroupByQueryRunnerFactory makeQueryRunnerFactory(ObjectMapper mapper, GroupByQueryConfig config) {
        Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
        GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider();
        GroupByResourcesReservationPool groupByResourcesReservationPool =
            new GroupByResourcesReservationPool(MERGE_BUFFER_POOL, config);
        GroupingEngine groupingEngine = new GroupingEngine(
            DEFAULT_PROCESSING_CONFIG,
            configSupplier,
            groupByResourcesReservationPool,
            TestHelper.makeJsonMapper(),
            mapper,
            QueryRunnerTestHelper.NOOP_QUERYWATCHER,
            groupByStatsProvider);
        GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
        return new GroupByQueryRunnerFactory(groupingEngine, toolChest, BUFFER_POOL);
    }

    /**
     * Builds a runner factory whose configuration differs from the default only in the value
     * reported by {@code isSingleThreaded()}.
     *
     * @param singleThreaded value returned by the configuration's {@code isSingleThreaded()}
     */
    private static GroupByQueryRunnerFactory makeQueryRunnerFactory(boolean singleThreaded) {
        return makeQueryRunnerFactory(GroupByQueryRunnerTest.DEFAULT_MAPPER, new GroupByQueryConfig() {
            @Override
            public boolean isSingleThreaded() {
                return singleThreaded;
            }
        });
    }

    /**
     * Builds the flat query used by the timeout tests: data source {@code "testing"}, grouped by
     * {@code quality} (output name {@code alias}) with a long sum over {@code rows}.
     *
     * @param contextOverride context entries applied through {@code overrideContext}
     * @param queryId query id, which appears in the timeout messages the tests assert on
     */
    private static GroupByQuery makeQualityRowsQuery(Map<String, Object> contextOverride, String queryId) {
        return GroupByQuery.builder()
            .setDataSource("testing")
            .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
            .setDimensions(new DefaultDimensionSpec("quality", "alias"))
            .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
            .setGranularity(Granularities.ALL)
            .overrideContext(contextOverride)
            .queryId(queryId)
            .build();
    }

    /** Builds the innermost query of the nested tests: row count per {@code quality} over {@code "testing"}. */
    private static GroupByQuery makeRowCountByQualityQuery() {
        return GroupByQuery.builder()
            .setDataSource("testing")
            .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
            .setGranularity(Granularities.ALL)
            .setDimensions(new DefaultDimensionSpec("quality", "alias"))
            .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
            .build();
    }

    /**
     * Wraps {@code subquery} in an outer query that sums its {@code rows} column and carries a
     * {@code timeout} context value of 500.
     *
     * @param subquery query used as the outer query's data source
     */
    private static GroupByQuery makeRowsSumOver(GroupByQuery subquery) {
        return GroupByQuery.builder()
            .setDataSource(new QueryDataSource(subquery))
            .setGranularity(Granularities.ALL)
            .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
            .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
            .setContext(ImmutableMap.of("timeout", 500))
            .build();
    }

    /**
     * Returns a runner that sleeps for {@code sleepMillis} and then yields an empty sequence.
     * An interrupt during the sleep is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param sleepMillis how long each invocation sleeps, in milliseconds
     */
    private static QueryRunner<ResultRow> makeSleepingRunner(long sleepMillis) {
        return (queryPlus, responseContext) -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return Sequences.empty();
        };
    }

    /**
     * Checks that the merge buffer pool is at full size before the test starts, then creates the
     * processing pool from a two-thread executor and a single-threaded scheduled executor.
     */
    @Before
    public void setUp() {
        Assert.assertEquals("MERGE_BUFFER_POOL size, pre-test", MERGE_BUFFER_POOL.maxSize(), MERGE_BUFFER_POOL.getPoolSize());
        this.processingPool = new ForwardingQueryProcessingPool(
            Execs.multiThreaded(2, "GroupByQueryRunnerFailureTestExecutor-%d"),
            Execs.scheduledSingleThreaded("GroupByQueryRunnerFailureTestExecutor-Timeout-%d"));
    }

    /**
     * Checks that the merge buffer pool is back at full size and shuts the processing pool down.
     * The shutdown runs even when the size check fails, so a failing test does not leak executors.
     */
    @After
    public void tearDown() {
        try {
            Assert.assertEquals("MERGE_BUFFER_POOL size, post-test", MERGE_BUFFER_POOL.maxSize(), MERGE_BUFFER_POOL.getPoolSize());
        } finally {
            this.processingPool.shutdown();
        }
    }

    /** Closes both shared buffer pools once every parameterized run has finished. */
    @AfterClass
    public static void teardownClass() {
        BUFFER_POOL.close();
        MERGE_BUFFER_POOL.close();
    }

    /**
     * A query with one level of nesting must be rejected with a message stating that it needs two
     * merge buffers while only one is configured.
     */
    @Test(timeout = 60000L)
    public void testNotEnoughMergeBuffersOnQueryable() {
        this.expectedException.expect(ResourceLimitExceededException.class);
        this.expectedException.expectMessage("Query needs 2 merge buffers, but only 1 merge buffers were configured");
        GroupByQuery query = makeRowsSumOver(makeRowCountByQualityQuery());
        GroupByQueryRunnerTestHelper.runQuery(FACTORY, this.runner, query);
    }

    /** A query with two levels of nesting must fail with {@link ResourceLimitExceededException}. */
    @Test(timeout = 60000L)
    public void testResourceLimitExceededOnBroker() {
        this.expectedException.expect(ResourceLimitExceededException.class);
        GroupByQuery innermost = GroupByQuery.builder()
            .setDataSource("testing")
            .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
            .setGranularity(Granularities.ALL)
            .setDimensions(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", null))
            .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
            .build();
        GroupByQuery middle = GroupByQuery.builder()
            .setDataSource(innermost)
            .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
            .setGranularity(Granularities.ALL)
            .setDimensions(new DefaultDimensionSpec("quality", "alias"))
            .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
            .build();
        GroupByQuery query = makeRowsSumOver(middle);
        GroupByQueryRunnerTestHelper.runQuery(FACTORY, this.runner, query);
    }

    /**
     * Same expectation as {@link #testNotEnoughMergeBuffersOnQueryable()}, but with one merge
     * buffer already taken from the pool while the query runs. The buffer is handed back in the
     * {@code finally} block so that {@link #tearDown()} sees a full pool.
     */
    @Test(timeout = 60000L)
    public void testInsufficientResourcesOnBroker() {
        GroupByQuery query = makeRowsSumOver(makeRowCountByQualityQuery());
        List<ReferenceCountingResourceHolder<ByteBuffer>> holder = null;
        try {
            holder = MERGE_BUFFER_POOL.takeBatch(1, 10L);
            this.expectedException.expect(ResourceLimitExceededException.class);
            this.expectedException.expectMessage("Query needs 2 merge buffers, but only 1 merge buffers were configured");
            GroupByQueryRunnerTestHelper.runQuery(FACTORY, this.runner, query);
        } finally {
            if (holder != null) {
                holder.forEach(ReferenceCountingResourceHolder::close);
            }
        }
    }

    /**
     * With a {@code timeout} of 1 and a runner that sleeps 100 ms, a single-threaded factory on a
     * direct executor must report {@code "Query [test] timed out"}.
     */
    @Test(timeout = 60000L)
    public void testTimeoutExceptionOnQueryable_singleThreaded() {
        GroupByQuery query = makeQualityRowsQuery(ImmutableMap.of("timeout", 1), "test");
        GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(true);
        QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
            Execs.directExecutor(),
            ImmutableList.of(this.runner, makeSleepingRunner(100L)));
        QueryTimeoutException ex = Assert.assertThrows(
            QueryTimeoutException.class,
            () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query));
        Assert.assertEquals("Query [test] timed out", ex.getMessage());
    }

    /**
     * With a {@code timeout} of 1 and a runner that sleeps 100 ms, the query must report
     * {@code "Query [test] timed out"}.
     */
    @Test(timeout = 60000L)
    public void testTimeoutExceptionOnQueryable_multiThreaded() {
        GroupByQuery query = makeQualityRowsQuery(Map.of("timeout", 1), "test");
        // NOTE(review): despite the "_multiThreaded" name, this uses isSingleThreaded() == true and a
        // direct executor, exactly like the _singleThreaded variant. Behavior is kept as found;
        // confirm whether a multi-threaded configuration was intended here.
        GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(true);
        QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
            Execs.directExecutor(),
            List.of(this.runner, makeSleepingRunner(100L)));
        QueryTimeoutException ex = Assert.assertThrows(
            QueryTimeoutException.class,
            () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query));
        Assert.assertEquals("Query [test] timed out", ex.getMessage());
    }

    /**
     * With a generous {@code timeout} but a {@code perSegmentTimeout} of 100 and a runner that
     * sleeps 1000 ms, a multi-threaded factory must report the per-segment timeout message.
     */
    @Test(timeout = 20000L)
    public void test_multiThreaded_perSegmentTimeout_causes_queryTimeout() {
        GroupByQuery query = makeQualityRowsQuery(Map.of("timeout", 300000, "perSegmentTimeout", 100), "test");
        GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(false);
        QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
            this.processingPool,
            List.of(this.runner, makeSleepingRunner(1000L)));
        QueryTimeoutException ex = Assert.assertThrows(
            QueryTimeoutException.class,
            () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query));
        Assert.assertEquals(
            "Query timeout, cancelling pending results for query [test]. Per-segment timeout exceeded.",
            ex.getMessage());
    }

    /**
     * Single-threaded counterpart of
     * {@link #test_multiThreaded_perSegmentTimeout_causes_queryTimeout()}; the same message is expected.
     */
    @Test(timeout = 20000L)
    public void test_singleThreaded_perSegmentTimeout_causes_queryTimeout() {
        GroupByQuery query = makeQualityRowsQuery(Map.of("timeout", 300000, "perSegmentTimeout", 100), "test");
        GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(true);
        QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
            this.processingPool,
            List.of(this.runner, makeSleepingRunner(1000L)));
        QueryTimeoutException ex = Assert.assertThrows(
            QueryTimeoutException.class,
            () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query));
        Assert.assertEquals(
            "Query timeout, cancelling pending results for query [test]. Per-segment timeout exceeded.",
            ex.getMessage());
    }

    /**
     * Runs a slow query (two runners that each sleep 60 s, {@code perSegmentTimeout} 1000) and,
     * once both slow runners have started, a fast query ({@code perSegmentTimeout} 100) on the
     * same processing pool. The fast runner must not start within 500 ms, the fast query must
     * still complete without an exception, and the slow query must fail with the per-segment
     * timeout message.
     *
     * <p>Both queries run on their own threads. Whatever they throw is captured and checked on
     * the test thread after {@code join()}, because an assertion that fails on a spawned thread
     * is never seen by JUnit.
     */
    @Test(timeout = 5000L)
    public void test_perSegmentTimeout_crossQuery() throws Exception {
        GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(false);
        GroupByQuery slowQuery = makeQualityRowsQuery(Map.of("timeout", 300000, "perSegmentTimeout", 1000), "slow");
        GroupByQuery fastQuery = makeQualityRowsQuery(Map.of("timeout", 5000, "perSegmentTimeout", 100), "fast");

        // The latch count matches the two slow runners submitted below.
        CountDownLatch slowStart = new CountDownLatch(2);
        CountDownLatch fastStart = new CountDownLatch(1);

        QueryRunner<ResultRow> signalingSlowRunner = (queryPlus, responseContext) -> {
            slowStart.countDown();
            try {
                Thread.sleep(60000L);
            } catch (InterruptedException e) {
                throw new QueryInterruptedException(e);
            }
            return Sequences.empty();
        };
        QueryRunner<ResultRow> fastRunner = (queryPlus, responseContext) -> {
            fastStart.countDown();
            return Sequences.empty();
        };

        AtomicReference<Throwable> slowQueryFailure = new AtomicReference<>();
        AtomicReference<Throwable> fastQueryFailure = new AtomicReference<>();

        Thread slowQueryThread = new Thread(() -> {
            try {
                GroupByQueryRunnerTestHelper.runQuery(
                    factory,
                    factory.mergeRunners(this.processingPool, List.of(signalingSlowRunner, signalingSlowRunner)),
                    slowQuery);
            } catch (Throwable t) {
                slowQueryFailure.set(t);
            }
        });
        slowQueryThread.start();
        slowStart.await();

        Thread fastQueryThread = new Thread(() -> {
            try {
                GroupByQueryRunnerTestHelper.runQuery(
                    factory,
                    factory.mergeRunners(this.processingPool, List.of(fastRunner)),
                    fastQuery);
            } catch (Throwable t) {
                fastQueryFailure.set(t);
            }
        });
        fastQueryThread.start();

        boolean fastStartedEarly = fastStart.await(500L, TimeUnit.MILLISECONDS);
        Assert.assertFalse("Fast query should be blocked and not started while slow queries are running", fastStartedEarly);

        fastQueryThread.join();
        slowQueryThread.join();

        Throwable fastFailure = fastQueryFailure.get();
        if (fastFailure != null) {
            throw new AssertionError("Expected fast query to succeed", fastFailure);
        }

        Throwable slowFailure = slowQueryFailure.get();
        Assert.assertNotNull("Expected QueryTimeoutException for slow query", slowFailure);
        if (!(slowFailure instanceof QueryTimeoutException)) {
            throw new AssertionError("Expected QueryTimeoutException for slow query", slowFailure);
        }
        Assert.assertEquals(
            "Query timeout, cancelling pending results for query [slow]. Per-segment timeout exceeded.",
            slowFailure.getMessage());
    }
}

