/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.scheduler.internal;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import java.io.InputStream;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.SystemUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsInstanceOf;
import org.hamcrest.core.IsIterableContaining;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.profiling.ProfilingService;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerBusyException;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerPoolStrategy;
import org.mule.runtime.api.scheduler.SchedulerPoolsConfig;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.util.ClassUtils;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.core.internal.profiling.DefaultProfilingService;
import org.mule.service.scheduler.internal.config.ContainerThreadPoolsConfig;
import org.mule.service.scheduler.internal.threads.SchedulerThreadPools;
import org.mule.service.scheduler.internal.util.Delegator;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.junit4.matcher.Eventually;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.mule.tck.util.CollectableReference;

@Feature(value="Scheduler Service")
public abstract class SchedulerThreadPoolsTestCase
extends AbstractMuleTestCase {
    protected static final int CORES = Runtime.getRuntime().availableProcessors();
    protected static final long GC_POLLING_TIMEOUT = 10000L;
    @Parameterized.Parameter
    public SchedulerPoolStrategy strategy;
    @Rule
    public ExpectedException expected = ExpectedException.none();
    protected ContainerThreadPoolsConfig threadPoolsConfig;
    protected SchedulerThreadPools service;
    private long prestarCallbackSleepTime = 0L;

    @Before
    public void before() throws MuleException {
        this.threadPoolsConfig = ContainerThreadPoolsConfig.loadThreadPoolsConfig();
        this.threadPoolsConfig.setSchedulerPoolStrategy(this.strategy, true);
        this.service = SchedulerThreadPools.builder((String)SchedulerThreadPoolsTestCase.class.getName(), (SchedulerPoolsConfig)this.threadPoolsConfig).setPreStartCallback(executor -> {
            try {
                Thread.sleep(this.prestarCallbackSleepTime);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new MuleRuntimeException((Throwable)e);
            }
        }).build();
        this.service.start();
    }

    @After
    public void after() throws MuleException, InterruptedException {
        if (this.service == null) {
            return;
        }
        for (Scheduler scheduler : new ArrayList(this.service.getSchedulers())) {
            scheduler.stop();
        }
        this.service.stop();
    }

    @Test
    @Description(value="Tests that the threads of the SchedulerService are correcly created and destroyed.")
    public void serviceStop() throws MuleException, InterruptedException {
        Assert.assertThat((Object)SchedulerThreadPoolsTestCase.collectThreadNames(), (Matcher)IsIterableContaining.hasItem((Matcher)StringStartsWith.startsWith((String)"[MuleRuntime].")));
        this.service.stop();
        this.service = null;
        new PollingProber(500L, 50L).check((Probe)new JUnitLambdaProbe(() -> {
            Assert.assertThat((Object)SchedulerThreadPoolsTestCase.collectThreadNames(), (Matcher)CoreMatchers.not((Matcher)IsIterableContaining.hasItem((Matcher)StringStartsWith.startsWith((String)"[MuleRuntime]."))));
            return true;
        }));
    }

    @Test
    @Description(value="Tests that dispatching a task to a throttled scheduler already running its maximum tasks throws the appropriate exception.")
    public void executorRejects() throws MuleException, ExecutionException, InterruptedException {
        Latch latch = new Latch();
        Scheduler cpuLight = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler custom = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        custom.execute(() -> this.awaitLatch(latch));
        custom.execute(() -> this.awaitLatch(latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        Runnable task = () -> {};
        cpuLight.submit(() -> {
            try {
                custom.submit(task);
            }
            finally {
                Assert.assertThat((Object)custom.shutdownNow(), (Matcher)CoreMatchers.not((Matcher)IsIterableContaining.hasItem((Object)task)));
            }
        }).get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Issue(value="MULE-19943")
    @Description(value="Tests that cron tasks dispatched to a busy executor are aborted, not blocking execution of tasks from other executors.")
    public void executorsNotBlocked() throws Exception {
        Scheduler firstScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler secondScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        AtomicInteger executionsNumber = new AtomicInteger(0);
        int minimumExpectedExecutionsNumber = 5;
        Latch latch = new Latch();
        ScheduledFuture blockingTask = null;
        Future normalTask = null;
        String everySecond = "*/1 * * ? * *";
        try {
            blockingTask = firstScheduler.scheduleWithCronExpression(() -> this.awaitLatch(latch), "*/1 * * ? * *");
            normalTask = secondScheduler.scheduleWithCronExpression(executionsNumber::incrementAndGet, "*/1 * * ? * *");
            Thread.sleep(minimumExpectedExecutionsNumber * 1000);
            latch.release();
            Assert.assertThat((Object)executionsNumber.get(), (Matcher)Matchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(minimumExpectedExecutionsNumber)), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(minimumExpectedExecutionsNumber + 1))));
            blockingTask.get(10L, TimeUnit.SECONDS);
            normalTask.get(10L, TimeUnit.SECONDS);
        }
        finally {
            if (blockingTask != null) {
                blockingTask.cancel(false);
            }
            if (normalTask != null) {
                normalTask.cancel(false);
            }
        }
    }

    @Test
    @Description(value="Tests that a dispatched task has inherited the context classloader.")
    public void classLoaderPropagates() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Future submit = scheduler.submit(() -> Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader)));
        submit.get(60L, TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that a scheduled task has inherited the context classloader.")
    public void classLoaderPropagatesScheduled() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Latch latch = new Latch();
        Future submit = null;
        try {
            submit = scheduler.scheduleWithFixedDelay(() -> {
                Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader));
                latch.countDown();
            }, 0L, 60L, TimeUnit.SECONDS);
            latch.await(10L, TimeUnit.SECONDS);
            submit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            if (submit != null) {
                submit.cancel(false);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that a cron-scheduled task has inherited the context classloader.")
    public void classLoaderPropagatesCron() throws Exception {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        ClassLoader contextClassLoader = (ClassLoader)Mockito.mock(ClassLoader.class);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        Latch latch = new Latch();
        ScheduledFuture submit = null;
        try {
            submit = scheduler.scheduleWithCronExpression(() -> {
                Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)CoreMatchers.sameInstance((Object)contextClassLoader));
                latch.countDown();
            }, "*/1 * * ? * *");
            latch.await(10L, TimeUnit.SECONDS);
            submit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            if (submit != null) {
                submit.cancel(false);
            }
        }
    }

    @Test
    @Description(value="Tests that a custom scheduler doesn't hold a reference to the context classloader that was in the context when it was created.")
    public void customPoolThreadsDontReferenceCreatorClassLoader() throws Exception {
        ClassLoader testClassLoader = new ClassLoader(((Object)((Object)this)).getClass().getClassLoader()){};
        PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(testClassLoader, new ReferenceQueue());
        this.scheduleToCustomWithClassLoader(testClassLoader);
        testClassLoader = null;
        this.assertNoClassLoaderReferenceHeld(clRef, 10000L);
    }

    public void scheduleToCustomWithClassLoader(ClassLoader testClassLoader) throws InterruptedException, ExecutionException {
        AtomicReference scheduler = new AtomicReference();
        ClassUtils.withContextClassLoader((ClassLoader)testClassLoader, () -> {
            scheduler.set(this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L));
            try {
                ((Scheduler)scheduler.get()).submit(() -> Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)Is.is((Object)testClassLoader))).get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
        ((Scheduler)scheduler.get()).submit(() -> Assert.assertThat((Object)Thread.currentThread().getContextClassLoader(), (Matcher)Is.is((Object)testClassLoader.getParent()))).get();
    }

    @Test
    @Description(value="Tests that a scheduler Executor thread doesn't hold a reference to an artifact classloader through the `inheritedAccessControlContext` when executing.")
    public void threadsDontReferenceClassLoaderFromAccessControlContext() throws Exception {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        ClassLoader delegatorClassLoader = this.createDelegatorClassLoader();
        PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(delegatorClassLoader, new ReferenceQueue());
        Consumer delegator = (Consumer)delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        delegator.accept(() -> scheduler.execute(() -> {}));
        delegator = null;
        delegatorClassLoader = null;
        this.assertNoClassLoaderReferenceHeld(clRef, 10000L);
    }

    @Test
    @Description(value="Tests that a scheduler Executor thread doesn't hold a reference to an artifact classloader through the `inheritedAccessControlContext` when created.")
    public void threadsDontReferenceClassLoaderFromAccessControlContextWhenCreated() throws Exception {
        ClassLoader delegatorClassLoader = this.createDelegatorClassLoader();
        PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(delegatorClassLoader, new ReferenceQueue());
        AtomicReference schedulerRef = new AtomicReference();
        Consumer delegator = (Consumer)delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        delegator.accept(() -> schedulerRef.set(this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L)));
        delegator = null;
        delegatorClassLoader = null;
        this.assertNoClassLoaderReferenceHeld(clRef, 10000L);
    }

    @Test
    @Description(value="Tests that when using a the commonPool from ForkJoinPool, the TCCL of the first invocation is not leaked. This test essentially validates the workaround for https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8172726")
    public void forkJoinCommonPoolDoesNotLeakFirstClassLoaderUsed() throws InstantiationException, IllegalAccessException, ClassNotFoundException, InterruptedException {
        ForkJoinPool.commonPool().shutdownNow();
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config(), 1, () -> 1000L);
        AtomicReference clRefRef = new AtomicReference();
        CountDownLatch latch = new CountDownLatch(CORES + 1);
        scheduler.execute(() -> {
            ClassLoader delegatorClassLoader = this.createDelegatorClassLoader();
            PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(delegatorClassLoader, new ReferenceQueue());
            Thread.currentThread().setContextClassLoader(delegatorClassLoader);
            for (int i = 0; i < CORES; ++i) {
                ForkJoinPool.commonPool().execute(() -> latch.countDown());
            }
            clRefRef.set(clRef);
            delegatorClassLoader = null;
            latch.countDown();
        });
        latch.await(5L, TimeUnit.SECONDS);
        this.assertNoClassLoaderReferenceHeld((PhantomReference)clRefRef.get(), 10000L);
    }

    @Test
    @Issue(value="MULE-18471")
    @Description(value="Attempts to force a race condition between stopping a scheduler and periodic tasks being rescheduled.")
    public void avoidRaceConditionBetweenStopAndRescheduleFixedDelayCausingLeak() throws InstantiationException, IllegalAccessException, ClassNotFoundException, InterruptedException {
        Scheduler customScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        ClassLoader delegatorClassLoader = this.createDelegatorClassLoader();
        PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(delegatorClassLoader, new ReferenceQueue());
        Consumer delegator = (Consumer)delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        ExecutorService scheduleExecutor = Executors.newFixedThreadPool(CORES * 2);
        for (int i = 0; i < CORES * 24; ++i) {
            this.scheduleTaskReferencingDelegator(scheduleExecutor, customScheduler, delegator);
        }
        delegator = null;
        delegatorClassLoader = null;
        Thread.sleep(100L);
        customScheduler.stop();
        this.assertNoClassLoaderReferenceHeld(clRef, 10000L);
        scheduleExecutor.shutdownNow();
    }

    private void scheduleTaskReferencingDelegator(Executor scheduleExecutor, Scheduler customScheduler, Consumer<Runnable> delegator) {
        scheduleExecutor.execute(() -> customScheduler.scheduleWithFixedDelay(() -> delegator.accept(() -> {}), 0L, 1L, TimeUnit.NANOSECONDS));
    }

    @Test
    @Issue(value="MULE-18471")
    @Description(value="Attempts to force a race condition between stopping a scheduler and a task being scheduled.")
    public void avoidRaceConditionBetweenStopAndScheduleFixedDelayCausingLeak() throws InstantiationException, IllegalAccessException, ClassNotFoundException, InterruptedException {
        Scheduler customScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        ClassLoader delegatorClassLoader = this.createDelegatorClassLoader();
        PhantomReference<ClassLoader> clRef = new PhantomReference<ClassLoader>(delegatorClassLoader, new ReferenceQueue());
        Consumer delegator = (Consumer)delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        ExecutorService scheduleExecutor = Executors.newSingleThreadExecutor();
        this.scheduleTaskReferencingDelegatorPending(scheduleExecutor, customScheduler, delegator);
        delegator = null;
        delegatorClassLoader = null;
        Thread.sleep(100L);
        customScheduler.stop();
        this.assertNoClassLoaderReferenceHeld(clRef, 10000L);
        scheduleExecutor.shutdownNow();
    }

    @Test
    @Issue(value="W-11580777")
    @Description(value="The Scheduler does not keep a reference to a ProfilingService which could cause a DefaultMuleContext leak.")
    public void schedulerDoesNotLeakProfilingServiceAfterShutdown() {
        DefaultProfilingService profilingService = new DefaultProfilingService();
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L, (ProfilingService)profilingService);
        CollectableReference collectableReference = new CollectableReference((Object)profilingService);
        profilingService = null;
        scheduler.shutdown();
        Assert.assertThat((Object)collectableReference, (Matcher)Is.is((Matcher)Eventually.eventually((Matcher)CollectableReference.collectedByGc())));
    }

    @Test
    @Issue(value="W-11580777")
    @Description(value="The Scheduler does not keep a reference to a ProfilingService which could cause a DefaultMuleContext leak.")
    public void schedulerDoesNotLeakProfilingServiceAfterShutdownNow() {
        DefaultProfilingService profilingService = new DefaultProfilingService();
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L, (ProfilingService)profilingService);
        CollectableReference collectableReference = new CollectableReference((Object)profilingService);
        profilingService = null;
        scheduler.shutdownNow();
        Assert.assertThat((Object)collectableReference, (Matcher)Is.is((Matcher)Eventually.eventually((Matcher)CollectableReference.collectedByGc())));
    }

    @Test
    @Issue(value="W-11356027")
    @Description(value="Cancelling a recurrent task after it has run at least once does not keep references to the task")
    public void repeatableTaskCancellationAfterRunDoesNotCauseLeak() throws Exception {
        Scheduler customScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        ClassLoader delegatorClassLoader = this.createDelegatorClassLoader();
        CollectableReference collectableReference = new CollectableReference((Object)delegatorClassLoader);
        Consumer delegator = (Consumer)delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        ExecutorService scheduleExecutor = Executors.newSingleThreadExecutor();
        ScheduledFuture<?> scheduledTaskReferencingDelegatorPending = this.scheduleTaskReferencingDelegatorPending(scheduleExecutor, customScheduler, delegator, 0, 1000).get();
        delegator = null;
        delegatorClassLoader = null;
        Thread.sleep(100L);
        scheduledTaskReferencingDelegatorPending.cancel(false);
        customScheduler.stop();
        scheduledTaskReferencingDelegatorPending = null;
        customScheduler = null;
        Assert.assertThat((Object)collectableReference, (Matcher)Is.is((Matcher)Eventually.eventually((Matcher)CollectableReference.collectedByGc())));
        scheduleExecutor.shutdownNow();
    }

    private Future<ScheduledFuture<?>> scheduleTaskReferencingDelegatorPending(ExecutorService scheduleExecutor, Scheduler customScheduler, Consumer<Runnable> delegator) {
        return this.scheduleTaskReferencingDelegatorPending(scheduleExecutor, customScheduler, delegator, 10000, 1);
    }

    private Future<ScheduledFuture<?>> scheduleTaskReferencingDelegatorPending(ExecutorService scheduleExecutor, Scheduler customScheduler, Consumer<Runnable> delegator, int initialDelay, int delay) {
        return scheduleExecutor.submit(() -> customScheduler.scheduleWithFixedDelay(() -> delegator.accept(() -> {}), (long)initialDelay, (long)delay, TimeUnit.SECONDS));
    }

    protected ClassLoader createDelegatorClassLoader() {
        ClassLoader testClassLoader = new ClassLoader(((Object)((Object)this)).getClass().getClassLoader()){

            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                if (Delegator.class.getName().equals(name)) {
                    try {
                        byte[] classBytes = IOUtils.toByteArray((InputStream)this.getClass().getResourceAsStream("/org/mule/service/scheduler/internal/util/Delegator.class"));
                        return this.defineClass(null, classBytes, 0, classBytes.length);
                    }
                    catch (Exception e) {
                        return super.loadClass(name);
                    }
                }
                return super.loadClass(name);
            }
        };
        return testClassLoader;
    }

    protected void assertNoClassLoaderReferenceHeld(PhantomReference<ClassLoader> clRef, long timeoutMillis) {
        new PollingProber(timeoutMillis, 100L).check((Probe)new JUnitLambdaProbe(() -> {
            System.gc();
            Assert.assertThat((Object)clRef.isEnqueued(), (Matcher)Is.is((Object)true));
            return true;
        }, "A strong reference is being maintained to the ClassLoader child."));
    }

    @Test
    public void threadGroupOfCustomSchedulerNotLeakedAfterShutdownNow() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        List<PhantomReference> references = this.recordReferences(scheduler);
        scheduler.shutdownNow();
        scheduler = null;
        this.assertNoThreadGroupReferenceHeld(references);
    }

    @Test
    public void threadGroupOfCustomSchedulerNotLeakedAfterStop() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        List<PhantomReference> references = this.recordReferences(scheduler);
        scheduler.stop();
        scheduler = null;
        this.assertNoThreadGroupReferenceHeld(references);
    }

    private List<PhantomReference> recordReferences(Scheduler scheduler) throws InterruptedException, ExecutionException, TimeoutException {
        ArrayList<PhantomReference> references = new ArrayList<PhantomReference>();
        scheduler.submit(() -> {
            references.add(new PhantomReference<Thread>(Thread.currentThread(), new ReferenceQueue()));
            references.add(new PhantomReference<ThreadGroup>(Thread.currentThread().getThreadGroup(), new ReferenceQueue()));
            return true;
        }).get(5L, TimeUnit.SECONDS);
        return references;
    }

    private void assertNoThreadGroupReferenceHeld(List<PhantomReference> references) {
        new PollingProber(10000L, 100L).check((Probe)new JUnitLambdaProbe(() -> {
            System.gc();
            references.forEach(ref -> Assert.assertThat((String)ref.toString(), (Object)ref.isEnqueued(), (Matcher)Is.is((Object)true)));
            return true;
        }, "A hard reference is being mantained to the scheduler threads/thread group."));
    }

    @Test
    public void customSchedulerPrestarted() throws Exception {
        this.prestarCallbackSleepTime = 1000L;
        Scheduler cpuBoundScheduler = this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        cpuBoundScheduler.submit(() -> {
            SchedulerConfig bigPoolConfig = SchedulerConfig.config().withMaxConcurrentTasks(1);
            for (int i = 0; i < 10; ++i) {
                CountDownLatch latch = new CountDownLatch(1);
                Scheduler scheduler = this.service.createCustomScheduler(bigPoolConfig, 1, () -> 0L);
                try {
                    scheduler.submit(() -> latch.countDown());
                    try {
                        latch.await(5L, TimeUnit.SECONDS);
                        continue;
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        scheduler.stop();
                        return;
                    }
                }
                finally {
                    scheduler.stop();
                }
            }
        }).get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void customSchedulerShutdownFromWithin() throws Exception {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        AtomicReference customThreadGroup = new AtomicReference();
        Future stopSubmit = scheduler.submit(() -> {
            customThreadGroup.set(Thread.currentThread().getThreadGroup());
            scheduler.stop();
        });
        try {
            stopSubmit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            new PollingProber().check((Probe)new JUnitLambdaProbe(() -> {
                Assert.assertThat((String)"Shutdown", (Object)scheduler.isShutdown(), (Matcher)Is.is((Object)true));
                Assert.assertThat((String)"Terminated", (Object)scheduler.isTerminated(), (Matcher)Is.is((Object)true));
                Assert.assertThat((String)"ActiveCount", (Object)((ThreadGroup)customThreadGroup.get()).activeCount(), (Matcher)Is.is((Object)0));
                if (SystemUtils.isJavaVersionAtMost((JavaVersion)JavaVersion.JAVA_17)) {
                    Assert.assertThat((String)"isDestroyed", (Object)((ThreadGroup)customThreadGroup.get()).isDestroyed(), (Matcher)Is.is((Object)true));
                }
                return true;
            }));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void customSchedulerShutdownFromWithinDelayed() throws Exception {
        Scheduler scheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(2), 2, () -> 1000L);
        AtomicReference customThreadGroup = new AtomicReference();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Future hangSubmit = scheduler.submit(() -> {
            while (!cancelled.get()) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Future stopSubmit = scheduler.submit(() -> {
            customThreadGroup.set(Thread.currentThread().getThreadGroup());
            scheduler.stop();
        });
        this.expected.expect(CancellationException.class);
        try {
            stopSubmit.get(10L, TimeUnit.SECONDS);
        }
        finally {
            cancelled.set(true);
            new PollingProber().check((Probe)new JUnitLambdaProbe(() -> {
                Assert.assertThat((String)"Shutdown", (Object)scheduler.isShutdown(), (Matcher)Is.is((Object)true));
                Assert.assertThat((String)"Terminated", (Object)scheduler.isTerminated(), (Matcher)Is.is((Object)true));
                Assert.assertThat((String)"ActiveCount", (Object)((ThreadGroup)customThreadGroup.get()).activeCount(), (Matcher)Is.is((Object)0));
                if (SystemUtils.isJavaVersionAtMost((JavaVersion)JavaVersion.JAVA_17)) {
                    Assert.assertThat((String)"isDestroyed", (Object)((ThreadGroup)customThreadGroup.get()).isDestroyed(), (Matcher)Is.is((Object)true));
                }
                return true;
            }));
        }
    }

    @Test
    public void onlyCustomMayConfigureWaitCpuLight() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createCpuLightScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureWaitCpuIntensive() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureWaitIO() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");
        this.service.createIoScheduler(SchedulerConfig.config().withWaitAllowed(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyCpuLight() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");
        this.service.createCpuLightScheduler(SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyCpuIntensive() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");
        this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
    }

    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyIO() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");
        this.service.createIoScheduler(SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
    }

    @Test
    @Description(value="Tests that tasks dispatched from an IO thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyIO() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    @Issue(value="MULE-20072")
    @Description(value="Tests that when a rejected task submitted from a custom pool is executed on the caller thread, the thread locals of the task are isolated from the caller's.")
    public void customCallerRunsHasThreadLocalsIsolation() throws ExecutionException, InterruptedException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1).withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        int maxPoolSize = this.threadPoolsConfig.getCpuLightPoolSize().orElseGet(() -> this.threadPoolsConfig.getUberMaxPoolSize().getAsInt());
        this.assertCallerRunsThreadLocalsIsolation(sourceScheduler, targetScheduler, maxPoolSize);
    }

    @Test
    @Issue(value="MULE-20072")
    @Description(value="Tests that when a rejected task submitted from a CPU Light pool is executed on the caller thread, the thread locals of the task are isolated from the caller's.")
    public void cpuLightCallerRunsHasThreadLocalsIsolation() throws ExecutionException, InterruptedException {
        Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        int maxPoolSize = this.threadPoolsConfig.getCpuLightPoolSize().orElseGet(() -> this.threadPoolsConfig.getUberMaxPoolSize().getAsInt());
        this.assertCallerRunsThreadLocalsIsolation(scheduler, maxPoolSize);
    }

    @Test
    @Issue(value="MULE-20072")
    @Description(value="Tests that when a rejected task submitted from an IO pool is executed on the caller thread, the thread locals of the task are isolated from the caller's.")
    public void ioCallerRunsHasThreadLocalsIsolation() throws ExecutionException, InterruptedException {
        Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        int maxPoolSize = this.threadPoolsConfig.getIoMaxPoolSize().orElseGet(() -> this.threadPoolsConfig.getUberMaxPoolSize().getAsInt());
        this.assertCallerRunsThreadLocalsIsolation(scheduler, maxPoolSize);
    }

    @Test
    @Description(value="Tests that periodic tasks scheduled to a busy Scheduler are skipped but the job continues executing.")
    public void rejectionPolicyScheduledPeriodic() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(2), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(60L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)e.getCause(), (Matcher)IsInstanceOf.instanceOf(SchedulerBusyException.class));
        }
        catch (TimeoutException e) {
            sourceScheduler.shutdownNow();
            sourceScheduler.awaitTermination(60L, TimeUnit.SECONDS);
            throw e;
        }
        CountDownLatch scheduledTaskLatch = new CountDownLatch(2);
        AtomicReference<Object> scheduledTask = new AtomicReference<Object>(null);
        sourceScheduler.submit(() -> {
            scheduledTask.set(targetScheduler.scheduleWithFixedDelay(() -> scheduledTaskLatch.countDown(), 0L, 1L, TimeUnit.SECONDS));
            return null;
        });
        new PollingProber().check((Probe)new JUnitLambdaProbe(() -> {
            Assert.assertThat((Object)((ScheduledFuture)scheduledTask.get()).isDone(), (Matcher)Is.is((Object)true));
            return true;
        }));
        latch.countDown();
        Assert.assertThat((Object)scheduledTaskLatch.await(5L, TimeUnit.SECONDS), (Matcher)Is.is((Object)true));
    }

    @Test
    @Description(value="Tests that tasks dispatched from a Custom scheduler thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyCustom() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        submit.get(60L, TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that tasks scheduled from a Custom scheduler thread are skipped of triggered when the scheduler is busy.")
    public void rejectionPolicyCustomScheduleAtFixedRate() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        try {
            submit.get(60L, TimeUnit.SECONDS);
        }
        finally {
            AtomicBoolean scheduledExecuted = new AtomicBoolean();
            targetScheduler.scheduleAtFixedRate(() -> scheduledExecuted.set(true), 0L, 5L, TimeUnit.SECONDS);
            Thread.sleep(1000L);
            Assert.assertThat((Object)scheduledExecuted.get(), (Matcher)Is.is((Object)false));
            latch.countDown();
            PollingProber.probe((long)6000L, (long)500L, () -> scheduledExecuted.get());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Description(value="Tests that tasks scheduled from a Custom scheduler thread are skipped of triggered when the scheduler is busy.")
    public void rejectionPolicyCustomScheduleWithFixedDelay() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        try {
            submit.get(60L, TimeUnit.SECONDS);
        }
        finally {
            AtomicBoolean scheduledExecuted = new AtomicBoolean();
            targetScheduler.scheduleWithFixedDelay(() -> scheduledExecuted.set(true), 0L, 5L, TimeUnit.SECONDS);
            Thread.sleep(1000L);
            Assert.assertThat((Object)scheduledExecuted.get(), (Matcher)Is.is((Object)false));
            latch.countDown();
            PollingProber.probe((long)6000L, (long)500L, () -> scheduledExecuted.get());
        }
    }

    @Test
    @Description(value="Tests that tasks dispatched from a Custom scheduler with 'Wait' allowed thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyCustomWithConfig() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withWaitAllowed(true).withMaxConcurrentTasks(1), CORES, () -> 1000L, 1);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future submit = sourceScheduler.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    @Description(value="Tests that tasks dispatched from any other thread to a busy Scheduler are rejected.")
    public void rejectionPolicyOther() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        ExecutorService sourceExecutor = Executors.newSingleThreadExecutor();
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Latch latch = new Latch();
        Future<Object> submit = sourceExecutor.submit(this.threadsConsumer(targetScheduler, latch));
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    @Test
    public void customSchedulerThreadGroupDestroy() throws Exception {
        AtomicReference innerExecutor = new AtomicReference();
        AtomicBoolean innerThreadInterupted = new AtomicBoolean();
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        targetScheduler.submit(() -> innerExecutor.set(Executors.newCachedThreadPool()));
        PollingProber.probe(() -> innerExecutor.get() != null);
        Latch latch = new Latch();
        ((ExecutorService)innerExecutor.get()).submit(() -> {
            try {
                return latch.await((long)this.getTestTimeoutSecs(), TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                innerThreadInterupted.set(true);
                Thread.currentThread().interrupt();
                return false;
            }
        });
        targetScheduler.stop();
        latch.countDown();
        PollingProber.probe((long)5000L, (long)100L, () -> innerThreadInterupted.get());
    }

    @Test
    public void customWithPriority() throws ExecutionException, InterruptedException {
        int actualPriority = this.executeWithPriorityAndCapture(1, (arg_0, arg_1, arg_2) -> ((SchedulerThreadPools)this.service).createCustomScheduler(arg_0, arg_1, arg_2));
        Assert.assertThat((Object)actualPriority, (Matcher)Is.is((Object)1));
    }

    @Test
    public void cpuLightWithPriority() throws ExecutionException, InterruptedException {
        this.expected.expect(IsInstanceOf.instanceOf(IllegalArgumentException.class));
        this.expected.expectMessage("Only custom schedulers may define 'priority' behaviour");
        this.executeWithPriorityAndCapture(1, (arg_0, arg_1, arg_2) -> ((SchedulerThreadPools)this.service).createCpuLightScheduler(arg_0, arg_1, arg_2));
    }

    @Test
    public void cpuIntensiveWithPriority() throws ExecutionException, InterruptedException {
        this.expected.expect(IsInstanceOf.instanceOf(IllegalArgumentException.class));
        this.expected.expectMessage("Only custom schedulers may define 'priority' behaviour");
        this.executeWithPriorityAndCapture(1, (arg_0, arg_1, arg_2) -> ((SchedulerThreadPools)this.service).createCpuIntensiveScheduler(arg_0, arg_1, arg_2));
    }

    @Test
    public void ioWithPriority() throws ExecutionException, InterruptedException {
        this.expected.expect(IsInstanceOf.instanceOf(IllegalArgumentException.class));
        this.expected.expectMessage("Only custom schedulers may define 'priority' behaviour");
        this.executeWithPriorityAndCapture(1, (arg_0, arg_1, arg_2) -> ((SchedulerThreadPools)this.service).createIoScheduler(arg_0, arg_1, arg_2));
    }

    private int executeWithPriorityAndCapture(int priority, SchedulerFactory schedulerFactory) throws ExecutionException, InterruptedException {
        SchedulerConfig configMaxPriority = SchedulerConfig.config().withMaxConcurrentTasks(1).withPriority(priority);
        Scheduler scheduler = schedulerFactory.create(configMaxPriority, CORES, () -> 1000L);
        CompletableFuture actualPriority = new CompletableFuture();
        scheduler.execute(() -> actualPriority.complete(Thread.currentThread().getPriority()));
        return (Integer)actualPriority.get();
    }

    protected void assertCallerRunsThreadLocalsIsolation(Scheduler scheduler, int maxPoolSize) throws ExecutionException, InterruptedException {
        this.assertCallerRunsThreadLocalsIsolation(scheduler, scheduler, maxPoolSize);
    }

    protected void assertCallerRunsThreadLocalsIsolation(Scheduler sourceScheduler, Scheduler targetScheduler, int maxPoolSize) throws InterruptedException, ExecutionException {
        Latch outerLatch = new Latch();
        Latch innerLatch = new Latch();
        for (int i = 0; i < maxPoolSize - 1; ++i) {
            this.consumeThread(targetScheduler, outerLatch);
        }
        if (sourceScheduler != targetScheduler) {
            this.consumeThread(targetScheduler, outerLatch);
        }
        AtomicReference callerThread = new AtomicReference();
        AtomicReference executingThread = new AtomicReference();
        Future submitted = sourceScheduler.submit(() -> {
            ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 1);
            threadLocal.set(2);
            callerThread.set(Thread.currentThread());
            targetScheduler.submit(() -> {
                Assert.assertThat((Object)((Integer)threadLocal.get()), (Matcher)Is.is((Object)1));
                threadLocal.set(3);
                Assert.assertThat((Object)((Integer)threadLocal.get()), (Matcher)Is.is((Object)3));
                executingThread.set(Thread.currentThread());
                innerLatch.countDown();
            });
            Assert.assertThat((Object)threadLocal.get(), (Matcher)Is.is((Object)2));
            return this.awaitLatch(outerLatch);
        });
        Assert.assertThat((Object)innerLatch.await(5L, TimeUnit.SECONDS), (Matcher)Is.is((Object)true));
        outerLatch.countDown();
        Assert.assertThat((Object)((Boolean)submitted.get()), (Matcher)Is.is((Object)true));
        Assert.assertThat((Object)((Thread)executingThread.get()), (Matcher)Is.is((Object)((Thread)callerThread.get())));
    }

    protected Callable<Object> threadsConsumer(Scheduler targetScheduler, Latch latch) {
        return () -> {
            while (latch.getCount() > 0L) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                this.consumeThread(targetScheduler, latch);
            }
            return null;
        };
    }

    protected void consumeThread(Scheduler scheduler, Latch latch) {
        scheduler.submit(() -> this.awaitLatch(latch));
    }

    protected boolean awaitLatch(Latch latch) {
        try {
            return latch.await((long)this.getTestTimeoutSecs(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static interface SchedulerFactory {
        public Scheduler create(SchedulerConfig var1, int var2, Supplier<Long> var3);
    }
}

