/*
 * Decompiled with CFR 0.152.
 */
package kafka.utils;

import java.io.File;
import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.log.LocalLog;
import kafka.log.LogLoader;
import kafka.log.LogLoader$;
import kafka.log.UnifiedLog;
import kafka.log.UnifiedLog$;
import kafka.server.BrokerTopicStats;
import kafka.server.BrokerTopicStats$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LoadedLogOffsets;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogSegments;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.compat.java8.OptionConverters;
import scala.compat.java8.OptionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001q4A!\u0006\f\u00017!)!\u0005\u0001C\u0001G!9a\u0005\u0001b\u0001\n\u00039\u0003BB\u001b\u0001A\u0003%\u0001\u0006C\u00047\u0001\t\u0007I\u0011A\u001c\t\rm\u0002\u0001\u0015!\u00039\u0011\u001da\u0004A1A\u0005\u0002uBa!\u0013\u0001!\u0002\u0013q\u0004b\u0002&\u0001\u0005\u0004%\t!\u0010\u0005\u0007\u0017\u0002\u0001\u000b\u0011\u0002 \t\u000b1\u0003A\u0011A'\t\u000bq\u0003A\u0011A'\t\u000b\u0005\u0004A\u0011A'\t\u000b\u0019\u0004A\u0011A'\t\u000b!\u0004A\u0011A'\t\u000b)\u0004A\u0011A'\t\u000b1\u0004A\u0011A'\t\u000b9\u0004A\u0011A'\t\u000bA\u0004A\u0011A'\t\u000bI\u0004A\u0011A'\t\u000bQ\u0004A\u0011A'\u0003\u001bM\u001b\u0007.\u001a3vY\u0016\u0014H+Z:u\u0015\t9\u0002$A\u0003vi&d7OC\u0001\u001a\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u000f\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0003}\tQa]2bY\u0006L!!\t\u0010\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\tA\u0005\u0005\u0002&\u00015\ta#A\u0005tG\",G-\u001e7feV\t\u0001\u0006\u0005\u0002*g5\t!F\u0003\u0002,Y\u0005!Q\u000f^5m\u0015\tic&\u0001\u0004tKJ4XM\u001d\u0006\u00033=R!\u0001M\u0019\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0011\u0014aA8sO&\u0011AG\u000b\u0002\u000f\u0017\u000647.Y*dQ\u0016$W\u000f\\3s\u0003)\u00198\r[3ek2,'\u000fI\u0001\t[>\u001c7\u000eV5nKV\t\u0001\b\u0005\u0002*s%\u0011!H\u000b\u0002\t\u001b>\u001c7\u000eV5nK\u0006IQn\\2l)&lW\rI\u0001\tG>,h\u000e^3scU\ta\b\u0005\u0002@\u000f6\t\u0001I\u0003\u0002B\u0005\u00061\u0011\r^8nS\u000eT!a\u0011#\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002,\u000b*\ta)\u0001\u0003kCZ\f\u0017B\u0001%A\u00055\tEo\\7jG&sG/Z4fe\u0006I1m\\;oi\u0016\u0014\u0018\u0007I\u0001\tG>,h\u000e^3se\u0005I1m\\;oi\u0016\u0014(\u0007I\u0001\u0006g\u0016$X\u000f\u001d\u000b\u0002\u001dB\u0011QdT\u0005\u0003!z\u0011A!\u00168ji\"\u0012!B\u0015\t\u0003'jk\u0011\u0001\u0016\u0006\u0003+Z\u000b1!\u00199j\u0015\t9\u0006,A\u0004kkBLG/\u001a:\u000b\u0005e\u000b\u0014!\u00026v]&$\u0018BA.U\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\ti\u0016\f'\u000fZ8x]\"\u00121B\u0018\t\u0003'~K!\u0001\u0019+\u0003\u0013\u00053G/\u001a:FC\u000eD\u0017\u0001\t;fgRlunY6TG\",G-\u001e7fe:{g\u000eU3sS>$\u0017n\u0019+bg.D#\u0001D2\u0011\u0005M#\u0017BA3U\u0005\u0011!Vm\u001d;\u0002;Q,7\u000f^'pG.\u001c6\r[3ek2,'\u000fU3sS>$\u0017n\u0019+bg.D#!D2\u0002AQ,7\u000f\u001e*fK:$(/\u00198u)\u0006\u001c8.\u00138N_\u000e\\7k\u00195fIVdWM\u001d\u0015\u0003\u001d\r\f1\u0003^3ti:{g\u000eU3sS>$\u0017n\u0019+bg.D#aD2\u0002GQ,7\u000f\u001e(p]B+'/[8eS\u000e$\u0016m]6XQ\u0016t\u0007+\u001a:j_\u0012L5OW3s_\"\u0012\u0001cY\u0001\u0011i\u0016\u001cH\u000fU3sS>$\u0017n\u0019+bg.D#!E2\u0002\u0017Q,7\u000f\u001e*fgR\f'\u000f\u001e\u0015\u0003%\r\f!\u0004^3tiVs7o\u00195fIVdW\r\u0015:pIV\u001cWM\u001d+bg.D#aE2\u00021Q,7\u000f^'pG.\u001c6\r[3ek2,'\u000fT8dW&tw\r\u000b\u0002\u0015G\"\"Ac\u001e>|!\t\u0019\u00060\u0003\u0002z)\n9A+[7f_V$\u0018!\u0002<bYV,g$A\b")
public class SchedulerTest {
    private final KafkaScheduler scheduler = new KafkaScheduler(1);
    private final MockTime mockTime = new MockTime();
    private final AtomicInteger counter1 = new AtomicInteger(0);
    private final AtomicInteger counter2 = new AtomicInteger(0);

