/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.executor.scheduler;

import com.google.common.base.Ticker;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.testing.TestingTicker;
import io.trino.execution.executor.scheduler.FairScheduler;
import io.trino.execution.executor.scheduler.Group;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Tests for {@link FairScheduler}: basic execution, yielding, blocking, cancellation through
 * group removal, and task bookkeeping after completion.
 *
 * <p>Every test creates its scheduler with a first argument of {@code 1}. NOTE(review): this is
 * presumably the number of tasks allowed to run concurrently, which is what makes the ordering
 * assertions below meaningful; confirm against {@code FairScheduler.newInstance}.
 */
public class TestFairScheduler {
    /**
     * Submits a single task and verifies that it has run once its future completes.
     */
    @Test
    public void testBasic() throws ExecutionException, InterruptedException {
        try (FairScheduler scheduler = FairScheduler.newInstance(1)) {
            Group group = scheduler.createGroup("G1");

            AtomicBoolean ran = new AtomicBoolean();
            ListenableFuture<?> done = scheduler.submit(group, 1, context -> ran.set(true));

            done.get();
            Assertions.assertThat(ran.get())
                    .describedAs("Ran task")
                    .isTrue();
        }
    }

    /**
     * Verifies that a task looping on {@code maybeYield()} lets a second task of the same group run.
     *
     * <p>Task 1 keeps calling {@code maybeYield()} until task 2 has set its flag; meanwhile the
     * test thread advances the testing ticker by two quanta at a time until task 2 is done. The
     * timeout guards against the test hanging if task 2 never gets to run.
     */
    @Test
    @Timeout(5)
    public void testYield() throws ExecutionException, InterruptedException {
        TestingTicker ticker = new TestingTicker();
        try (FairScheduler scheduler = FairScheduler.newInstance(1, ticker)) {
            Group group = scheduler.createGroup("G");

            CountDownLatch task1Started = new CountDownLatch(1);
            AtomicBoolean task2Ran = new AtomicBoolean();

            ListenableFuture<?> task1 = scheduler.submit(group, 1, context -> {
                task1Started.countDown();
                while (!task2Ran.get()) {
                    // A false result ends the task early instead of spinning forever.
                    if (!context.maybeYield()) {
                        return;
                    }
                }
            });

            task1Started.await();
            ListenableFuture<?> task2 = scheduler.submit(group, 2, context -> task2Ran.set(true));

            // Keep pushing time past the quantum until task 2 has completed.
            while (!task2.isDone()) {
                ticker.increment(FairScheduler.QUANTUM_NANOS * 2L, TimeUnit.NANOSECONDS);
            }

            task1.get();
        }
    }

