/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator;
import org.apache.kafka.streams.processor.internals.PunctuationQueue;
import org.apache.kafka.streams.processor.internals.PunctuationSchedule;
import org.apache.kafka.test.MockProcessorNode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class PunctuationQueueTest {
    private final MockProcessorNode<String, String, ?, ?> node = new MockProcessorNode();
    private final PunctuationQueue queue = new PunctuationQueue();
    private final Punctuator punctuator = timestamp -> this.node.mockProcessor.punctuatedStreamTime().add(timestamp);

    @Test
    public void testPunctuationInterval() {
        PunctuationSchedule sched = new PunctuationSchedule(this.node, 0L, 100L, this.punctuator);
        long now = sched.timestamp - 100L;
        this.queue.schedule(sched);
        this.assertCanPunctuateAtPrecisely(now + 100L);
        ProcessorNodePunctuator processorNodePunctuator = (node, timestamp, type, punctuator) -> punctuator.punctuate(timestamp);
        this.queue.maybePunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)0, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 100L);
        this.queue.maybePunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)0, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 100L);
        this.queue.maybePunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)1, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 200L);
        this.queue.maybePunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)1, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 200L);
        this.queue.maybePunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)2, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 300L);
        this.queue.maybePunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)3, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 1100L);
        this.queue.maybePunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)3, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 1100L);
        this.queue.maybePunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)4, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 1200L);
    }

    @Test
    public void testPunctuationIntervalCustomAlignment() {
        PunctuationSchedule sched = new PunctuationSchedule(this.node, 50L, 100L, this.punctuator);
        long now = sched.timestamp - 50L;
        this.queue.schedule(sched);
        this.assertCanPunctuateAtPrecisely(now + 50L);
        ProcessorNodePunctuator processorNodePunctuator = (node, timestamp, type, punctuator) -> punctuator.punctuate(timestamp);
        this.queue.maybePunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)0, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 50L);
        this.queue.maybePunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)0, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 50L);
        this.queue.maybePunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)1, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 150L);
        this.queue.maybePunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)1, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 150L);
        this.queue.maybePunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)2, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 250L);
        this.queue.maybePunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)3, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 1150L);
        this.queue.maybePunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)3, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 1150L);
        this.queue.maybePunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)4, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 1250L);
    }

    @Test
    public void testPunctuationIntervalCancelFromPunctuator() {
        PunctuationSchedule sched = new PunctuationSchedule(this.node, 0L, 100L, this.punctuator);
        long now = sched.timestamp - 100L;
        Cancellable cancellable = this.queue.schedule(sched);
        this.assertCanPunctuateAtPrecisely(now + 100L);
        ProcessorNodePunctuator processorNodePunctuator = (node, timestamp, type, punctuator) -> {
            punctuator.punctuate(timestamp);
            cancellable.cancel();
        };
        this.queue.maybePunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)0, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        this.assertCanPunctuateAtPrecisely(now + 100L);
        this.queue.maybePunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)1, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        Assertions.assertFalse((boolean)this.queue.canPunctuate(Long.MAX_VALUE));
        this.queue.maybePunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assertions.assertEquals((int)1, (int)this.node.mockProcessor.punctuatedStreamTime().size());
        Assertions.assertFalse((boolean)this.queue.canPunctuate(Long.MAX_VALUE));
    }

    private void assertCanPunctuateAtPrecisely(long now) {
        Assertions.assertFalse((boolean)this.queue.canPunctuate(now - 1L));
        Assertions.assertTrue((boolean)this.queue.canPunctuate(now));
        Assertions.assertTrue((boolean)this.queue.canPunctuate(now + 1L));
    }
}