    public KafkaScheduler scheduler() {
        return this.scheduler;
    }

    public MockTime mockTime() {
        return this.mockTime;
    }

    public AtomicInteger counter1() {
        return this.counter1;
    }

    public AtomicInteger counter2() {
        return this.counter2;
    }

    @BeforeEach
    public void setup() {
        this.scheduler().startup();
    }

    @AfterEach
    public void teardown() {
        this.scheduler().shutdown();
    }

    @Test
    public void testMockSchedulerNonPeriodicTask() {
        this.mockTime().scheduler.scheduleOnce("test1", () -> this.counter1().getAndIncrement(), 1L);
        this.mockTime().scheduler.scheduleOnce("test2", () -> this.counter2().getAndIncrement(), 100L);
        Assertions.assertEquals((int)0, (int)this.counter1().get(), (String)"Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals((int)0, (int)this.counter2().get(), (String)"Counter2 should not be incremented prior to task running.");
        this.mockTime().sleep(1L);
        Assertions.assertEquals((int)1, (int)this.counter1().get(), (String)"Counter1 should be incremented");
        Assertions.assertEquals((int)0, (int)this.counter2().get(), (String)"Counter2 should not be incremented");
        this.mockTime().sleep(100000L);
        Assertions.assertEquals((int)1, (int)this.counter1().get(), (String)"More sleeping should not result in more incrementing on counter1.");
        Assertions.assertEquals((int)1, (int)this.counter2().get(), (String)"Counter2 should now be incremented.");
    }

    @Test
    public void testMockSchedulerPeriodicTask() {
        this.mockTime().scheduler.schedule("test1", () -> this.counter1().getAndIncrement(), 1L, 1L);
        this.mockTime().scheduler.schedule("test2", () -> this.counter2().getAndIncrement(), 100L, 100L);
        Assertions.assertEquals((int)0, (int)this.counter1().get(), (String)"Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals((int)0, (int)this.counter2().get(), (String)"Counter2 should not be incremented prior to task running.");
        this.mockTime().sleep(1L);
        Assertions.assertEquals((int)1, (int)this.counter1().get(), (String)"Counter1 should be incremented");
        Assertions.assertEquals((int)0, (int)this.counter2().get(), (String)"Counter2 should not be incremented");
        this.mockTime().sleep(100L);
        Assertions.assertEquals((int)101, (int)this.counter1().get(), (String)"Counter1 should be incremented 101 times");
        Assertions.assertEquals((int)1, (int)this.counter2().get(), (String)"Counter2 should not be incremented once");
    }

    @Test
    public void testReentrantTaskInMockScheduler() {
        this.mockTime().scheduler.scheduleOnce("test1", () -> $this.mockTime().scheduler.scheduleOnce("test2", () -> this.counter2().getAndIncrement(), 0L), 1L);
        this.mockTime().sleep(1L);
        Assertions.assertEquals((int)1, (int)this.counter2().get());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testNonPeriodicTask() {
        this.scheduler().scheduleOnce("test", () -> this.counter1().getAndIncrement());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long l = 30000L;
        if (testUtils$ == null) {
            throw null;
        }
        TestUtils$ retry_this = testUtils$;
        LongRef retry_wait = LongRef.create((long)1L);
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                SchedulerTest.$anonfun$testNonPeriodicTask$2(this);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    retry_this.logger().underlying().info(retry_this.msgWithLogIdent(TestUtils$.$anonfun$retry$1(retry_wait)));
                }
                Thread.sleep(retry_wait.elem);
                retry_wait.elem += package$.MODULE$.min(retry_wait.elem, 1000L);
                continue;
            }
            break;
        }
        Thread.sleep(5L);
        Assertions.assertEquals((int)1, (int)this.counter1().get(), (String)"Should only run once");
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testNonPeriodicTaskWhenPeriodIsZero() {
        this.scheduler().schedule("test", () -> this.counter1().getAndIncrement(), 0L, 0L);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long l = 30000L;
        if (testUtils$ == null) {
            throw null;
        }
        TestUtils$ retry_this = testUtils$;
        LongRef retry_wait = LongRef.create((long)1L);
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                SchedulerTest.$anonfun$testNonPeriodicTaskWhenPeriodIsZero$2(this);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    retry_this.logger().underlying().info(retry_this.msgWithLogIdent(TestUtils$.$anonfun$retry$1(retry_wait)));
                }
                Thread.sleep(retry_wait.elem);
                retry_wait.elem += package$.MODULE$.min(retry_wait.elem, 1000L);
                continue;
            }
            break;
        }
        Thread.sleep(5L);
        Assertions.assertEquals((int)1, (int)this.counter1().get(), (String)"Should only run once");
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testPeriodicTask() {
        this.scheduler().schedule("test", () -> this.counter1().getAndIncrement(), 0L, 5L);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long l = 30000L;
        if (testUtils$ == null) {
            throw null;
        }
        TestUtils$ retry_this = testUtils$;
        LongRef retry_wait = LongRef.create((long)1L);
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                SchedulerTest.$anonfun$testPeriodicTask$2(this);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    retry_this.logger().underlying().info(retry_this.msgWithLogIdent(TestUtils$.$anonfun$retry$1(retry_wait)));
                }
                Thread.sleep(retry_wait.elem);
                retry_wait.elem += package$.MODULE$.min(retry_wait.elem, 1000L);
                continue;
            }
            break;
        }
    }

    @Test
    public void testRestart() {
        this.mockTime().scheduler.scheduleOnce("test1", () -> this.counter1().getAndIncrement(), 1L);
        this.mockTime().sleep(1L);
        Assertions.assertEquals((int)1, (int)this.counter1().get());
        this.mockTime().scheduler.shutdown();
        this.mockTime().scheduler.startup();
        this.mockTime().scheduler.scheduleOnce("test1", () -> this.counter1().getAndIncrement(), 1L);
        this.mockTime().sleep(1L);
        Assertions.assertEquals((int)2, (int)this.counter1().get());
    }

    @Test
    public void testUnscheduleProducerTask() {
        File tmpDir = TestUtils$.MODULE$.tempDir();
        File logDir = TestUtils$.MODULE$.randomPartitionLogDir(tmpDir);
        LogConfig logConfig = new LogConfig((Map)new Properties());
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats(BrokerTopicStats$.MODULE$.$lessinit$greater$default$1());
        int maxTransactionTimeoutMs = 300000;
        int maxProducerIdExpirationMs = 86400000;
        int producerIdExpirationCheckIntervalMs = 600000;
        TopicPartition topicPartition = UnifiedLog$.MODULE$.parseTopicPartitionName(logDir);
        LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
        LogSegments segments = new LogSegments(topicPartition);
        Option leaderEpochCache = UnifiedLog$.MODULE$.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion(), "", (Option)None$.MODULE$, (Scheduler)this.mockTime().scheduler);
        ProducerStateManager producerStateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), (Time)this.mockTime());
        LoadedLogOffsets offsets = new LogLoader(logDir, topicPartition, logConfig, (Scheduler)this.scheduler(), (Time)this.mockTime(), logDirFailureChannel, true, segments, 0L, 0L, OptionConverters.RichOptionForJava8$.MODULE$.asJava$extension(OptionConverters$.MODULE$.RichOptionForJava8(leaderEpochCache)), producerStateManager, LogLoader$.MODULE$.$lessinit$greater$default$13(), LogLoader$.MODULE$.$lessinit$greater$default$14()).load();
        LocalLog localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint, offsets.nextOffsetMetadata, (Scheduler)this.scheduler(), (Time)this.mockTime(), topicPartition, logDirFailureChannel);
        UnifiedLog log = new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats, producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager, (Option)None$.MODULE$, true, UnifiedLog$.MODULE$.$lessinit$greater$default$9(), UnifiedLog$.MODULE$.$lessinit$greater$default$10());
        Assertions.assertTrue((boolean)this.scheduler().taskRunning(log.producerExpireCheck()));
        log.close();
        Assertions.assertFalse((boolean)this.scheduler().taskRunning(log.producerExpireCheck()));
    }

    @Timeout(value=15L)
    @Test
    public void testMockSchedulerLocking() {
        CountDownLatch initLatch = new CountDownLatch(1);
        CountDownLatch completionLatch = new CountDownLatch(2);
        .colon.colon taskLatches = new .colon.colon((Object)new CountDownLatch(1), (List)new .colon.colon((Object)new CountDownLatch(1), (List)Nil$.MODULE$));
        this.mockTime().scheduler.scheduleOnce("test1", () -> SchedulerTest.$anonfun$testMockSchedulerLocking$1((List)taskLatches, initLatch, completionLatch), 1L);
        ScheduledExecutorService tickExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            tickExecutor.scheduleWithFixedDelay(() -> this.mockTime().sleep(1L), 0L, 1L, TimeUnit.MILLISECONDS);
            Assertions.assertTrue((boolean)initLatch.await(10L, TimeUnit.SECONDS));
            this.mockTime().scheduler.scheduleOnce("test2", () -> SchedulerTest.$anonfun$testMockSchedulerLocking$3((List)taskLatches, initLatch, completionLatch), 1L);
            taskLatches.foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
                x$1.countDown();
                return BoxedUnit.UNIT;
            });
            Assertions.assertTrue((boolean)completionLatch.await(10L, TimeUnit.SECONDS), (String)"Tasks did not complete");
        }
        finally {
            tickExecutor.shutdownNow();
        }
    }

    public static final /* synthetic */ void $anonfun$testNonPeriodicTask$2(SchedulerTest $this) {
        Assertions.assertEquals((int)$this.counter1().get(), (int)1);
    }

    public static final /* synthetic */ void $anonfun$testNonPeriodicTaskWhenPeriodIsZero$2(SchedulerTest $this) {
        Assertions.assertEquals((int)$this.counter1().get(), (int)1);
    }

    public static final /* synthetic */ void $anonfun$testPeriodicTask$2(SchedulerTest $this) {
        Assertions.assertTrue(($this.counter1().get() >= 20 ? 1 : 0) != 0, (String)"Should count to 20");
    }

    private static final void scheduledTask$1(CountDownLatch taskLatch, CountDownLatch initLatch$1, CountDownLatch completionLatch$1) {
        initLatch$1.countDown();
        Assertions.assertTrue((boolean)taskLatch.await(30L, TimeUnit.SECONDS), (String)"Timed out waiting for latch");
        completionLatch$1.countDown();
    }

    public static final /* synthetic */ void $anonfun$testMockSchedulerLocking$1(List taskLatches$1, CountDownLatch initLatch$1, CountDownLatch completionLatch$1) {
        SchedulerTest.scheduledTask$1((CountDownLatch)taskLatches$1.head(), initLatch$1, completionLatch$1);
    }

    public static final /* synthetic */ void $anonfun$testMockSchedulerLocking$3(List taskLatches$1, CountDownLatch initLatch$1, CountDownLatch completionLatch$1) {
        SchedulerTest.scheduledTask$1((CountDownLatch)taskLatches$1.apply(1), initLatch$1, completionLatch$1);
    }
}

