/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.scheduler.internal;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsCollectionWithSize;
import org.hamcrest.core.Is;
import org.hamcrest.number.OrderingComparison;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mule.runtime.api.alert.TimedDataAggregation;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerPoolStrategy;
import org.mule.runtime.api.scheduler.SchedulerPoolsConfig;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.service.scheduler.internal.BaseDefaultSchedulerTestCase;
import org.mule.service.scheduler.internal.SchedulerThreadPoolsTestCase;
import org.mule.service.scheduler.internal.config.ContainerThreadPoolsConfig;
import org.mule.service.scheduler.internal.executor.SchedulerTaskThrottledException;
import org.mule.service.scheduler.internal.reporting.ReportableScheduler;
import org.mule.service.scheduler.internal.threads.SchedulerThreadPools;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;

@Feature(value="Scheduler Throttling")
public class ThrottledSchedulerThrottleTestCase
extends BaseDefaultSchedulerTestCase {
    private static final int THROTTLE_SIZE = 2;
    private static final int SINGLE_TASK_THROTTLE_SIZE = 1;
    private ExecutorService outerExecutor;
    private ContainerThreadPoolsConfig threadPoolsConfig;
    private SchedulerThreadPools service;

    @Override
    @Before
    public void before() throws Exception {
        super.before();
        this.outerExecutor = Executors.newSingleThreadExecutor();
        this.threadPoolsConfig = ContainerThreadPoolsConfig.loadThreadPoolsConfig();
        this.threadPoolsConfig.setSchedulerPoolStrategy(SchedulerPoolStrategy.DEDICATED, true);
        this.service = SchedulerThreadPools.builder((String)SchedulerThreadPoolsTestCase.class.getName(), (SchedulerPoolsConfig)this.threadPoolsConfig).build();
        this.service.start();
    }

    @Override
    @After
    public void after() throws Exception {
        if (this.service == null) {
            return;
        }
        for (Scheduler scheduler : new ArrayList(this.service.getSchedulers())) {
            scheduler.stop();
        }
        this.service.stop();
        this.outerExecutor.shutdownNow();
        this.outerExecutor.awaitTermination(5L, TimeUnit.SECONDS);
        super.after();
    }

    @Test
    @Description(value="Tests that a 'maxConcurrentTasks=1' configuration allows to execute a single task")
    public void oneConcurrentTaskSupported() throws InterruptedException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        Latch latch = new Latch();
        scheduler.submit(() -> latch.countDown());
        if (!latch.await(200L, TimeUnit.MILLISECONDS)) {
            Assert.fail((String)"Task never executed");
        }
        this.assertNoThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="Tests that the throttler count is consistent after task cancellation")
    public void interruptionUpdatesThrottleCounterCorrectly() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        Scheduler cpuLightScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), 2, () -> 5000L);
        Future outerSubmit = cpuLightScheduler.submit(() -> this.lambda$interruptionUpdatesThrottleCounterCorrectly$5((ScheduledExecutorService)scheduler));
        outerSubmit.get(60L, TimeUnit.SECONDS);
        this.assertThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="Tests that the throttler count is consistent after scheduled task pre-cancellation")
    public void cancellationOfScheduledUpdatesThrottleCounterCorrectly() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        Scheduler cpuLightScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), 2, () -> 5000L);
        Future outerSubmit = cpuLightScheduler.submit(() -> this.lambda$cancellationOfScheduledUpdatesThrottleCounterCorrectly$9((ScheduledExecutorService)scheduler));
        outerSubmit.get(60L, TimeUnit.SECONDS);
        this.assertThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="Tests that the throttler count is consistent after scheduled task cancellation")
    public void interruptionDuringExecutionOfScheduledUpdatesThrottleCounterCorrectly() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        Scheduler cpuLightScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), 2, () -> 5000L);
        Future outerSubmit = cpuLightScheduler.submit(() -> this.lambda$interruptionDuringExecutionOfScheduledUpdatesThrottleCounterCorrectly$13((ScheduledExecutorService)scheduler));
        outerSubmit.get(60L, TimeUnit.SECONDS);
        this.assertThrottles((ScheduledExecutorService)scheduler);
    }

    private void doSchedule(ScheduledExecutorService scheduler, CountDownLatch latch2) {
        scheduler.submit(() -> {
            try {
                latch2.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @Test
    @Description(value="Tests that the throttler count is decreased after scheduled task completion")
    @Issue(value="MULE-18909")
    public void scheduledTaskMustDecrementThrottlingCounterAfterExecution() {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        CountDownLatch secondTaskIsExecuting = new CountDownLatch(2);
        scheduler.schedule(secondTaskIsExecuting::countDown, 1L, TimeUnit.MILLISECONDS);
        scheduler.schedule(secondTaskIsExecuting::countDown, 1000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat((String)"Second task should have been executed", (Object)this.awaitLatch(secondTaskIsExecuting), (Matcher)Is.is((Object)true));
        this.assertNoThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="Tests that the throttler count is decreased after scheduled task completion")
    @Issue(value="MULE-18909")
    public void scheduledTaskMustDecrementThrottlingCounterAfterExecutionNested() {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        CountDownLatch secondTaskIsExecuting = new CountDownLatch(1);
        scheduler.schedule(() -> ThrottledSchedulerThrottleTestCase.lambda$scheduledTaskMustDecrementThrottlingCounterAfterExecutionNested$17((ScheduledExecutorService)scheduler, secondTaskIsExecuting), 1000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat((String)"Second task should have been executed", (Object)this.awaitLatch(secondTaskIsExecuting), (Matcher)Is.is((Object)true));
        this.assertNoThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="Tests that a task submitted in excess of 'maxConcurrentTasks' waits until another task finishes before executing.")
    public void throttledTask() throws InterruptedException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(2), 2, () -> 5000L);
        Latch latch = new Latch();
        for (int i = 0; i < 2; ++i) {
            scheduler.execute(() -> this.awaitLatch((CountDownLatch)latch));
        }
        Future<?> throttledSubmission = this.outerExecutor.submit(() -> ThrottledSchedulerThrottleTestCase.lambda$throttledTask$21((ScheduledExecutorService)scheduler));
        Thread.sleep(10L);
        MatcherAssert.assertThat((Object)throttledSubmission.isDone(), (Matcher)Is.is((Object)false));
        latch.countDown();
        new PollingProber(100L, 10L).check((Probe)new JUnitLambdaProbe(() -> {
            MatcherAssert.assertThat((Object)throttledSubmission.isDone(), (Matcher)Is.is((Object)true));
            return true;
        }));
        this.assertThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="Tests that a task submitted in excess of 'maxConcurrentTasks' is rejected when called from a cpu-processing thread.")
    public void throttledTaskRejected() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        Latch latch = new Latch();
        for (int i = 0; i < 2; ++i) {
            scheduler.execute(() -> this.awaitLatch((CountDownLatch)latch));
        }
        Scheduler cpuLightScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), 2, () -> 5000L);
        Future submittedTest = cpuLightScheduler.submit(() -> ThrottledSchedulerThrottleTestCase.lambda$throttledTaskRejected$27((ScheduledExecutorService)scheduler));
        submittedTest.get(1L, TimeUnit.SECONDS);
        this.assertThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="A deadlock does not happen when dispatching max+2 tasks to a throttled scheduler")
    @Issue(value="MULE-17938")
    public void maxPlusTwoNoDeadlockWaitGroup() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        int totalTasks = 3;
        Scheduler waitAllowed = this.service.createIoScheduler(SchedulerConfig.config(), 3, () -> 5000L);
        Latch innerLatch = new Latch();
        ArrayList tasks = new ArrayList();
        for (int i = 0; i < 3; ++i) {
            waitAllowed.execute(() -> this.lambda$maxPlusTwoNoDeadlockWaitGroup$31(tasks, (ScheduledExecutorService)scheduler, innerLatch));
        }
        Thread.sleep(1000L);
        innerLatch.countDown();
        PollingProber.probe(() -> {
            MatcherAssert.assertThat((Object)tasks, (Matcher)IsCollectionWithSize.hasSize((int)3));
            for (Future task : tasks) {
                MatcherAssert.assertThat(task.get(1L, TimeUnit.SECONDS), (Matcher)Is.is((Object)true));
            }
            return true;
        });
        this.assertThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="A deadlock does not happen when dispatching max+2 tasks to a throttled scheduler")
    @Issue(value="MULE-17938")
    public void maxPlusTwoNoDeadlockNotWaitGroup() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        int totalTasks = 3;
        Scheduler notWaitAllowed = this.service.createCpuLightScheduler(SchedulerConfig.config(), 3, () -> 5000L);
        Latch innerLatch = new Latch();
        ArrayList tasks = new ArrayList();
        for (int i = 0; i < 3; ++i) {
            notWaitAllowed.execute(() -> this.lambda$maxPlusTwoNoDeadlockNotWaitGroup$36(tasks, (ScheduledExecutorService)scheduler, innerLatch));
        }
        Thread.sleep(1000L);
        innerLatch.countDown();
        PollingProber.probe(() -> {
            MatcherAssert.assertThat((Object)tasks, (Matcher)IsCollectionWithSize.hasSize((int)1));
            for (Future task : tasks) {
                MatcherAssert.assertThat(task.get(1L, TimeUnit.SECONDS), (Matcher)Is.is((Object)true));
            }
            return true;
        });
        notWaitAllowed.submit(() -> ThrottledSchedulerThrottleTestCase.lambda$maxPlusTwoNoDeadlockNotWaitGroup$39(tasks, (ScheduledExecutorService)scheduler)).get(1L, TimeUnit.SECONDS);
        this.assertThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="A throttled scheduler may accept many scheduled tasks and throttle them when they actually execute.")
    @Issue(value="MULE-18053")
    public void scheduleOnThrottledScheduler() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        int totalTasks = 2;
        Latch innerLatch = new Latch();
        ArrayList<ScheduledFuture<Boolean>> tasks = new ArrayList<ScheduledFuture<Boolean>>();
        for (int i = 0; i < 2; ++i) {
            tasks.add(scheduler.schedule(() -> this.awaitLatch((CountDownLatch)innerLatch), 1L, TimeUnit.SECONDS));
        }
        PollingProber.probe(() -> {
            MatcherAssert.assertThat((Object)tasks, (Matcher)IsCollectionWithSize.hasSize((int)2));
            return true;
        });
        innerLatch.countDown();
        PollingProber.probe(() -> ThrottledSchedulerThrottleTestCase.lambda$scheduleOnThrottledScheduler$43((ScheduledExecutorService)scheduler));
        this.assertNoThrottles((ScheduledExecutorService)scheduler);
    }

    @Test
    @Description(value="A throttled scheduler may accept many scheduled tasks and throttle them when they actually execute.")
    @Issue(value="MULE-18053")
    public void scheduleOnThrottledSchedulerCancelled() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 5000L);
        int totalTasks = 2;
        Latch innerLatch = new Latch();
        ArrayList<ScheduledFuture<Boolean>> tasks = new ArrayList<ScheduledFuture<Boolean>>();
        for (int i = 0; i < 2; ++i) {
            tasks.add(scheduler.schedule(() -> this.awaitLatch((CountDownLatch)innerLatch), 1L, TimeUnit.SECONDS));
        }
        PollingProber.probe(() -> {
            MatcherAssert.assertThat((Object)tasks, (Matcher)IsCollectionWithSize.hasSize((int)2));
            return true;
        });
        tasks.forEach(f -> f.cancel(true));
        innerLatch.countDown();
        PollingProber.probe(() -> ThrottledSchedulerThrottleTestCase.lambda$scheduleOnThrottledSchedulerCancelled$48((ScheduledExecutorService)scheduler));
        this.assertNoThrottles((ScheduledExecutorService)scheduler);
    }

    private void assertNoThrottles(ScheduledExecutorService executor) {
        if (executor instanceof ReportableScheduler) {
            ReportableScheduler reportable = (ReportableScheduler)executor;
            TimedDataAggregation throttlesAggregation = reportable.getThrottlesOverTime().aggregate((Object)0, (a, t) -> a + 1);
            MatcherAssert.assertThat((Object)((Integer)throttlesAggregation.forLast60MinsInterval()), (Matcher)Is.is((Object)0));
        }
    }

    private void assertThrottles(ScheduledExecutorService executor) {
        if (executor instanceof ReportableScheduler) {
            ReportableScheduler reportable = (ReportableScheduler)executor;
            TimedDataAggregation throttlesAggregation = reportable.getThrottlesOverTime().aggregate((Object)0, (a, t) -> a + 1);
            MatcherAssert.assertThat((Object)((Integer)throttlesAggregation.forLast1MinInterval()), (Matcher)Is.is((Matcher)OrderingComparison.greaterThanOrEqualTo((Comparable)Integer.valueOf(1))));
        }
    }

    private static /* synthetic */ Boolean lambda$scheduleOnThrottledSchedulerCancelled$48(ScheduledExecutorService scheduler) throws Throwable {
        MatcherAssert.assertThat((String)scheduler.toString(), (Object)scheduler.toString(), (Matcher)Matchers.endsWith((String)"(throttling: 0/1)"));
        return true;
    }

    private static /* synthetic */ Boolean lambda$scheduleOnThrottledScheduler$43(ScheduledExecutorService scheduler) throws Throwable {
        MatcherAssert.assertThat((String)scheduler.toString(), (Object)scheduler.toString(), (Matcher)Matchers.endsWith((String)"(throttling: 0/1)"));
        return true;
    }

    private static /* synthetic */ void lambda$maxPlusTwoNoDeadlockNotWaitGroup$39(List tasks, ScheduledExecutorService scheduler) {
        tasks.add(scheduler.submit(() -> true));
    }

    private /* synthetic */ void lambda$maxPlusTwoNoDeadlockNotWaitGroup$36(List tasks, ScheduledExecutorService scheduler, Latch innerLatch) {
        tasks.add(scheduler.submit(() -> this.awaitLatch((CountDownLatch)innerLatch)));
    }

    private /* synthetic */ void lambda$maxPlusTwoNoDeadlockWaitGroup$31(List tasks, ScheduledExecutorService scheduler, Latch innerLatch) {
        tasks.add(scheduler.submit(() -> this.awaitLatch((CountDownLatch)innerLatch)));
    }

    private static /* synthetic */ void lambda$throttledTaskRejected$27(ScheduledExecutorService scheduler) {
        try {
            scheduler.execute(() -> {});
            Assert.fail((String)"Expected the task to be rejected with a 'SchedulerTaskThrottledException'");
        }
        catch (SchedulerTaskThrottledException schedulerTaskThrottledException) {
            // empty catch block
        }
    }

    private static /* synthetic */ void lambda$throttledTask$21(ScheduledExecutorService scheduler) {
        scheduler.execute(() -> {});
    }

    private static /* synthetic */ ScheduledFuture lambda$scheduledTaskMustDecrementThrottlingCounterAfterExecutionNested$17(ScheduledExecutorService scheduler, CountDownLatch secondTaskIsExecuting) throws Exception {
        return scheduler.schedule(secondTaskIsExecuting::countDown, 1000L, TimeUnit.MILLISECONDS);
    }

    private /* synthetic */ void lambda$interruptionDuringExecutionOfScheduledUpdatesThrottleCounterCorrectly$13(ScheduledExecutorService scheduler) {
        ScheduledFuture<?> submit = scheduler.schedule(() -> {
            try {
                Thread.sleep(60000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 10L, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            Assert.fail((String)"Interrupted");
        }
        submit.cancel(true);
        CountDownLatch latch = new CountDownLatch(2);
        this.doSchedule(scheduler, latch);
        try {
            this.doSchedule(scheduler, latch);
            Assert.fail((String)("Not rejected: " + scheduler.toString()));
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
    }

    private /* synthetic */ void lambda$cancellationOfScheduledUpdatesThrottleCounterCorrectly$9(ScheduledExecutorService scheduler) {
        ScheduledFuture<?> submit = scheduler.schedule(() -> {}, 1L, TimeUnit.HOURS);
        submit.cancel(true);
        CountDownLatch latch = new CountDownLatch(2);
        this.doSchedule(scheduler, latch);
        try {
            this.doSchedule(scheduler, latch);
            Assert.fail((String)("Not rejected: " + scheduler.toString()));
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
    }

    private /* synthetic */ void lambda$interruptionUpdatesThrottleCounterCorrectly$5(ScheduledExecutorService scheduler) {
        Future<?> submit = scheduler.submit(() -> {
            try {
                Thread.sleep(60000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        submit.cancel(true);
        CountDownLatch latch = new CountDownLatch(2);
        this.doSchedule(scheduler, latch);
        try {
            this.doSchedule(scheduler, latch);
            Assert.fail((String)("Not rejected: " + scheduler.toString()));
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
    }
}