    /**
     * Verifies that a task blocked on a future lets a second task run in the meantime.
     *
     * <p>Task 1 asserts that task 2 has not run before it calls {@code block(...)} and that it has
     * run once {@code block(...)} returns. The test thread completes the blocking future only
     * after task 2 has started.
     */
    @Test
    public void testBlocking() throws InterruptedException, ExecutionException {
        try (FairScheduler scheduler = FairScheduler.newInstance(1)) {
            Group group = scheduler.createGroup("G");

            CountDownLatch task1Started = new CountDownLatch(1);
            CountDownLatch task2Submitted = new CountDownLatch(1);
            CountDownLatch task2Started = new CountDownLatch(1);
            AtomicBoolean task2Ran = new AtomicBoolean();
            SettableFuture<Void> task1Blocked = SettableFuture.create();

            ListenableFuture<?> task1 = scheduler.submit(group, 1, context -> {
                try {
                    task1Started.countDown();
                    task2Submitted.await();

                    Assertions.assertThat(task2Ran.get())
                            .describedAs("Task 2 run")
                            .isFalse();

                    context.block(task1Blocked);

                    Assertions.assertThat(task2Ran.get())
                            .describedAs("Task 2 run")
                            .isTrue();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });

            task1Started.await();

            ListenableFuture<?> task2 = scheduler.submit(group, 2, context -> {
                task2Started.countDown();
                task2Ran.set(true);
            });

            task2Submitted.countDown();
            task2Started.await();

            // Unblock task 1 only after task 2 has started.
            task1Blocked.set(null);

            task1.get();
            task2.get();
        }
    }

    /**
     * Verifies that {@code maybeYield()} returns {@code false} when the task's group is removed
     * around the time the task yields.
     */
    @Test
    public void testCancelWhileYielding() throws InterruptedException, ExecutionException {
        TestingTicker ticker = new TestingTicker();
        try (FairScheduler scheduler = FairScheduler.newInstance(1, ticker)) {
            Group group = scheduler.createGroup("G");

            CountDownLatch task1Started = new CountDownLatch(1);
            CountDownLatch task1TimeAdvanced = new CountDownLatch(1);

            ListenableFuture<?> task1 = scheduler.submit(group, 1, context -> {
                try {
                    task1Started.countDown();
                    task1TimeAdvanced.await();

                    Assertions.assertThat(context.maybeYield())
                            .describedAs("Cancelled while yielding")
                            .isFalse();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });

            task1Started.await();
            // NOTE(review): presumably stops the scheduler from resuming the task after it
            // yields, so the yield can only end via the group removal below; confirm.
            scheduler.pause();

            // Advance the clock by two quanta before releasing the task into maybeYield().
            ticker.increment(FairScheduler.QUANTUM_NANOS * 2L, TimeUnit.NANOSECONDS);
            task1TimeAdvanced.countDown();

            scheduler.removeGroup(group);
            task1.get();
        }
    }

    /**
     * Verifies that {@code block(...)} returns {@code false} when the task's group is removed
     * while the task is blocked on a future that never completes.
     */
    @Test
    public void testCancelWhileBlocking() throws InterruptedException, ExecutionException {
        TestingTicker ticker = new TestingTicker();
        try (FairScheduler scheduler = FairScheduler.newInstance(1, ticker)) {
            Group group = scheduler.createGroup("G");

            CountDownLatch task1Started = new CountDownLatch(1);
            TestFuture task1Blocked = new TestFuture();

            ListenableFuture<?> task1 = scheduler.submit(group, 1, context -> {
                task1Started.countDown();

                Assertions.assertThat(context.block(task1Blocked))
                        .describedAs("Cancelled while blocking")
                        .isFalse();
            });

            task1Started.await();
            // Wait until a listener has been registered on the future before removing the group.
            task1Blocked.awaitListenerAdded();

            scheduler.removeGroup(group);
            task1.get();
        }
    }

    /**
     * Verifies that a finished task ran exactly once and is no longer reported by
     * {@code getTasks(group)}.
     */
    @Test
    public void testCleanupAfterFinish() throws InterruptedException, ExecutionException {
        TestingTicker ticker = new TestingTicker();
        try (FairScheduler scheduler = FairScheduler.newInstance(1, ticker)) {
            Group group = scheduler.createGroup("G");

            AtomicInteger counter = new AtomicInteger();
            ListenableFuture<?> task1 = scheduler.submit(group, 1, context -> counter.incrementAndGet());

            task1.get();
            Assertions.assertThat(counter.get()).isEqualTo(1);
            Assertions.assertThat(scheduler.getTasks(group)).isEmpty();
        }
    }

    /**
     * Future that lets a test wait until a listener has been registered on it.
     */
    private static class TestFuture
            extends AbstractFuture<Void> {
        private final CountDownLatch listenerAdded = new CountDownLatch(1);

        /**
         * Registers the listener, then releases any thread waiting in
         * {@link #awaitListenerAdded()}.
         */
        @Override
        public void addListener(Runnable listener, Executor executor) {
            super.addListener(listener, executor);
            // Count down only after registration so waiters observe an attached listener.
            this.listenerAdded.countDown();
        }

        /**
         * Public override that delegates to the superclass {@code set}.
         *
         * @param value the value to complete the future with
         * @return the result of the superclass {@code set} call
         */
        @Override
        public boolean set(Void value) {
            return super.set(value);
        }

        /**
         * Blocks until {@link #addListener(Runnable, Executor)} has been called at least once.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void awaitListenerAdded() throws InterruptedException {
            this.listenerAdded.await();
        }
    }
}

