/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.java.util.common.guava;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.utils.CloseableUtils;
import org.apache.druid.utils.JvmUtils;

public class ParallelMergeCombiningSequence<T>
extends YieldingSequenceBase<T> {
    private static final Logger LOG = new Logger(ParallelMergeCombiningSequence.class);
    public static final int DEFAULT_TASK_TARGET_RUN_TIME_MILLIS = 100;
    public static final int DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS = 16384;
    public static final int DEFAULT_TASK_SMALL_BATCH_NUM_ROWS = 4096;
    private final ForkJoinPool workerPool;
    private final List<Sequence<T>> inputSequences;
    private final Ordering<T> orderingFn;
    private final BinaryOperator<T> combineFn;
    private final int queueSize;
    private final boolean hasTimeout;
    private final long startTimeNanos;
    private final long timeoutAtNanos;
    private final int yieldAfter;
    private final int batchSize;
    private final int parallelism;
    private final long targetTimeNanos;
    private final Consumer<MergeCombineMetrics> metricsReporter;
    private final CancellationGizmo cancellationGizmo;

    public ParallelMergeCombiningSequence(ForkJoinPool workerPool, List<Sequence<T>> inputSequences, Ordering<T> orderingFn, BinaryOperator<T> combineFn, boolean hasTimeout, long timeoutMillis, int queryPriority, int parallelism, int yieldAfter, int batchSize, int targetTimeMillis, Consumer<MergeCombineMetrics> reporter) {
        this.workerPool = workerPool;
        this.inputSequences = inputSequences;
        this.orderingFn = orderingFn;
        this.combineFn = combineFn;
        this.hasTimeout = hasTimeout;
        this.startTimeNanos = System.nanoTime();
        this.timeoutAtNanos = this.startTimeNanos + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS);
        this.parallelism = parallelism;
        this.yieldAfter = yieldAfter;
        this.batchSize = batchSize;
        this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS);
        this.queueSize = 32768 / batchSize;
        this.metricsReporter = reporter;
        this.cancellationGizmo = new CancellationGizmo();
    }

    @Override
    public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator) {
        if (this.inputSequences.isEmpty()) {
            return Sequences.empty().toYielder(initValue, accumulator);
        }
        ArrayBlockingQueue<ResultBatch<T>> outputQueue = new ArrayBlockingQueue<ResultBatch<T>>(4 * this.queueSize);
        MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(this.inputSequences.size());
        metricsAccumulator.setPartitions(Collections.emptyList());
        metricsAccumulator.setMergeMetrics(new MergeCombineActionMetricsAccumulator());
        MergeCombinePartitioningAction mergeCombineAction = new MergeCombinePartitioningAction(this.inputSequences, this.orderingFn, this.combineFn, outputQueue, this.queueSize, this.parallelism, this.yieldAfter, this.batchSize, this.targetTimeNanos, this.hasTimeout, this.timeoutAtNanos, metricsAccumulator, this.cancellationGizmo);
        this.workerPool.execute(mergeCombineAction);
        Sequence<T> finalOutSequence = ParallelMergeCombiningSequence.makeOutputSequenceForQueue(outputQueue, this.hasTimeout, this.timeoutAtNanos, this.cancellationGizmo).withBaggage(() -> {
            if (this.metricsReporter != null) {
                metricsAccumulator.setTotalWallTime(System.nanoTime() - this.startTimeNanos);
                this.metricsReporter.accept(metricsAccumulator.build());
            }
        });
        return finalOutSequence.toYielder(initValue, accumulator);
    }

    @VisibleForTesting
    public CancellationGizmo getCancellationGizmo() {
        return this.cancellationGizmo;
    }

    static <T> Sequence<T> makeOutputSequenceForQueue(final BlockingQueue<ResultBatch<T>> queue, final boolean hasTimeout, final long timeoutAtNanos, final CancellationGizmo cancellationGizmo) {
        return new BaseSequence(new BaseSequence.IteratorMaker<T, Iterator<T>>(){
            private boolean shouldCancelOnCleanup = true;

            @Override
            public Iterator<T> make() {
                return new Iterator<T>(){
                    private ResultBatch<T> currentBatch;

                    @Override
                    public boolean hasNext() {
                        long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
                        if (hasTimeout && thisTimeoutNanos < 0L) {
                            throw new QueryTimeoutException();
                        }
                        if (this.currentBatch != null && !this.currentBatch.isTerminalResult() && !this.currentBatch.isDrained()) {
                            return true;
                        }
                        try {
                            if (this.currentBatch == null || this.currentBatch.isDrained()) {
                                this.currentBatch = hasTimeout ? (ResultBatch)queue.poll(thisTimeoutNanos, TimeUnit.NANOSECONDS) : (ResultBatch)queue.take();
                            }
                            if (this.currentBatch == null) {
                                throw new QueryTimeoutException();
                            }
                            if (cancellationGizmo.isCancelled()) {
                                throw cancellationGizmo.getRuntimeException();
                            }
                            if (this.currentBatch.isTerminalResult()) {
                                shouldCancelOnCleanup = false;
                                return false;
                            }
                            return true;
                        }
                        catch (InterruptedException e) {
                            throw new RE(e);
                        }
                    }

                    @Override
                    public T next() {
                        if (cancellationGizmo.isCancelled()) {
                            throw cancellationGizmo.getRuntimeException();
                        }
                        if (this.currentBatch == null || this.currentBatch.isDrained() || this.currentBatch.isTerminalResult()) {
                            throw new NoSuchElementException();
                        }
                        return this.currentBatch.next();
                    }
                };
            }

            @Override
            public void cleanup(Iterator<T> iterFromMake) {
                if (this.shouldCancelOnCleanup) {
                    cancellationGizmo.cancel(new RuntimeException("Already closed"));
                }
            }
        });
    }

    private static <T> void closeAllCursors(Collection<BatchedResultsCursor<T>> cursors) {
        Closer closer = Closer.create();
        closer.registerAll(cursors);
        CloseableUtils.closeAndSuppressExceptions(closer, e -> LOG.warn((Throwable)e, "Failed to close result cursors", new Object[0]));
    }

    static class MergeCombineActionMetricsAccumulator {
        private long taskCount = 1L;
        private long inputRows = 0L;
        private long outputRows = 0L;
        private long totalCpuTimeNanos = 0L;
        private long partitionInitializedtime = 0L;

        MergeCombineActionMetricsAccumulator() {
        }

        void incrementTaskCount() {
            ++this.taskCount;
        }

        void incrementInputRows(long numInputRows) {
            this.inputRows += numInputRows;
        }

        void incrementOutputRows(long numOutputRows) {
            this.outputRows += numOutputRows;
        }

        void incrementCpuTimeNanos(long nanos) {
            this.totalCpuTimeNanos += nanos;
        }

        void setPartitionInitializedTime(long nanos) {
            this.partitionInitializedtime = nanos;
        }

        long getTaskCount() {
            return this.taskCount;
        }

        long getInputRows() {
            return this.inputRows;
        }

        long getOutputRows() {
            return this.outputRows;
        }

        long getTotalCpuTimeNanos() {
            return this.totalCpuTimeNanos;
        }

        long getPartitionInitializedtime() {
            return this.partitionInitializedtime;
        }
    }

    static class MergeCombineMetricsAccumulator {
        List<MergeCombineActionMetricsAccumulator> partitionMetrics;
        MergeCombineActionMetricsAccumulator mergeMetrics;
        private long totalWallTime;
        private final int inputSequences;

        MergeCombineMetricsAccumulator(int inputSequences) {
            this.inputSequences = inputSequences;
        }

        void setMergeMetrics(MergeCombineActionMetricsAccumulator mergeMetrics) {
            this.mergeMetrics = mergeMetrics;
        }

        void setPartitions(List<MergeCombineActionMetricsAccumulator> partitionMetrics) {
            this.partitionMetrics = partitionMetrics;
        }

        void setTotalWallTime(long time) {
            this.totalWallTime = time;
        }

        MergeCombineMetrics build() {
            long numInputRows = 0L;
            long cpuTimeNanos = 0L;
            long totalPoolTasks = 2 + this.partitionMetrics.size();
            long fastestPartInitialized = this.partitionMetrics.size() > 0 ? Long.MAX_VALUE : this.mergeMetrics.getPartitionInitializedtime();
            long slowestPartInitialied = this.partitionMetrics.size() > 0 ? Long.MIN_VALUE : this.mergeMetrics.getPartitionInitializedtime();
            for (MergeCombineActionMetricsAccumulator partition : this.partitionMetrics) {
                numInputRows += partition.getInputRows();
                cpuTimeNanos += partition.getTotalCpuTimeNanos();
                totalPoolTasks += partition.getTaskCount();
                if (partition.getPartitionInitializedtime() < fastestPartInitialized) {
                    fastestPartInitialized = partition.getPartitionInitializedtime();
                }
                if (partition.getPartitionInitializedtime() <= slowestPartInitialied) continue;
                slowestPartInitialied = partition.getPartitionInitializedtime();
            }
            if (this.partitionMetrics.isEmpty()) {
                numInputRows = this.mergeMetrics.getInputRows();
            }
            long numOutputRows = this.mergeMetrics.getOutputRows();
            return new MergeCombineMetrics(Math.max(this.partitionMetrics.size(), 1), this.inputSequences, numInputRows, numOutputRows, totalPoolTasks += this.mergeMetrics.getTaskCount(), cpuTimeNanos += this.mergeMetrics.getTotalCpuTimeNanos(), this.totalWallTime, fastestPartInitialized, slowestPartInitialied);
        }
    }

    public static class MergeCombineMetrics {
        private final int parallelism;
        private final int inputSequences;
        private final long inputRows;
        private final long outputRows;
        private final long taskCount;
        private final long totalCpuTime;
        private final long totalWallTime;
        private final long fastestPartitionInitializedTime;
        private final long slowestPartitionInitializedTime;

        MergeCombineMetrics(int parallelism, int inputSequences, long inputRows, long outputRows, long taskCount, long totalCpuTime, long totalWallTime, long fastestPartitionInitializedTime, long slowestPartitionInitializedTime) {
            this.parallelism = parallelism;
            this.inputSequences = inputSequences;
            this.inputRows = inputRows;
            this.outputRows = outputRows;
            this.taskCount = taskCount;
            this.totalCpuTime = totalCpuTime;
            this.totalWallTime = totalWallTime;
            this.fastestPartitionInitializedTime = fastestPartitionInitializedTime;
            this.slowestPartitionInitializedTime = slowestPartitionInitializedTime;
        }

        public int getParallelism() {
            return this.parallelism;
        }

        public long getInputSequences() {
            return this.inputSequences;
        }

        public long getInputRows() {
            return this.inputRows;
        }

        public long getOutputRows() {
            return this.outputRows;
        }

        public long getTaskCount() {
            return this.taskCount;
        }

        public long getTotalCpuTime() {
            return this.totalCpuTime;
        }

        public long getTotalTime() {
            return this.totalWallTime;
        }

        public long getFastestPartitionInitializedTime() {
            return this.fastestPartitionInitializedTime;
        }

        public long getSlowestPartitionInitializedTime() {
            return this.slowestPartitionInitializedTime;
        }
    }

    static class CancellationGizmo {
        private final AtomicReference<Throwable> throwable = new AtomicReference<Object>(null);

        CancellationGizmo() {
        }

        void cancel(Throwable t) {
            this.throwable.compareAndSet(null, t);
        }

        boolean isCancelled() {
            return this.throwable.get() != null;
        }

        RuntimeException getRuntimeException() {
            Throwable ex = this.throwable.get();
            if (ex instanceof RuntimeException) {
                return (RuntimeException)ex;
            }
            return new RE(ex);
        }
    }

    static class BlockingQueueuBatchedResultsCursor<E>
    extends BatchedResultsCursor<E> {
        final BlockingQueue<ResultBatch<E>> queue;
        final boolean hasTimeout;
        final long timeoutAtNanos;

        BlockingQueueuBatchedResultsCursor(BlockingQueue<ResultBatch<E>> blockingQueue, Ordering<E> ordering, boolean hasTimeout, long timeoutAtNanos) {
            super(ordering);
            this.queue = blockingQueue;
            this.hasTimeout = hasTimeout;
            this.timeoutAtNanos = timeoutAtNanos;
        }

        @Override
        public void initialize() {
            if (this.queue.isEmpty()) {
                this.nextBatch();
            } else {
                this.resultBatch = (ResultBatch)this.queue.poll();
            }
        }

        @Override
        public void advance() {
            if (!this.resultBatch.isDrained()) {
                this.resultBatch.next();
            }
            if (this.resultBatch.isDrained()) {
                this.nextBatch();
            }
        }

        @Override
        public boolean isDone() {
            return this.resultBatch.isTerminalResult();
        }

        @Override
        public boolean block() throws InterruptedException {
            if (this.resultBatch == null || this.resultBatch.isDrained()) {
                if (this.hasTimeout) {
                    long thisTimeoutNanos = this.timeoutAtNanos - System.nanoTime();
                    if (thisTimeoutNanos < 0L) {
                        this.resultBatch = ResultBatch.TERMINAL;
                        throw new QueryTimeoutException("BlockingQueue cursor timed out waiting for data");
                    }
                    this.resultBatch = this.queue.poll(thisTimeoutNanos, TimeUnit.NANOSECONDS);
                } else {
                    this.resultBatch = this.queue.take();
                }
            }
            return this.resultBatch != null;
        }

        @Override
        public boolean isReleasable() {
            if (this.resultBatch != null && (this.resultBatch.isTerminalResult() || !this.resultBatch.isDrained())) {
                return true;
            }
            this.resultBatch = (ResultBatch)this.queue.poll();
            return this.resultBatch != null;
        }
    }

    static class YielderBatchedResultsCursor<E>
    extends BatchedResultsCursor<E> {
        final SequenceBatcher<E> batcher;
        Yielder<ResultBatch<E>> yielder;

        YielderBatchedResultsCursor(SequenceBatcher<E> batcher, Ordering<E> ordering) {
            super(ordering);
            this.batcher = batcher;
        }

        @Override
        public void initialize() {
            this.yielder = this.batcher.getBatchYielder();
            this.resultBatch = this.yielder.get();
        }

        @Override
        public void advance() {
            if (!this.resultBatch.isDrained()) {
                this.resultBatch.next();
            }
            if (this.resultBatch.isDrained() && !this.yielder.isDone()) {
                this.nextBatch();
            }
        }

        @Override
        public boolean isDone() {
            return this.resultBatch == null || this.yielder.isDone() && this.resultBatch.isDrained();
        }

        @Override
        public boolean block() {
            if (this.yielder.isDone()) {
                return true;
            }
            if (this.resultBatch == null || this.resultBatch.isDrained()) {
                this.resultBatch = new ResultBatch(((SequenceBatcher)this.batcher).batchSize);
                Yielder<ResultBatch> nextYielder = this.yielder.next(this.resultBatch);
                this.yielder = nextYielder;
            }
            return true;
        }

        @Override
        public boolean isReleasable() {
            return this.yielder.isDone() || this.resultBatch != null && !this.resultBatch.isDrained();
        }

        @Override
        public void close() throws IOException {
            if (this.yielder != null) {
                this.yielder.close();
            }
        }
    }

    static abstract class BatchedResultsCursor<E>
    implements ForkJoinPool.ManagedBlocker,
    Comparable<BatchedResultsCursor<E>>,
    Closeable {
        final Ordering<E> ordering;
        volatile ResultBatch<E> resultBatch;

        BatchedResultsCursor(Ordering<E> ordering) {
            this.ordering = ordering;
        }

        public abstract void initialize();

        public abstract void advance();

        public abstract boolean isDone();

        void nextBatch() {
            try {
                ForkJoinPool.managedBlock(this);
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Failed to load next batch of results", e);
            }
        }

        @Override
        public void close() throws IOException {
        }

        public E get() {
            return this.resultBatch.get();
        }

        @Override
        public int compareTo(BatchedResultsCursor<E> o) {
            return this.ordering.compare(this.get(), o.get());
        }

        public boolean equals(Object o) {
            if (!(o instanceof BatchedResultsCursor)) {
                return false;
            }
            return this.compareTo((BatchedResultsCursor)o) == 0;
        }

        public int hashCode() {
            return Objects.hash(this.ordering);
        }
    }

    static class SequenceBatcher<E>
    implements ForkJoinPool.ManagedBlocker {
        private final Sequence<E> sequence;
        private final int batchSize;
        private volatile Yielder<ResultBatch<E>> batchYielder;

        SequenceBatcher(Sequence<E> sequence, int batchSize) {
            this.sequence = sequence;
            this.batchSize = batchSize;
        }

        Yielder<ResultBatch<E>> getBatchYielder() {
            try {
                ForkJoinPool.managedBlock(this);
                return this.batchYielder;
            }
            catch (InterruptedException e) {
                this.batchYielder = Yielders.done(null, null);
                throw new RuntimeException("Failed to load initial batch of results", e);
            }
        }

        @Override
        public boolean block() {
            this.batchYielder = ResultBatch.fromSequence(this.sequence, this.batchSize);
            return true;
        }

        @Override
        public boolean isReleasable() {
            return this.batchYielder != null;
        }
    }

    static class ResultBatch<E> {
        static final ResultBatch TERMINAL = new ResultBatch();
        @Nullable
        private final Queue<E> values;

        ResultBatch(int batchSize) {
            this.values = new ArrayDeque(batchSize);
        }

        private ResultBatch() {
            this.values = null;
        }

        public void add(E in) {
            assert (this.values != null);
            this.values.offer(in);
        }

        public E get() {
            assert (this.values != null);
            return this.values.peek();
        }

        public E next() {
            assert (this.values != null);
            return this.values.poll();
        }

        boolean isDrained() {
            return this.values != null && this.values.isEmpty();
        }

        boolean isTerminalResult() {
            return this.values == null;
        }

        static <E> Yielder<ResultBatch<E>> fromSequence(Sequence<E> sequence, final int batchSize) {
            return sequence.toYielder(new ResultBatch<E>(batchSize), new YieldingAccumulator<ResultBatch<E>, E>(){
                int count = 0;

                @Override
                public ResultBatch<E> accumulate(ResultBatch<E> accumulated, E in) {
                    accumulated.add(in);
                    ++this.count;
                    if (this.count % batchSize == 0) {
                        this.yield();
                    }
                    return accumulated;
                }
            });
        }
    }

    static class QueuePusher<E>
    implements ForkJoinPool.ManagedBlocker {
        final boolean hasTimeout;
        final long timeoutAtNanos;
        final BlockingQueue<E> queue;
        volatile E item = null;

        QueuePusher(BlockingQueue<E> q, boolean hasTimeout, long timeoutAtNanos) {
            this.queue = q;
            this.hasTimeout = hasTimeout;
            this.timeoutAtNanos = timeoutAtNanos;
        }

        @Override
        public boolean block() throws InterruptedException {
            boolean success = false;
            if (this.item != null) {
                if (this.hasTimeout) {
                    long thisTimeoutNanos = this.timeoutAtNanos - System.nanoTime();
                    if (thisTimeoutNanos < 0L) {
                        this.item = null;
                        throw new QueryTimeoutException("QueuePusher timed out offering data");
                    }
                    success = this.queue.offer(this.item, thisTimeoutNanos, TimeUnit.NANOSECONDS);
                } else {
                    success = this.queue.offer(this.item);
                }
                if (success) {
                    this.item = null;
                }
            }
            return success;
        }

        @Override
        public boolean isReleasable() {
            return this.item == null;
        }

        public void offer(E item) {
            try {
                this.item = item;
                ForkJoinPool.managedBlock(this);
            }
            catch (InterruptedException e) {
                this.item = null;
                throw new RuntimeException("Failed to offer result to output queue", e);
            }
        }
    }

    private static class PrepareMergeCombineInputsAction<T>
    extends RecursiveAction {
        private final List<BatchedResultsCursor<T>> partition;
        private final Ordering<T> orderingFn;
        private final BinaryOperator<T> combineFn;
        private final QueuePusher<ResultBatch<T>> outputQueue;
        private final int yieldAfter;
        private final int batchSize;
        private final long targetTimeNanos;
        private final MergeCombineActionMetricsAccumulator metricsAccumulator;
        private final CancellationGizmo cancellationGizmo;
        private final long startTime;

        private PrepareMergeCombineInputsAction(List<BatchedResultsCursor<T>> partition, QueuePusher<ResultBatch<T>> outputQueue, Ordering<T> orderingFn, BinaryOperator<T> combineFn, int yieldAfter, int batchSize, long targetTimeNanos, MergeCombineActionMetricsAccumulator metricsAccumulator, CancellationGizmo cancellationGizmo) {
            this.partition = partition;
            this.orderingFn = orderingFn;
            this.combineFn = combineFn;
            this.outputQueue = outputQueue;
            this.yieldAfter = yieldAfter;
            this.batchSize = batchSize;
            this.targetTimeNanos = targetTimeNanos;
            this.metricsAccumulator = metricsAccumulator;
            this.cancellationGizmo = cancellationGizmo;
            this.startTime = System.nanoTime();
        }

        @Override
        protected void compute() {
            PriorityQueue<BatchedResultsCursor<T>> cursors = new PriorityQueue<BatchedResultsCursor<T>>(this.partition.size());
            try {
                for (BatchedResultsCursor<T> cursor : this.partition) {
                    cursor.initialize();
                    if (!cursor.isDone()) {
                        cursors.offer(cursor);
                        continue;
                    }
                    cursor.close();
                }
                if (cursors.size() > 0) {
                    PrepareMergeCombineInputsAction.getPool().execute(new MergeCombineAction(cursors, this.outputQueue, this.orderingFn, this.combineFn, null, this.yieldAfter, this.batchSize, this.targetTimeNanos, this.metricsAccumulator, this.cancellationGizmo));
                } else {
                    this.outputQueue.offer(ResultBatch.TERMINAL);
                }
                this.metricsAccumulator.setPartitionInitializedTime(System.nanoTime() - this.startTime);
            }
            catch (Throwable t) {
                ParallelMergeCombiningSequence.closeAllCursors(this.partition);
                this.cancellationGizmo.cancel(t);
                this.outputQueue.offer(ResultBatch.TERMINAL);
            }
        }
    }

    private static class MergeCombineAction<T>
    extends RecursiveAction {
        private final PriorityQueue<BatchedResultsCursor<T>> pQueue;
        private final Ordering<T> orderingFn;
        private final BinaryOperator<T> combineFn;
        private final QueuePusher<ResultBatch<T>> outputQueue;
        private final T initialValue;
        private final int yieldAfter;
        private final int batchSize;
        private final long targetTimeNanos;
        private final MergeCombineActionMetricsAccumulator metricsAccumulator;
        private final CancellationGizmo cancellationGizmo;

        private MergeCombineAction(PriorityQueue<BatchedResultsCursor<T>> pQueue, QueuePusher<ResultBatch<T>> outputQueue, Ordering<T> orderingFn, BinaryOperator<T> combineFn, T initialValue, int yieldAfter, int batchSize, long targetTimeNanos, MergeCombineActionMetricsAccumulator metricsAccumulator, CancellationGizmo cancellationGizmo) {
            this.pQueue = pQueue;
            this.orderingFn = orderingFn;
            this.combineFn = combineFn;
            this.outputQueue = outputQueue;
            this.initialValue = initialValue;
            this.yieldAfter = yieldAfter;
            this.batchSize = batchSize;
            this.targetTimeNanos = targetTimeNanos;
            this.metricsAccumulator = metricsAccumulator;
            this.cancellationGizmo = cancellationGizmo;
        }

        @Override
        protected void compute() {
            try {
                long start = System.nanoTime();
                long startCpuNanos = JvmUtils.safeGetThreadCpuTime();
                int counter = 0;
                int batchCounter = 0;
                ResultBatch<T> outputBatch = new ResultBatch<T>(this.batchSize);
                Object currentCombinedValue = this.initialValue;
                while (counter < this.yieldAfter && !this.pQueue.isEmpty()) {
                    BatchedResultsCursor<T> cursor = this.pQueue.poll();
                    if (!cursor.isDone()) {
                        T nextValueToAccumulate = cursor.get();
                        cursor.advance();
                        if (!cursor.isDone()) {
                            this.pQueue.offer(cursor);
                        } else {
                            cursor.close();
                        }
                        ++counter;
                        if (currentCombinedValue == null) {
                            currentCombinedValue = this.combineFn.apply(null, nextValueToAccumulate);
                            continue;
                        }
                        if (this.orderingFn.compare(currentCombinedValue, nextValueToAccumulate) == 0) {
                            currentCombinedValue = this.combineFn.apply(currentCombinedValue, nextValueToAccumulate);
                            continue;
                        }
                        outputBatch.add(currentCombinedValue);
                        if (++batchCounter >= this.batchSize) {
                            this.outputQueue.offer(outputBatch);
                            outputBatch = new ResultBatch(this.batchSize);
                            this.metricsAccumulator.incrementOutputRows(batchCounter);
                            batchCounter = 0;
                        }
                        currentCombinedValue = this.combineFn.apply(null, nextValueToAccumulate);
                        continue;
                    }
                    cursor.close();
                }
                long elapsedCpuNanos = JvmUtils.safeGetThreadCpuTime() - startCpuNanos;
                this.metricsAccumulator.incrementInputRows(counter);
                this.metricsAccumulator.incrementCpuTimeNanos(elapsedCpuNanos);
                this.metricsAccumulator.incrementTaskCount();
                if (!this.pQueue.isEmpty() && !this.cancellationGizmo.isCancelled()) {
                    if (!outputBatch.isDrained()) {
                        this.outputQueue.offer(outputBatch);
                        this.metricsAccumulator.incrementOutputRows(batchCounter);
                    }
                    long elapsedNanos = System.nanoTime() - start;
                    double nextYieldAfter = Math.max((double)this.targetTimeNanos * ((double)this.yieldAfter / (double)elapsedCpuNanos), 1.0);
                    long recursionDepth = this.metricsAccumulator.getTaskCount();
                    double cumulativeMovingAverage = (nextYieldAfter + (double)(recursionDepth * (long)this.yieldAfter)) / (double)(recursionDepth + 1L);
                    int adjustedNextYieldAfter = (int)Math.ceil(cumulativeMovingAverage);
                    LOG.debug("task recursion %s yielded %s results ran for %s millis (%s nanos), %s cpu nanos, next task yielding every %s operations", recursionDepth, this.yieldAfter, TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS), elapsedNanos, elapsedCpuNanos, adjustedNextYieldAfter);
                    MergeCombineAction.getPool().execute(new MergeCombineAction<T>(this.pQueue, this.outputQueue, this.orderingFn, this.combineFn, currentCombinedValue, adjustedNextYieldAfter, this.batchSize, this.targetTimeNanos, this.metricsAccumulator, this.cancellationGizmo));
                } else if (this.cancellationGizmo.isCancelled()) {
                    LOG.debug("cancelled after %s tasks", this.metricsAccumulator.getTaskCount());
                    ParallelMergeCombiningSequence.closeAllCursors(this.pQueue);
                    this.outputQueue.offer(ResultBatch.TERMINAL);
                } else {
                    outputBatch.add(currentCombinedValue);
                    this.metricsAccumulator.incrementOutputRows((long)batchCounter + 1L);
                    this.outputQueue.offer(outputBatch);
                    this.outputQueue.offer(ResultBatch.TERMINAL);
                    LOG.debug("merge combine complete after %s tasks", this.metricsAccumulator.getTaskCount());
                }
            }
            catch (Throwable t) {
                ParallelMergeCombiningSequence.closeAllCursors(this.pQueue);
                this.cancellationGizmo.cancel(t);
                this.outputQueue.offer(ResultBatch.TERMINAL);
            }
        }
    }

    private static class MergeCombinePartitioningAction<T>
    extends RecursiveAction {
        private final List<Sequence<T>> sequences;
        private final Ordering<T> orderingFn;
        private final BinaryOperator<T> combineFn;
        private final BlockingQueue<ResultBatch<T>> out;
        private final int queueSize;
        private final int parallelism;
        private final int yieldAfter;
        private final int batchSize;
        private final long targetTimeNanos;
        private final boolean hasTimeout;
        private final long timeoutAt;
        private final MergeCombineMetricsAccumulator metricsAccumulator;
        private final CancellationGizmo cancellationGizmo;

        private MergeCombinePartitioningAction(List<Sequence<T>> sequences, Ordering<T> orderingFn, BinaryOperator<T> combineFn, BlockingQueue<ResultBatch<T>> out, int queueSize, int parallelism, int yieldAfter, int batchSize, long targetTimeNanos, boolean hasTimeout, long timeoutAt, MergeCombineMetricsAccumulator metricsAccumulator, CancellationGizmo cancellationGizmo) {
            this.sequences = sequences;
            this.combineFn = combineFn;
            this.orderingFn = orderingFn;
            this.out = out;
            this.queueSize = queueSize;
            this.parallelism = parallelism;
            this.yieldAfter = yieldAfter;
            this.batchSize = batchSize;
            this.targetTimeNanos = targetTimeNanos;
            this.hasTimeout = hasTimeout;
            this.timeoutAt = timeoutAt;
            this.metricsAccumulator = metricsAccumulator;
            this.cancellationGizmo = cancellationGizmo;
        }

        @Override
        protected void compute() {
            ArrayList<YielderBatchedResultsCursor<T>> sequenceCursors = new ArrayList<YielderBatchedResultsCursor<T>>(this.sequences.size());
            try {
                int parallelTaskCount = this.computeNumTasks();
                if (parallelTaskCount < 2) {
                    LOG.debug("Input sequence count (%s) or available parallel merge task count (%s) too small to perform parallel merge-combine, performing serially with a single merge-combine task", this.sequences.size(), parallelTaskCount);
                    QueuePusher<ResultBatch<T>> resultsPusher = new QueuePusher<ResultBatch<T>>(this.out, this.hasTimeout, this.timeoutAt);
                    for (Sequence<T> s : this.sequences) {
                        sequenceCursors.add(new YielderBatchedResultsCursor<T>(new SequenceBatcher<T>(s, this.batchSize), this.orderingFn));
                    }
                    MergeCombineActionMetricsAccumulator soloAccumulator = new MergeCombineActionMetricsAccumulator();
                    this.metricsAccumulator.setPartitions(Collections.emptyList());
                    this.metricsAccumulator.setMergeMetrics(soloAccumulator);
                    PrepareMergeCombineInputsAction blockForInputsAction = new PrepareMergeCombineInputsAction(sequenceCursors, resultsPusher, this.orderingFn, this.combineFn, this.yieldAfter, this.batchSize, this.targetTimeNanos, soloAccumulator, this.cancellationGizmo);
                    MergeCombinePartitioningAction.getPool().execute(blockForInputsAction);
                } else {
                    LOG.debug("Spawning %s parallel merge-combine tasks for %s sequences", parallelTaskCount, this.sequences.size());
                    this.spawnParallelTasks(parallelTaskCount);
                }
            }
            catch (Throwable t) {
                ParallelMergeCombiningSequence.closeAllCursors(sequenceCursors);
                this.cancellationGizmo.cancel(t);
                this.out.offer(ResultBatch.TERMINAL);
            }
        }

        private void spawnParallelTasks(int parallelMergeTasks) {
            ArrayList tasks = new ArrayList(parallelMergeTasks);
            ArrayList<MergeCombineActionMetricsAccumulator> taskMetrics = new ArrayList<MergeCombineActionMetricsAccumulator>(parallelMergeTasks);
            ArrayList intermediaryOutputs = new ArrayList(parallelMergeTasks);
            List partitions = Lists.partition(this.sequences, (int)(this.sequences.size() / parallelMergeTasks));
            for (List list : partitions) {
                ArrayBlockingQueue outputQueue = new ArrayBlockingQueue(this.queueSize);
                intermediaryOutputs.add(outputQueue);
                QueuePusher queuePusher = new QueuePusher(outputQueue, this.hasTimeout, this.timeoutAt);
                ArrayList partitionCursors = new ArrayList(this.sequences.size());
                for (Sequence s : list) {
                    partitionCursors.add(new YielderBatchedResultsCursor(new SequenceBatcher(s, this.batchSize), this.orderingFn));
                }
                MergeCombineActionMetricsAccumulator partitionAccumulator = new MergeCombineActionMetricsAccumulator();
                PrepareMergeCombineInputsAction blockForInputsAction = new PrepareMergeCombineInputsAction(partitionCursors, queuePusher, this.orderingFn, this.combineFn, this.yieldAfter, this.batchSize, this.targetTimeNanos, partitionAccumulator, this.cancellationGizmo);
                tasks.add(blockForInputsAction);
                taskMetrics.add(partitionAccumulator);
            }
            this.metricsAccumulator.setPartitions(taskMetrics);
            for (RecursiveAction recursiveAction : tasks) {
                MergeCombinePartitioningAction.getPool().execute(recursiveAction);
            }
            QueuePusher<ResultBatch<T>> outputPusher = new QueuePusher<ResultBatch<T>>(this.out, this.hasTimeout, this.timeoutAt);
            ArrayList<BlockingQueueuBatchedResultsCursor<T>> arrayList = new ArrayList<BlockingQueueuBatchedResultsCursor<T>>(intermediaryOutputs.size());
            for (BlockingQueue blockingQueue : intermediaryOutputs) {
                arrayList.add(new BlockingQueueuBatchedResultsCursor<T>(blockingQueue, this.orderingFn, this.hasTimeout, this.timeoutAt));
            }
            MergeCombineActionMetricsAccumulator finalMergeMetrics = new MergeCombineActionMetricsAccumulator();
            this.metricsAccumulator.setMergeMetrics(finalMergeMetrics);
            PrepareMergeCombineInputsAction prepareMergeCombineInputsAction = new PrepareMergeCombineInputsAction(arrayList, outputPusher, this.orderingFn, this.combineFn, this.yieldAfter, this.batchSize, this.targetTimeNanos, finalMergeMetrics, this.cancellationGizmo);
            MergeCombinePartitioningAction.getPool().execute(prepareMergeCombineInputsAction);
        }

        private int computeNumTasks() {
            int runningThreadCount = MergeCombinePartitioningAction.getPool().getRunningThreadCount();
            int submissionCount = MergeCombinePartitioningAction.getPool().getQueuedSubmissionCount();
            int maxParallelism = Math.min(this.parallelism, MergeCombinePartitioningAction.getPool().getParallelism());
            int utilizationEstimate = runningThreadCount + submissionCount - 1;
            int computedParallelismForUtilization = maxParallelism - utilizationEstimate;
            int computedParallelismForSequences = (int)Math.floor(Math.sqrt(this.sequences.size()));
            int computedOptimalParallelism = Math.min(computedParallelismForSequences, computedParallelismForUtilization - 1);
            int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1);
            if (LOG.isDebugEnabled()) {
                ForkJoinPool pool = MergeCombinePartitioningAction.getPool();
                LOG.debug("Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] pool parallelism: [%s] pool size: [%s] steal count: [%s]", computedNumParallelTasks, this.parallelism, pool.getActiveThreadCount(), runningThreadCount, submissionCount, pool.getQueuedTaskCount(), pool.getParallelism(), pool.getPoolSize(), pool.getStealCount());
            }
            return computedNumParallelTasks;
        }
    }
}

