/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.processor;

import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableNilFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameProcessorExecutorTest;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.processor.manager.ProcessorAndCallback;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.frame.processor.manager.ProcessorManagers;
import org.apache.druid.frame.processor.test.ChompingFrameProcessor;
import org.apache.druid.frame.processor.test.FailingFrameProcessor;
import org.apache.druid.frame.processor.test.SleepyFrameProcessor;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class RunAllFullyWidgetTest
extends FrameProcessorExecutorTest.BaseFrameProcessorExecutorTestSuite {
    private final int bouncerPoolSize;
    private final int maxOutstandingProcessors;
    private final boolean delayed;
    private final AtomicLong closed = new AtomicLong();
    private Bouncer bouncer;
    @GuardedBy(value="this")
    private int concurrentHighWatermark = 0;
    @GuardedBy(value="this")
    private int concurrentNow = 0;

    /**
     * Creates one parameterized instance of the suite.
     *
     * @param numThreads               value handed to the base suite's constructor
     * @param bouncerPoolSize          pool size used to build the {@link Bouncer} during setup;
     *                                 {@code Integer.MAX_VALUE} selects {@code Bouncer.unlimited()}
     * @param maxOutstandingProcessors limit passed to each {@code runAllFully} call
     * @param delayed                  whether {@code possiblyDelay} wraps the processor manager
     */
    public RunAllFullyWidgetTest(int numThreads, int bouncerPoolSize, int maxOutstandingProcessors, boolean delayed) {
        super(numThreads);
        this.delayed = delayed;
        this.maxOutstandingProcessors = maxOutstandingProcessors;
        this.bouncerPoolSize = bouncerPoolSize;
    }

    /**
     * Builds the parameter matrix: every combination of thread count, bouncer pool size,
     * outstanding-processor limit and the delayed flag.
     *
     * @return one {@code Object[]} per combination, in constructor-argument order
     */
    @Parameterized.Parameters(name="numThreads = {0}, bouncerPoolSize = {1}, maxOutstandingProcessors = {2}, delayed = {3}")
    public static Collection<Object[]> constructorFeeder() {
        final int[] threadCounts = {1, 3, 12};
        final int[] poolSizes = {1, 3, 12, Integer.MAX_VALUE};
        final int[] outstandingLimits = {1, 3, 12};
        final boolean[] delayedFlags = {false, true};

        ArrayList<Object[]> parameterSets = new ArrayList<Object[]>();
        for (int threads : threadCounts) {
            for (int poolSize : poolSizes) {
                for (int limit : outstandingLimits) {
                    for (boolean delayedFlag : delayedFlags) {
                        parameterSets.add(new Object[]{threads, poolSize, limit, delayedFlag});
                    }
                }
            }
        }
        return parameterSets;
    }

    /**
     * Runs the base suite's setup, then creates the bouncer for this parameter set and
     * resets both concurrency counters.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        if (this.bouncerPoolSize == Integer.MAX_VALUE) {
            // Integer.MAX_VALUE in the parameter matrix selects the unlimited bouncer.
            this.bouncer = Bouncer.unlimited();
        } else {
            this.bouncer = new Bouncer(this.bouncerPoolSize);
        }
        // The counters are @GuardedBy("this"), so reset them under the instance lock.
        synchronized (this) {
            this.concurrentHighWatermark = 0;
            this.concurrentNow = 0;
        }
    }

    /**
     * Runs the base suite's teardown, then verifies the invariants every test must leave behind:
     * no tracked processor is still counted as running, the observed concurrency never exceeded
     * the bouncer pool size or the outstanding-processor limit, the bouncer reports a current
     * count of zero and the configured maximum, and the wrapper installed by {@code ensureClose}
     * saw exactly one {@code close()} call.
     */
    @Override
    @After
    public void tearDown() throws Exception {
        super.tearDown();
        // The counters are @GuardedBy("this"), so read them under the instance lock.
        synchronized (this) {
            Assert.assertEquals(0L, this.concurrentNow);
            MatcherAssert.assertThat(this.concurrentHighWatermark, Matchers.lessThanOrEqualTo(this.bouncerPoolSize));
            MatcherAssert.assertThat(this.concurrentHighWatermark, Matchers.lessThanOrEqualTo(this.maxOutstandingProcessors));
        }
        Assert.assertEquals("Bouncer current running count", 0L, this.bouncer.getCurrentCount());
        Assert.assertEquals("Bouncer max pool size", this.bouncerPoolSize, this.bouncer.getMaxCount());
        Assert.assertEquals("Encountered single close (from ensureClose)", 1L, this.closed.get());
    }

    /**
     * Runs the manager returned by {@code ProcessorManagers.none()} (presumably one that yields
     * no processors) and checks that the future resolves to the initial accumulation value,
     * {@code "xyzzy"}.
     */
    @Test
    public void test_runAllFully_emptyChannel() throws Exception {
        // The accumulator is String-typed so that "s1 + s2" is a string concatenation.
        ProcessorManager<Object, String> manager =
            ProcessorManagers.none().withAccumulation("xyzzy", (s1, s2) -> s1 + s2);
        ListenableFuture<String> future = this.exec.runAllFully(
            this.possiblyDelay(this.ensureClose(manager)),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        Assert.assertEquals("xyzzy", future.get());
    }

    /**
     * Runs {@code numProcessors} chomping processors, each fed a single frame through its own
     * channel, and checks that the accumulated sum equals the processor count.
     *
     * <p>NOTE: despite the method name, the count used here is 100.
     */
    @Test
    public void test_runAllFully_fiftyThousandProcessors() throws Exception {
        final int numProcessors = 100;
        // Any single frame will do; it is only used as payload for each processor's input channel.
        final Frame frame = Iterables.getOnlyElement(
            FrameSequenceBuilder.fromAdapter(new QueryableIndexStorageAdapter(TestIndex.getMMappedTestIndex()))
                .frameType(FrameType.ROW_BASED)
                .frames()
                .toList()
        );
        ProcessorManager<Long, Long> processors = ProcessorManagers.of(
            Iterables.transform(
                IntStream.range(0, numProcessors)::iterator,
                i -> {
                    BlockingQueueFrameChannel channel = BlockingQueueFrameChannel.minimal();
                    try {
                        channel.writable().write(frame);
                        channel.writable().close();
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return new ConcurrencyTrackingFrameProcessor<Long>(
                        new ChompingFrameProcessor(Collections.singletonList(channel.readable()))
                    );
                }
            )
        ).withAccumulation(0L, Long::sum);
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(this.ensureClose(processors)),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        Assert.assertEquals(numProcessors, (long) future.get());
    }

    /**
     * Feeds {@code runAllFully} an unbounded supply of {@link FailingFrameProcessor}s and checks
     * that the future fails with a {@link RuntimeException} whose cause is a
     * {@link RuntimeException} carrying the message {@code "failure!"}.
     */
    @Test
    public void test_runAllFully_failing() {
        // IntStream.generate never terminates, so the run can only end through the failure.
        ProcessorManager<Long, Long> manager = ProcessorManagers.of(
            Iterables.transform(
                IntStream.generate(() -> 0)::iterator,
                i -> new ConcurrencyTrackingFrameProcessor<Long>(
                    new FailingFrameProcessor(
                        ReadableNilFrameChannel.INSTANCE,
                        BlockingQueueFrameChannel.minimal().writable(),
                        0
                    )
                )
            )
        ).withAccumulation(0L, Long::sum);
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(this.ensureClose(manager)),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat(e.getCause().getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!")));
    }

    /**
     * Uses an accumulation function that always throws and checks that the future fails with
     * an {@link IllegalStateException} carrying the message {@code "error!"}.
     */
    @Test
    public void test_runAllFully_errorAccumulateFn() {
        ProcessorManager<Long, Long> manager = ProcessorManagers.of(
            Iterables.transform(
                IntStream.range(0, 100)::iterator,
                i -> new ChompingFrameProcessor(Collections.emptyList())
            )
        ).withAccumulation(0L, (x, y) -> {
            throw new ISE("error!");
        });
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(this.ensureClose(manager)),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    /**
     * Wraps the manager so that the very first {@code next()} call throws, and checks that the
     * future fails with an {@link IllegalStateException} carrying the message {@code "error!"}.
     */
    @Test
    public void test_runAllFully_errorChannelFirstElement() {
        ProcessorManager<Long, Long> base = ProcessorManagers.of(
            Iterables.transform(
                IntStream.generate(() -> 0)::iterator,
                i -> new ChompingFrameProcessor(Collections.emptyList())
            )
        ).withAccumulation(0L, Long::sum);
        // Zero successful next() calls are allowed before the wrapper throws.
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(new ThrowOnNextProcessorManager<Long, Long>(this.ensureClose(base), 0)),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    /**
     * Wraps the manager so that the second {@code next()} call throws, and checks that the
     * future fails with an {@link IllegalStateException} carrying the message {@code "error!"}.
     */
    @Test
    public void test_runAllFully_errorChannelSecondElement() {
        ProcessorManager<Long, Long> base = ProcessorManagers.of(
            Iterables.transform(
                IntStream.generate(() -> 0)::iterator,
                i -> new ChompingFrameProcessor(Collections.emptyList())
            )
        ).withAccumulation(0L, Long::sum);
        // One successful next() call is allowed before the wrapper throws.
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(new ThrowOnNextProcessorManager<Long, Long>(this.ensureClose(base), 1)),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    /**
     * Wraps the manager so that {@code next()} throws after 100 successful calls, and checks
     * that the future fails with an {@link IllegalStateException} carrying the message
     * {@code "error!"}.
     */
    @Test
    public void test_runAllFully_errorChannelHundredthElement() {
        ProcessorManager<Long, Long> base = ProcessorManagers.of(
            Iterables.transform(
                IntStream.generate(() -> 0)::iterator,
                i -> new ChompingFrameProcessor(Collections.emptyList())
            )
        ).withAccumulation(0L, Long::sum);
        // 100 successful next() calls are allowed before the wrapper throws.
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(new ThrowOnNextProcessorManager<Long, Long>(this.ensureClose(base), 100)),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    /**
     * Runs 101 processors through a manager whose {@code close()} throws, and checks that the
     * future fails with an {@link IllegalStateException} carrying the message {@code "error!"}.
     */
    @Test
    public void test_runAllFully_errorChannelClose() {
        ProcessorManager<Long, Long> base = ProcessorManagers.of(
            Iterables.transform(
                IntStream.range(0, 101)::iterator,
                i -> new ChompingFrameProcessor(Collections.emptyList())
            )
        ).withAccumulation(0L, Long::sum);
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(new ThrowOnCloseProcessorManager<Long, Long>(this.ensureClose(base))),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    /**
     * Combines both failure wrappers: the second {@code next()} call throws and {@code close()}
     * throws as well. Checks that the future fails with an {@link IllegalStateException}
     * carrying the message {@code "error!"}; both wrappers use that same message, so the
     * assertion does not distinguish which one surfaced.
     */
    @Test
    public void test_runAllFully_errorChannelSecondElementAndClose() {
        ProcessorManager<Long, Long> base = ProcessorManagers.of(
            Iterables.transform(
                IntStream.range(0, 101)::iterator,
                i -> new ChompingFrameProcessor(Collections.emptyList())
            )
        ).withAccumulation(0L, Long::sum);
        ProcessorManager<Long, Long> throwingOnNext =
            new ThrowOnNextProcessorManager<Long, Long>(this.ensureClose(base), 1);
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(new ThrowOnCloseProcessorManager<Long, Long>(throwingOnNext)),
            this.maxOutstandingProcessors,
            this.bouncer,
            null
        );
        ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!")));
    }

    /**
     * Starts ten times as many sleepy processors as are expected to run at once, waits until
     * the expected number have begun running, cancels the future, and then polls until the
     * executor reports no cancelable processors. The JUnit timeout bounds the polling loop.
     */
    @Test(timeout=30000L)
    public void test_runAllFully_futureCancel() throws InterruptedException {
        // The tightest of the three limits is the number of processors expected to run at once.
        int expectedRunningProcessors = Math.min(Math.min(this.bouncerPoolSize, this.maxOutstandingProcessors), this.numThreads);
        List<SleepyFrameProcessor> processors = IntStream.range(0, 10 * expectedRunningProcessors)
            .mapToObj(ignored -> new SleepyFrameProcessor())
            .collect(Collectors.toList());
        ProcessorManager<Long, Long> manager = ProcessorManagers.of(
            Sequences.simple(processors).map(sleepy -> new ConcurrencyTrackingFrameProcessor<Long>(sleepy))
        ).withAccumulation(0L, Long::sum);
        ListenableFuture<Long> future = this.exec.runAllFully(
            this.possiblyDelay(this.ensureClose(manager)),
            this.maxOutstandingProcessors,
            this.bouncer,
            "xyzzy"
        );
        for (int idx = 0; idx < expectedRunningProcessors; ++idx) {
            processors.get(idx).awaitRun();
        }
        Assert.assertTrue(future.cancel(true));
        Assert.assertTrue(future.isCancelled());
        // Poll until the executor no longer tracks any cancelable processor.
        while (this.exec.cancelableProcessorCount() > 0) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(0L, this.exec.cancelableProcessorCount());
    }

    /**
     * Wraps the manager in a {@code DelayedProcessorManager} when this parameter set has
     * {@code delayed} enabled; otherwise returns it unchanged.
     *
     * @param processorManager manager to run
     * @return the same manager, or a delaying wrapper around it
     */
    private <T, R> ProcessorManager<T, R> possiblyDelay(ProcessorManager<T, R> processorManager) {
        if (this.delayed) {
            return new DelayedProcessorManager<T, R>(processorManager);
        }
        return processorManager;
    }

    /**
     * Wraps the manager in an {@code EnsureCloseProcessorManager}, which counts
     * {@code close()} calls in this test instance's {@code closed} counter.
     *
     * @param processorManager manager to wrap
     * @return the close-counting wrapper
     */
    private <T, R> ProcessorManager<T, R> ensureClose(ProcessorManager<T, R> processorManager) {
        final EnsureCloseProcessorManager<T, R> closeCounting =
            new EnsureCloseProcessorManager<T, R>(processorManager);
        return closeCounting;
    }

    private class DelayedProcessorManager<T, R>
    implements ProcessorManager<T, R> {
        private final ProcessorManager<T, R> delegate;

        public DelayedProcessorManager(ProcessorManager<T, R> delegate) {
            this.delegate = delegate;
        }

        public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() {
            ListenableFuture future = this.delegate.next();
            final SettableFuture retVal = SettableFuture.create();
            Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Optional<ProcessorAndCallback<T>>>(){

                public void onSuccess(Optional<ProcessorAndCallback<T>> result) {
                    retVal.set(result);
                }

                public void onFailure(Throwable t) {
                    retVal.setException(t);
                }
            }, (Executor)RunAllFullyWidgetTest.this.exec.getExecutorService());
            return retVal;
        }

        public R result() {
            return (R)this.delegate.result();
        }

        public void close() {
            this.delegate.close();
        }
    }

    private static class ThrowOnCloseProcessorManager<T, R>
    implements ProcessorManager<T, R> {
        private final ProcessorManager<T, R> delegate;

        public ThrowOnCloseProcessorManager(ProcessorManager<T, R> delegate) {
            this.delegate = delegate;
        }

        public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() {
            return this.delegate.next();
        }

        public R result() {
            return (R)this.delegate.result();
        }

        public void close() {
            this.delegate.close();
            throw new ISE("error!", new Object[0]);
        }
    }

    private static class ThrowOnNextProcessorManager<T, R>
    implements ProcessorManager<T, R> {
        private final ProcessorManager<T, R> delegate;
        private int i;

        public ThrowOnNextProcessorManager(ProcessorManager<T, R> delegate, int i) {
            this.delegate = delegate;
            this.i = i;
        }

        public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() {
            if (this.i == 0) {
                throw new ISE("error!", new Object[0]);
            }
            --this.i;
            return this.delegate.next();
        }

        public R result() {
            return (R)this.delegate.result();
        }

        public void close() {
            this.delegate.close();
        }
    }

    private class EnsureCloseProcessorManager<T, R>
    implements ProcessorManager<T, R> {
        private final ProcessorManager<T, R> delegate;

        public EnsureCloseProcessorManager(ProcessorManager<T, R> delegate) {
            this.delegate = delegate;
        }

        public ListenableFuture<Optional<ProcessorAndCallback<T>>> next() {
            return this.delegate.next();
        }

        public R result() {
            return (R)this.delegate.result();
        }

        public void close() {
            RunAllFullyWidgetTest.this.closed.getAndIncrement();
            this.delegate.close();
        }
    }

    private class ConcurrencyTrackingFrameProcessor<T>
    implements FrameProcessor<T> {
        private final AtomicBoolean didRun = new AtomicBoolean(false);
        private final AtomicBoolean didCleanup = new AtomicBoolean(false);
        private final FrameProcessor<T> delegate;

        public ConcurrencyTrackingFrameProcessor(FrameProcessor<T> delegate) {
            this.delegate = delegate;
        }

        public List<ReadableFrameChannel> inputChannels() {
            return this.delegate.inputChannels();
        }

        public List<WritableFrameChannel> outputChannels() {
            return this.delegate.outputChannels();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws InterruptedException, IOException {
            if (this.didRun.compareAndSet(false, true)) {
                RunAllFullyWidgetTest runAllFullyWidgetTest = RunAllFullyWidgetTest.this;
                synchronized (runAllFullyWidgetTest) {
                    RunAllFullyWidgetTest.this.concurrentNow++;
                    if (RunAllFullyWidgetTest.this.concurrentHighWatermark < RunAllFullyWidgetTest.this.concurrentNow) {
                        RunAllFullyWidgetTest.this.concurrentHighWatermark = RunAllFullyWidgetTest.this.concurrentNow;
                    }
                }
            }
            return this.delegate.runIncrementally(readableInputs);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cleanup() throws IOException {
            try {
                this.delegate.cleanup();
            }
            finally {
                RunAllFullyWidgetTest runAllFullyWidgetTest = RunAllFullyWidgetTest.this;
                synchronized (runAllFullyWidgetTest) {
                    if (this.didRun.get() && this.didCleanup.compareAndSet(false, true)) {
                        RunAllFullyWidgetTest.this.concurrentNow--;
                    }
                }
            }
        }
    }
}

