/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.query.groupby.epinephelinae;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.AggregateResult;
import org.apache.druid.query.groupby.epinephelinae.CloseableGrouperIterator;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper;

/**
 * A {@link QueryRunner} that merges the results of several groupBy query runners by accumulating
 * all of their rows into one shared {@link Grouper}.
 *
 * <p>When the query is by-segment, or when its context carries the
 * {@code mergeRunnersUsingChainedExecution} flag, the work is delegated to a
 * {@link ChainedExecutionQueryRunner} instead. The query handed to the underlying runners always
 * has that flag set to {@code true}.
 *
 * <p>Otherwise each underlying runner is submitted to the {@link QueryProcessingPool}; every task
 * accumulates its rows into the shared grouper, which is created on top of a buffer taken from the
 * merge buffer pool. The resulting sequence iterates over the grouper once all tasks have finished.
 */
public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow> {
    private static final Logger log = new Logger(GroupByMergingQueryRunnerV2.class);
    /** Query context key that makes this runner delegate to {@link ChainedExecutionQueryRunner}. */
    private static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";
    private final GroupByQueryConfig config;
    private final DruidProcessingConfig processingConfig;
    private final Iterable<QueryRunner<ResultRow>> queryables;
    private final QueryProcessingPool queryProcessingPool;
    private final QueryWatcher queryWatcher;
    private final int concurrencyHint;
    private final BlockingPool<ByteBuffer> mergeBufferPool;
    private final ObjectMapper spillMapper;
    private final String processingTmpDir;
    private final int mergeBufferSize;

    /**
     * Creates a merging runner over the given query runners.
     *
     * @param config              base groupBy configuration; per-query overrides are applied in {@link #run}
     * @param processingConfig    processing configuration forwarded to the grouper factory
     * @param queryProcessingPool pool on which the per-runner tasks are submitted
     * @param queryWatcher        receives the combined future of the submitted tasks; may be null
     * @param queryables          runners to merge; null elements are filtered out
     * @param concurrencyHint     forwarded to the grouper factory
     * @param mergeBufferPool     pool from which merge buffers are taken
     * @param mergeBufferSize     forwarded to the grouper factory
     * @param spillMapper         forwarded to the grouper factory
     * @param processingTmpDir    parent directory of the per-query temporary storage directory
     */
    public GroupByMergingQueryRunnerV2(GroupByQueryConfig config, DruidProcessingConfig processingConfig, QueryProcessingPool queryProcessingPool, QueryWatcher queryWatcher, Iterable<QueryRunner<ResultRow>> queryables, int concurrencyHint, BlockingPool<ByteBuffer> mergeBufferPool, int mergeBufferSize, ObjectMapper spillMapper, String processingTmpDir) {
        this.config = config;
        this.processingConfig = processingConfig;
        this.queryProcessingPool = queryProcessingPool;
        this.queryWatcher = queryWatcher;
        this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
        this.concurrencyHint = concurrencyHint;
        this.mergeBufferPool = mergeBufferPool;
        this.spillMapper = spillMapper;
        this.processingTmpDir = processingTmpDir;
        this.mergeBufferSize = mergeBufferSize;
    }

    /**
     * Runs the query against every underlying runner and returns the merged rows.
     *
     * <p>The returned sequence is lazy: merge buffers are acquired, tasks are submitted and awaited
     * only when an iterator is made from it. If anything fails while the iterator is being built,
     * every resource registered so far is closed before the failure is rethrown.
     *
     * @param queryPlus       the groupBy query to run
     * @param responseContext response context handed to every underlying runner
     * @return a sequence over the merged result rows
     */
    @Override
    public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, final ResponseContext responseContext) {
        final GroupByQuery query = (GroupByQuery) queryPlus.getQuery();
        final GroupByQueryConfig querySpecificConfig = this.config.withOverrides(query);
        final QueryContext queryContext = query.context();
        final boolean forceChainedExecution = queryContext.getBoolean(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, false);

        // The underlying runners always see the chained-execution flag set.
        final QueryPlus<ResultRow> queryPlusForRunners = queryPlus
            .withQuery(query.withOverriddenContext(ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true)))
            .withoutThreadUnsafeState();

        if (queryContext.isBySegment() || forceChainedExecution) {
            ChainedExecutionQueryRunner<ResultRow> runner = new ChainedExecutionQueryRunner<>(this.queryProcessingPool, this.queryWatcher, this.queryables);
            return runner.run(queryPlusForRunners, responseContext);
        }

        final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded();
        final File temporaryStorageDirectory = new File(this.processingTmpDir, StringUtils.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()));
        final int priority = queryContext.getPriority();
        final long queryTimeout = queryContext.getTimeout();
        final boolean hasTimeout = queryContext.hasTimeout();
        // Absolute deadline; only consulted when hasTimeout is true.
        final long timeoutAt = System.currentTimeMillis() + queryTimeout;

        return new BaseSequence<ResultRow, CloseableGrouperIterator<RowBasedGrouperHelper.RowBasedKey, ResultRow>>(
            new BaseSequence.IteratorMaker<ResultRow, CloseableGrouperIterator<RowBasedGrouperHelper.RowBasedKey, ResultRow>>() {

                @Override
                public CloseableGrouperIterator<RowBasedGrouperHelper.RowBasedKey, ResultRow> make() {
                    final Closer resources = Closer.create();
                    try {
                        final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(temporaryStorageDirectory, querySpecificConfig.getMaxOnDiskStorage().getBytes());
                        final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder = ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
                        resources.register(temporaryStorageHolder);

                        // A second buffer is taken for combining when parallel combine threads are configured.
                        final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1;
                        final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolders = getMergeBuffersHolder(numMergeBuffers, hasTimeout, timeoutAt);
                        resources.registerAll(mergeBufferHolders);

                        final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder = mergeBufferHolders.get(0);
                        final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder = numMergeBuffers == 2 ? mergeBufferHolders.get(1) : null;

                        final Pair<Grouper<RowBasedGrouperHelper.RowBasedKey>, Accumulator<AggregateResult, ResultRow>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
                            query,
                            null,
                            config,
                            processingConfig,
                            Suppliers.ofInstance(mergeBufferHolder.get()),
                            combineBufferHolder,
                            concurrencyHint,
                            temporaryStorage,
                            spillMapper,
                            queryProcessingPool,
                            priority,
                            hasTimeout,
                            timeoutAt,
                            mergeBufferSize
                        );
                        final Grouper<RowBasedGrouperHelper.RowBasedKey> grouper = pair.lhs;
                        final Accumulator<AggregateResult, ResultRow> accumulator = pair.rhs;
                        grouper.init();

                        final ReferenceCountingResourceHolder<Grouper<RowBasedGrouperHelper.RowBasedKey>> grouperHolder = ReferenceCountingResourceHolder.fromCloseable(grouper);
                        resources.register(grouperHolder);

                        // Lists.newArrayList forces the lazy transform, so every runner is submitted here.
                        final List<ListenableFuture<AggregateResult>> futures = Lists.newArrayList(
                            Iterables.transform(
                                queryables,
                                new Function<QueryRunner<ResultRow>, ListenableFuture<AggregateResult>>() {

                                    @Override
                                    public ListenableFuture<AggregateResult> apply(final QueryRunner<ResultRow> input) {
                                        if (input == null) {
                                            throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
                                        }
                                        ListenableFuture<AggregateResult> future = queryProcessingPool.submitRunnerTask(
                                            new AbstractPrioritizedQueryRunnerCallable<AggregateResult, ResultRow>(priority, input) {

                                                @Override
                                                public AggregateResult call() {
                                                    // Both holders are incremented for the duration of the task and
                                                    // released (grouper first, then buffer) when it ends, however it ends.
                                                    try (
                                                        ResourceHolder<?> bufferReleaser = mergeBufferHolder.increment();
                                                        ResourceHolder<?> grouperReleaser = grouperHolder.increment()
                                                    ) {
                                                        return input.run(queryPlusForRunners, responseContext).accumulate(AggregateResult.ok(), accumulator);
                                                    }
                                                    catch (QueryInterruptedException | QueryTimeoutException e) {
                                                        throw e;
                                                    }
                                                    catch (Exception e) {
                                                        log.error(e, "Exception with one of the sequences!");
                                                        Throwables.propagateIfPossible(e);
                                                        throw new RuntimeException(e);
                                                    }
                                                }
                                            }
                                        );
                                        if (isSingleThreaded) {
                                            // Single-threaded mode: wait for each task before submitting the next one.
                                            waitForFutureCompletion(query, ImmutableList.of(future), hasTimeout, timeoutAt - System.currentTimeMillis());
                                        }
                                        return future;
                                    }
                                }
                            )
                        );

                        if (!isSingleThreaded) {
                            waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis());
                        }
                        return RowBasedGrouperHelper.makeGrouperIterator(grouper, query, resources);
                    }
                    catch (Throwable t) {
                        // Release whatever was registered before the failure, then rethrow the original error.
                        try {
                            resources.close();
                        }
                        catch (Exception ex) {
                            t.addSuppressed(ex);
                        }
                        throw t;
                    }
                }

                @Override
                public void cleanup(CloseableGrouperIterator<RowBasedGrouperHelper.RowBasedKey, ResultRow> iterFromMake) {
                    iterFromMake.close();
                }
            }
        );
    }

    /**
     * Takes {@code numBuffers} merge buffers from the pool.
     *
     * @param numBuffers number of buffers to take
     * @param hasTimeout whether {@code timeoutAt} must be honoured
     * @param timeoutAt  absolute deadline in epoch milliseconds; ignored when {@code hasTimeout} is false
     * @return the holders of the acquired buffers
     * @throws ResourceLimitExceededException if more buffers are requested than the pool's maximum size
     * @throws QueryTimeoutException          if the deadline has already passed, or the timed take
     *                                        returned no buffers
     * @throws QueryInterruptedException      wrapping any other exception raised while taking buffers
     */
    private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(int numBuffers, boolean hasTimeout, long timeoutAt) {
        try {
            if (numBuffers > this.mergeBufferPool.maxSize()) {
                throw new ResourceLimitExceededException("Query needs " + numBuffers + " merge buffers, but only " + this.mergeBufferPool.maxSize() + " merge buffers were configured. Try raising druid.processing.numMergeBuffers.");
            }
            final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolder;
            if (hasTimeout) {
                final long timeout = timeoutAt - System.currentTimeMillis();
                if (timeout <= 0L) {
                    throw new QueryTimeoutException();
                }
                mergeBufferHolder = this.mergeBufferPool.takeBatch(numBuffers, timeout);
                if (mergeBufferHolder.isEmpty()) {
                    throw new QueryTimeoutException("Cannot acquire enough merge buffers");
                }
            } else {
                mergeBufferHolder = this.mergeBufferPool.takeBatch(numBuffers);
            }
            return mergeBufferHolder;
        }
        catch (QueryTimeoutException | ResourceLimitExceededException e) {
            throw e;
        }
        catch (Exception e) {
            throw new QueryInterruptedException(e);
        }
    }

    /**
     * Blocks until all of the given futures complete and verifies that each result is OK.
     *
     * <p>The combined future is registered with the query watcher (when one is present) before the
     * timeout is checked. On every failure path below except the up-front timeout check, the
     * combined future and the individual futures are cancelled before the exception is thrown.
     *
     * @param query      the query the futures belong to
     * @param futures    futures of the submitted tasks
     * @param hasTimeout whether {@code timeout} must be honoured
     * @param timeout    remaining time in milliseconds; ignored when {@code hasTimeout} is false
     * @throws QueryTimeoutException          if {@code timeout} is not positive or the wait times out
     * @throws ResourceLimitExceededException if any task reports a non-OK result
     * @throws QueryInterruptedException      if the wait is interrupted or a future is cancelled
     * @throws RuntimeException               wrapping the {@link ExecutionException} of a failed task
     */
    private void waitForFutureCompletion(GroupByQuery query, List<ListenableFuture<AggregateResult>> futures, boolean hasTimeout, long timeout) {
        final ListenableFuture<List<AggregateResult>> future = Futures.allAsList(futures);
        try {
            if (this.queryWatcher != null) {
                this.queryWatcher.registerQueryFuture(query, future);
            }
            if (hasTimeout && timeout <= 0L) {
                throw new QueryTimeoutException();
            }
            final List<AggregateResult> results = hasTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
            for (AggregateResult result : results) {
                if (!result.isOk()) {
                    GuavaUtils.cancelAll(true, future, futures);
                    throw new ResourceLimitExceededException(result.getReason());
                }
            }
        }
        catch (InterruptedException e) {
            log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
            GuavaUtils.cancelAll(true, future, futures);
            throw new QueryInterruptedException(e);
        }
        catch (CancellationException e) {
            GuavaUtils.cancelAll(true, future, futures);
            throw new QueryInterruptedException(e);
        }
        catch (TimeoutException e) {
            log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
            GuavaUtils.cancelAll(true, future, futures);
            throw new QueryTimeoutException();
        }
        catch (ExecutionException e) {
            GuavaUtils.cancelAll(true, future, futures);
            throw new RuntimeException(e);
        }
    }
}

