/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.query.scan;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.druid.collections.StableLimitingSorter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;

/**
 * {@link QueryRunnerFactory} for {@link ScanQuery}.
 *
 * <p>Per-segment runners delegate to the {@link ScanQueryEngine}. {@link #mergeRunners} combines them using one
 * of three strategies, chosen from the query's time order, its row limit and the configured memory limits:
 * <ol>
 *   <li>no time ordering: concatenate the runner outputs and apply the limit;</li>
 *   <li>time ordering with a limit no larger than {@code maxRowsQueuedForOrdering}: sort in memory with a
 *       {@link StableLimitingSorter} (see {@link #stableLimitingSort});</li>
 *   <li>otherwise: an n-way merge of the runners that share an interval (see {@link #nWayMergeAndLimit}),
 *       provided no interval has more than {@code maxSegmentPartitionsOrderedInMemory} runners.</li>
 * </ol>
 */
public class ScanQueryRunnerFactory
implements QueryRunnerFactory<ScanResultValue, ScanQuery> {
    private final ScanQueryQueryToolChest toolChest;
    private final ScanQueryEngine engine;
    private final ScanQueryConfig scanQueryConfig;

    @Inject
    public ScanQueryRunnerFactory(ScanQueryQueryToolChest toolChest, ScanQueryEngine engine, ScanQueryConfig scanQueryConfig) {
        this.toolChest = toolChest;
        this.engine = engine;
        this.scanQueryConfig = scanQueryConfig;
    }

    /**
     * Creates a runner that executes a scan query against a single segment.
     *
     * @param segment the segment to scan
     * @return a runner bound to {@code segment} and this factory's engine
     */
    @Override
    public QueryRunner<ScanResultValue> createRunner(Segment segment) {
        return new ScanQueryRunner(this.engine, segment);
    }

    /**
     * Returns a runner that runs all of {@code queryRunners} and combines their results according to the query's
     * time order and row limit.
     *
     * <p>The returned runner also stores an absolute timeout (current time plus the query timeout) in the
     * response context before running anything.
     *
     * @param queryProcessingPool not used by this implementation
     * @param queryRunners        the per-segment runners to combine
     * @return the merging runner
     * @throws ISE if, for the n-way merge strategy, the number of intervals differs from the number of runners and
     *             {@code queryRunners} is not a {@link SinkQueryRunners} (thrown when the returned runner runs)
     * @throws ResourceLimitExceededException if time ordering is requested but neither in-memory strategy fits
     *             within the configured limits (thrown when the returned runner runs)
     */
    @Override
    public QueryRunner<ScanResultValue> mergeRunners(QueryProcessingPool queryProcessingPool, Iterable<QueryRunner<ScanResultValue>> queryRunners) {
        return (queryPlus, responseContext) -> {
            ScanQuery query = (ScanQuery) queryPlus.getQuery();
            ScanQuery.verifyOrderByForNativeExecution(query);

            long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
            responseContext.putTimeoutTime(timeoutAt);

            // Strategy 1: no ordering requested, so the runner outputs are simply concatenated.
            if (query.getTimeOrder().equals(ScanQuery.Order.NONE)) {
                Sequence<ScanResultValue> returnedRows = Sequences.concat(
                    Sequences.map(Sequences.simple(queryRunners), input -> input.run(queryPlus, responseContext))
                );
                if (query.getScanRowsLimit() <= Integer.MAX_VALUE) {
                    return returnedRows.limit(Math.toIntExact(query.getScanRowsLimit()));
                }
                return returnedRows;
            }

            List<Interval> intervalsOrdered = this.getIntervalsFromSpecificQuerySpec(query.getQuerySegmentSpec());
            List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
            if (ScanQuery.Order.DESCENDING.equals(query.getTimeOrder())) {
                intervalsOrdered = Lists.reverse(intervalsOrdered);
                queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
            }

            // A per-query override takes precedence over the configured default.
            int maxRowsQueuedForOrdering = query.getMaxRowsQueuedForOrdering() == null
                ? this.scanQueryConfig.getMaxRowsQueuedForOrdering()
                : query.getMaxRowsQueuedForOrdering();

            // Strategy 2: the limit is small enough to sort the candidate rows in memory.
            if (query.getScanRowsLimit() <= maxRowsQueuedForOrdering) {
                try {
                    return this.stableLimitingSort(
                        Sequences.concat(
                            Sequences.map(Sequences.simple(queryRunnersOrdered), input -> input.run(queryPlus, responseContext))
                        ),
                        query,
                        intervalsOrdered
                    );
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            // Strategy 3: n-way merge of the runners that belong to the same interval.
            List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered =
                pairIntervalsWithRunners(intervalsOrdered, queryRunnersOrdered, queryRunners);

            // LinkedHashMap keeps the intervals in the order in which they were first encountered.
            LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
                intervalsAndRunnersOrdered.stream()
                    .collect(Collectors.groupingBy(pair -> pair.lhs, LinkedHashMap::new, Collectors.toList()));

            // Largest number of runners sharing one interval; 0 when there are no runners at all, in which case
            // the merge below produces an empty sequence instead of failing on an empty Optional.
            int maxNumPartitionsInSegment = partitionsGroupedByInterval.values().stream()
                .mapToInt(List::size)
                .max()
                .orElse(0);

            int maxSegmentPartitionsOrderedInMemory = query.getMaxSegmentPartitionsOrderedInMemory() == null
                ? this.scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
                : query.getMaxSegmentPartitionsOrderedInMemory();

            if (maxNumPartitionsInSegment <= maxSegmentPartitionsOrderedInMemory) {
                List<List<QueryRunner<ScanResultValue>>> groupedRunners = partitionsGroupedByInterval.values().stream()
                    .map(pairs -> pairs.stream().map(pair -> pair.rhs).collect(Collectors.toList()))
                    .collect(Collectors.toList());
                return this.nWayMergeAndLimit(groupedRunners, queryPlus, responseContext);
            }

            throw ResourceLimitExceededException.withMessage(
                "Time ordering is not supported for a Scan query with %,d segments per time chunk and a row limit of %,d. "
                + "Try reducing your query limit below maxRowsQueuedForOrdering (currently %,d), "
                + "or using compaction to reduce the number of segments per time chunk, "
                + "or raising maxSegmentPartitionsOrderedInMemory (currently %,d) "
                + "above the number of segments you have per time chunk.",
                maxNumPartitionsInSegment,
                query.getScanRowsLimit(),
                maxRowsQueuedForOrdering,
                maxSegmentPartitionsOrderedInMemory
            );
        };
    }

    /**
     * Associates every runner with the interval it covers.
     *
     * <p>When the two ordered lists have the same size they are zipped positionally. Otherwise the mapping is
     * taken from {@code queryRunners} itself if it is a {@link SinkQueryRunners}.
     *
     * @throws ISE if the sizes differ and {@code queryRunners} is not a {@link SinkQueryRunners}
     */
    private static List<Pair<Interval, QueryRunner<ScanResultValue>>> pairIntervalsWithRunners(
        List<Interval> intervalsOrdered,
        List<QueryRunner<ScanResultValue>> queryRunnersOrdered,
        Iterable<QueryRunner<ScanResultValue>> queryRunners
    ) {
        List<Pair<Interval, QueryRunner<ScanResultValue>>> pairs = new ArrayList<>();
        if (intervalsOrdered.size() == queryRunnersOrdered.size()) {
            for (int i = 0; i < queryRunnersOrdered.size(); ++i) {
                pairs.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
            }
        } else if (queryRunners instanceof SinkQueryRunners) {
            ((SinkQueryRunners<ScanResultValue>) queryRunners).runnerIntervalMappingIterator().forEachRemaining(pairs::add);
        } else {
            throw new ISE("Number of segment descriptors does not equal number of query runners...something went wrong!");
        }
        return pairs;
    }

    /**
     * Sorts the rows of {@code inputSequence} with the query's result ordering, keeping at most the query's row
     * limit, using an in-memory {@link StableLimitingSorter}.
     *
     * <p>Every batch is split into single-event values which are fed to the sorter. Once more than {@code limit}
     * rows have been seen, the interval containing that row becomes the "final interval"; scanning then stops
     * after the first batch whose first event timestamp lies outside the final interval, or when the input is
     * exhausted.
     *
     * @param inputSequence    the batches to sort
     * @param scanQuery        supplies the ordering, the row limit and the result format
     * @param intervalsOrdered the intervals being scanned, used to locate the final interval
     * @return a sequence over the sorted, limited single-event values
     * @throws UOE         if the row limit exceeds {@link Integer#MAX_VALUE}
     * @throws ISE         if the row that crosses the limit belongs to none of {@code intervalsOrdered}
     * @throws IOException if closing the underlying yielder fails
     */
    @VisibleForTesting
    Sequence<ScanResultValue> stableLimitingSort(Sequence<ScanResultValue> inputSequence, ScanQuery scanQuery, List<Interval> intervalsOrdered) throws IOException {
        Ordering<ScanResultValue> comparator = scanQuery.getResultOrdering();
        if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
            throw new UOE("Limit of %,d rows not supported for priority queue strategy of time-ordering scan results", scanQuery.getScanRowsLimit());
        }
        int limit = Math.toIntExact(scanQuery.getScanRowsLimit());
        StableLimitingSorter<ScanResultValue> sorter = new StableLimitingSorter<>(comparator, limit);

        // The yielder is replaced on every step, so it cannot be a try-with-resources variable (those are
        // implicitly final). The finally block closes whichever yielder is current when the method exits.
        Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
        try {
            boolean doneScanning = yielder.isDone();
            int numRowsScanned = 0;
            Interval finalInterval = null;
            while (!doneScanning) {
                ScanResultValue next = yielder.get();
                for (ScanResultValue srv : next.toSingleEventScanResultValues()) {
                    sorter.add(srv);
                    numRowsScanned++;
                    if (numRowsScanned > limit && finalInterval == null) {
                        long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat());
                        // No early exit: if several intervals contain the timestamp, the last one wins.
                        for (Interval interval : intervalsOrdered) {
                            if (interval.contains(timestampOfLimitRow)) {
                                finalInterval = interval;
                            }
                        }
                        if (finalInterval == null) {
                            throw new ISE("Row came from an unscanned interval");
                        }
                    }
                }
                yielder = yielder.next(null);
                doneScanning = yielder.isDone()
                    || (finalInterval != null
                        && !finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat())));
            }
            List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
            Iterators.addAll(sortedElements, sorter.drain());
            return Sequences.simple(sortedElements);
        }
        finally {
            yielder.close();
        }
    }

    /**
     * Extracts the segment intervals, in descriptor order, from a segment spec.
     *
     * @param spec a {@link MultipleSpecificSegmentSpec} or {@link SpecificSegmentSpec}
     * @return one interval per segment descriptor in {@code spec}
     * @throws UOE if {@code spec} is of any other type
     */
    @VisibleForTesting
    List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec spec) {
        if (spec instanceof MultipleSpecificSegmentSpec) {
            return ((MultipleSpecificSegmentSpec) spec).getDescriptors()
                .stream()
                .map(SegmentDescriptor::getInterval)
                .collect(Collectors.toList());
        }
        if (spec instanceof SpecificSegmentSpec) {
            return Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor().getInterval());
        }
        throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.", spec.getClass().getSimpleName());
    }

    /**
     * Runs every group of runners, merges the results within each group by the query's result ordering,
     * concatenates the groups in list order and applies the query's row limit.
     *
     * <p>From the innermost call outwards: each batch is split into single-event values; the values of one
     * runner are concatenated; the runners of one group are flat-merged using the result ordering; the groups
     * are concatenated.
     *
     * @param groupedRunners  runner groups, in the order their results should appear
     * @param queryPlus       the query to run; its query must be a {@link ScanQuery}
     * @param responseContext passed through to every runner
     * @return the merged sequence, limited unless the row limit is {@link Long#MAX_VALUE}
     */
    @VisibleForTesting
    Sequence<ScanResultValue> nWayMergeAndLimit(List<List<QueryRunner<ScanResultValue>>> groupedRunners, QueryPlus<ScanResultValue> queryPlus, ResponseContext responseContext) {
        Sequence<ScanResultValue> resultSequence = Sequences.concat(
            Sequences.map(
                Sequences.simple(groupedRunners),
                runnerGroup -> Sequences.map(
                    Sequences.simple(runnerGroup),
                    input -> Sequences.concat(
                        Sequences.map(
                            input.run(queryPlus, responseContext),
                            srv -> Sequences.simple(srv.toSingleEventScanResultValues())
                        )
                    )
                ).flatMerge(seq -> seq, queryPlus.getQuery().getResultOrdering())
            )
        );
        long limit = ((ScanQuery) queryPlus.getQuery()).getScanRowsLimit();
        if (limit == Long.MAX_VALUE) {
            return resultSequence;
        }
        return resultSequence.limit(limit);
    }

    @Override
    public QueryToolChest<ScanResultValue, ScanQuery> getToolchest() {
        return this.toolChest;
    }

    /**
     * Runs a {@link ScanQuery} against one segment by delegating to the {@link ScanQueryEngine}.
     */
    private static class ScanQueryRunner
    implements QueryRunner<ScanResultValue> {
        /**
         * Timeout time stored in the response context when none is present. Same value as the literal
         * {@code 0x3FFFFFFFFFFFFFFFL}. NOTE(review): presumably mirrors Druid's {@code JodaUtils.MAX_INSTANT};
         * confirm and reference that constant if so.
         */
        private static final long DEFAULT_TIMEOUT_AT = Long.MAX_VALUE / 2;

        private final ScanQueryEngine engine;
        private final Segment segment;

        public ScanQueryRunner(ScanQueryEngine engine, Segment segment) {
            this.engine = engine;
            this.segment = segment;
        }

        /**
         * Validates the query, makes sure the response context carries a timeout time, and processes the segment.
         *
         * @throws ISE if the query is not a {@link ScanQuery}
         */
        @Override
        public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, ResponseContext responseContext) {
            Query<ScanResultValue> query = queryPlus.getQuery();
            if (!(query instanceof ScanQuery)) {
                throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class);
            }
            ScanQuery scanQuery = (ScanQuery) query;
            ScanQuery.verifyOrderByForNativeExecution(scanQuery);

            // A missing or zero timeout time is replaced by the default so the context always carries one.
            Long timeoutAt = responseContext.getTimeoutTime();
            if (timeoutAt == null || timeoutAt == 0L) {
                responseContext.putTimeoutTime(DEFAULT_TIMEOUT_AT);
            }
            return this.engine.process(scanQuery, this.segment, responseContext);
        }
    }
}

