/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.processor;

import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableNilFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameProcessorExecutorTest;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.processor.test.ChompingFrameProcessor;
import org.apache.druid.frame.processor.test.FailingFrameProcessor;
import org.apache.druid.frame.processor.test.SleepyFrameProcessor;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class RunAllFullyWidgetTest
extends FrameProcessorExecutorTest.BaseFrameProcessorExecutorTestSuite {
    private final int bouncerPoolSize;
    private final int maxOutstandingProcessors;
    private Bouncer bouncer;
    @GuardedBy(value="this")
    private int concurrentHighWatermark = 0;
    @GuardedBy(value="this")
    private int concurrentNow = 0;

    public RunAllFullyWidgetTest(int numThreads, int bouncerPoolSize, int maxOutstandingProcessors) {
        super(numThreads);
        this.bouncerPoolSize = bouncerPoolSize;
        this.maxOutstandingProcessors = maxOutstandingProcessors;
    }

    @Parameterized.Parameters(name="numThreads = {0}, bouncerPoolSize = {1}, maxOutstandingProcessors = {2}")
    public static Collection<Object[]> constructorFeeder() {
        ArrayList<Object[]> constructors = new ArrayList<Object[]>();
        for (int numThreads : new int[]{1, 3, 12}) {
            for (int bouncerPoolSize : new int[]{1, 3, 12, Integer.MAX_VALUE}) {
                for (int maxOutstandingProcessors : new int[]{1, 3, 12}) {
                    constructors.add(new Object[]{numThreads, bouncerPoolSize, maxOutstandingProcessors});
                }
            }
        }
        return constructors;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.bouncer = this.bouncerPoolSize == Integer.MAX_VALUE ? Bouncer.unlimited() : new Bouncer(this.bouncerPoolSize);
        RunAllFullyWidgetTest runAllFullyWidgetTest = this;
        synchronized (runAllFullyWidgetTest) {
            this.concurrentNow = 0;
            this.concurrentHighWatermark = 0;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @After
    public void tearDown() throws Exception {
        super.tearDown();
        RunAllFullyWidgetTest runAllFullyWidgetTest = this;
        synchronized (runAllFullyWidgetTest) {
            Assert.assertEquals((long)0L, (long)this.concurrentNow);
            MatcherAssert.assertThat((Object)this.concurrentHighWatermark, (Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(this.bouncerPoolSize)));
            MatcherAssert.assertThat((Object)this.concurrentHighWatermark, (Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(this.maxOutstandingProcessors)));
        }
        Assert.assertEquals((long)0L, (long)this.bouncer.getCurrentCount());
        Assert.assertEquals((long)this.bouncerPoolSize, (long)this.bouncer.getMaxCount());
    }

    @Test
    public void test_runAllFully_emptySequence() throws Exception {
        ListenableFuture future = this.exec.runAllFully(Sequences.empty(), (Object)"xyzzy", (s1, s2) -> s1 + s2, this.maxOutstandingProcessors, this.bouncer, null);
        Assert.assertEquals((Object)"xyzzy", (Object)future.get());
    }

    @Test
    public void test_runAllFully_fiftyThousandProcessors() throws Exception {
        int numProcessors = 50000;
        Frame frame = (Frame)Iterables.getOnlyElement((Iterable)FrameSequenceBuilder.fromAdapter((StorageAdapter)new QueryableIndexStorageAdapter(TestIndex.getMMappedTestIndex())).frameType(FrameType.ROW_BASED).frames().toList());
        Sequence processors = Sequences.simple(() -> IntStream.range(0, 50000).mapToObj(i -> {
            BlockingQueueFrameChannel channel = BlockingQueueFrameChannel.minimal();
            try {
                channel.writable().write(frame);
                channel.writable().close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return new ConcurrencyTrackingFrameProcessor<Long>(new ChompingFrameProcessor(Collections.singletonList(channel.readable())));
        }).iterator());
        ListenableFuture future = this.exec.runAllFully(processors, (Object)0L, Long::sum, this.maxOutstandingProcessors, this.bouncer, null);
        Assert.assertEquals((long)50000L, (long)((Long)future.get()));
    }

    @Test
    public void test_runAllFully_failing() {
        ListenableFuture future = this.exec.runAllFully(Sequences.simple(() -> IntStream.generate(() -> 0).mapToObj(i -> new ConcurrencyTrackingFrameProcessor<Long>(new FailingFrameProcessor((ReadableFrameChannel)ReadableNilFrameChannel.INSTANCE, BlockingQueueFrameChannel.minimal().writable(), 0))).iterator()), (Object)0L, Long::sum, this.maxOutstandingProcessors, this.bouncer, null);
        ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)CoreMatchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat((Object)e.getCause().getCause(), (Matcher)CoreMatchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat((Object)e.getCause().getCause(), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"failure!")));
    }

    @Test
    public void test_runAllFully_errorAccumulateFn() {
        ListenableFuture future = this.exec.runAllFully(Sequences.simple(() -> IntStream.range(0, 100).mapToObj(i -> new ChompingFrameProcessor(Collections.emptyList())).iterator()), (Object)0L, (x, y) -> {
            throw new ISE("error!", new Object[0]);
        }, this.maxOutstandingProcessors, this.bouncer, null);
        ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"error!")));
    }

    @Test
    public void test_runAllFully_errorSequenceFirstElement() {
        ListenableFuture future = this.exec.runAllFully(Sequences.simple(() -> IntStream.generate(() -> 0).mapToObj(i -> {
            throw new ISE("error!", new Object[0]);
        }).iterator()), (Object)0L, Long::sum, this.maxOutstandingProcessors, this.bouncer, null);
        ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"error!")));
    }

    @Test
    public void test_runAllFully_errorSequenceSecondElement() {
        ListenableFuture future = this.exec.runAllFully(Sequences.simple(() -> IntStream.range(0, 101).mapToObj(i -> {
            if (i != 2) {
                return new ChompingFrameProcessor(Collections.emptyList());
            }
            throw new ISE("error!", new Object[0]);
        }).iterator()), (Object)0L, Long::sum, this.maxOutstandingProcessors, this.bouncer, null);
        ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"error!")));
    }

    @Test
    public void test_runAllFully_errorSequenceHundredthElement() {
        ListenableFuture future = this.exec.runAllFully(Sequences.simple(() -> IntStream.range(0, 101).mapToObj(i -> {
            if (i != 100) {
                return new ChompingFrameProcessor(Collections.emptyList());
            }
            throw new ISE("error!", new Object[0]);
        }).iterator()), (Object)0L, Long::sum, this.maxOutstandingProcessors, this.bouncer, null);
        ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> future.get());
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat((Object)e.getCause(), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.equalTo((Object)"error!")));
    }

    @Test(timeout=30000L)
    public void test_runAllFully_futureCancel() throws InterruptedException {
        int expectedRunningProcessors = Math.min(Math.min(this.bouncerPoolSize, this.maxOutstandingProcessors), this.numThreads);
        List processors = IntStream.range(0, 10 * expectedRunningProcessors).mapToObj(i -> new SleepyFrameProcessor()).collect(Collectors.toList());
        ListenableFuture future = this.exec.runAllFully(Sequences.simple(processors).map(x$0 -> new ConcurrencyTrackingFrameProcessor(x$0)), (Object)0L, Long::sum, this.maxOutstandingProcessors, this.bouncer, "xyzzy");
        for (int i2 = 0; i2 < expectedRunningProcessors; ++i2) {
            ((SleepyFrameProcessor)processors.get(i2)).awaitRun();
        }
        Assert.assertTrue((boolean)future.cancel(true));
        Assert.assertTrue((boolean)future.isCancelled());
        while (this.exec.cancelableProcessorCount() > 0) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((long)0L, (long)this.exec.cancelableProcessorCount());
    }

    private class ConcurrencyTrackingFrameProcessor<T>
    implements FrameProcessor<T> {
        private final AtomicBoolean didRun = new AtomicBoolean(false);
        private final AtomicBoolean didCleanup = new AtomicBoolean(false);
        private final FrameProcessor<T> delegate;

        public ConcurrencyTrackingFrameProcessor(FrameProcessor<T> delegate) {
            this.delegate = delegate;
        }

        public List<ReadableFrameChannel> inputChannels() {
            return this.delegate.inputChannels();
        }

        public List<WritableFrameChannel> outputChannels() {
            return this.delegate.outputChannels();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws InterruptedException, IOException {
            if (this.didRun.compareAndSet(false, true)) {
                RunAllFullyWidgetTest runAllFullyWidgetTest = RunAllFullyWidgetTest.this;
                synchronized (runAllFullyWidgetTest) {
                    RunAllFullyWidgetTest.this.concurrentNow++;
                    if (RunAllFullyWidgetTest.this.concurrentHighWatermark < RunAllFullyWidgetTest.this.concurrentNow) {
                        RunAllFullyWidgetTest.this.concurrentHighWatermark = RunAllFullyWidgetTest.this.concurrentNow;
                    }
                }
            }
            return this.delegate.runIncrementally(readableInputs);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cleanup() throws IOException {
            try {
                this.delegate.cleanup();
            }
            finally {
                RunAllFullyWidgetTest runAllFullyWidgetTest = RunAllFullyWidgetTest.this;
                synchronized (runAllFullyWidgetTest) {
                    if (this.didRun.get() && this.didCleanup.compareAndSet(false, true)) {
                        RunAllFullyWidgetTest.this.concurrentNow--;
                    }
                }
            }
        }
    }
}

