/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@code KStream#transform} executed through a {@link TopologyTestDriver}.
 *
 * <p>Both tests build the same topology: records read from {@value #TOPIC_NAME} pass through a
 * transformer that doubles the key and emits the running total of all values seen so far, and
 * that additionally forwards a record keyed {@code -1} from a wall-clock punctuator scheduled
 * every millisecond. Everything the transformer emits is captured by a
 * {@link MockProcessorSupplier} and compared against the expected records.
 */
public class KStreamTransformTest {
    private static final String TOPIC_NAME = "topic";

    /** Keys piped into the input topic; each record's value is {@code key * 10}. */
    private static final int[] INPUT_KEYS = {1, 10, 100, 1000};

    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());

    /**
     * Runs the topology on a driver created without explicit properties, giving each input
     * record the timestamp {@code key / 2}, and verifies the captured records while the driver
     * is still open.
     */
    @Test
    public void testTransform() {
        final StreamsBuilder builder = new StreamsBuilder();
        @SuppressWarnings("rawtypes")
        final MockProcessorSupplier processor = attachTransformTopology(builder);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), Instant.ofEpochMilli(0L))) {
            final TestInputTopic<Integer, Integer> inputTopic =
                driver.createInputTopic(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());
            for (final int key : INPUT_KEYS) {
                inputTopic.pipeInput(key, key * 10, key / 2L);
            }

            // Wall clock starts at 0; advancing by 2ms and then 1ms yields the punctuations
            // expected below with timestamps 2 and 3.
            driver.advanceWallClockTime(Duration.ofMillis(2L));
            driver.advanceWallClockTime(Duration.ofMillis(1L));

            assertProcessed(
                processor,
                new KeyValueTimestamp<>(2, 10, 0L),
                new KeyValueTimestamp<>(20, 110, 5L),
                new KeyValueTimestamp<>(200, 1110, 50L),
                new KeyValueTimestamp<>(2000, 11110, 500L),
                new KeyValueTimestamp<>(-1, 2, 2L),
                new KeyValueTimestamp<>(-1, 3, 3L));
        }
    }

    /**
     * Runs the topology on a driver created with {@link #props}, giving every input record the
     * timestamp {@code 0}, and verifies the captured records after the driver has been closed.
     */
    @Test
    public void testTransformWithNewDriverAndPunctuator() {
        final StreamsBuilder builder = new StreamsBuilder();
        @SuppressWarnings("rawtypes")
        final MockProcessorSupplier processor = attachTransformTopology(builder);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props, Instant.ofEpochMilli(0L))) {
            final TestInputTopic<Integer, Integer> inputTopic =
                driver.createInputTopic(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());
            for (final int key : INPUT_KEYS) {
                inputTopic.pipeInput(key, key * 10, 0L);
            }

            driver.advanceWallClockTime(Duration.ofMillis(2L));
            driver.advanceWallClockTime(Duration.ofMillis(1L));
        }

        // Deliberately checked outside the try block, i.e. after the driver is closed.
        assertProcessed(
            processor,
            new KeyValueTimestamp<>(2, 10, 0L),
            new KeyValueTimestamp<>(20, 110, 0L),
            new KeyValueTimestamp<>(200, 1110, 0L),
            new KeyValueTimestamp<>(2000, 11110, 0L),
            new KeyValueTimestamp<>(-1, 2, 2L),
            new KeyValueTimestamp<>(-1, 3, 3L));
    }

    /**
     * Wires {@code topic -> transform -> capturing processor} into {@code builder}.
     *
     * <p>NOTE(review): {@link MockProcessorSupplier} is kept as a raw type because its
     * type-parameter list is not visible from this file; parameterize it (and drop the
     * suppressions) once confirmed. The "deprecation" suppression is a precaution in case the
     * Transformer API is deprecated in the Kafka version in use - TODO confirm.
     *
     * @param builder the builder to add the topology to
     * @return the supplier whose captured processor records everything emitted by the transformer
     */
    @SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
    private static MockProcessorSupplier attachTransformTopology(final StreamsBuilder builder) {
        final MockProcessorSupplier processor = new MockProcessorSupplier();
        final KStream<Integer, Integer> stream =
            builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()));
        stream.transform(runningTotalTransformer()).process(processor);
        return processor;
    }

    /**
     * Creates the supplier of the transformer under test.
     *
     * <p>Each transformer instance maps a record to {@code (key * 2, running total of values)}
     * and schedules a 1ms wall-clock punctuator that forwards {@code (-1, (int) timestamp)}
     * stamped with the punctuation timestamp.
     *
     * @return a supplier producing a fresh, independently counting transformer per call
     */
    @SuppressWarnings("deprecation")
    private static TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> runningTotalTransformer() {
        return () -> new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
            private int total = 0;

            @Override
            public void init(final ProcessorContext context) {
                context.schedule(
                    Duration.ofMillis(1L),
                    PunctuationType.WALL_CLOCK_TIME,
                    timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp)));
            }

            @Override
            public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
                this.total += value.intValue();
                return KeyValue.pair(key.intValue() * 2, this.total);
            }

            @Override
            public void close() {
                // Nothing to release.
            }
        };
    }

    /**
     * Asserts that the captured processor saw exactly {@code expected}, in order.
     *
     * @param processor the supplier whose captured processor is inspected
     * @param expected  the records expected to have been processed, in arrival order
     */
    @SafeVarargs
    private static void assertProcessed(
            @SuppressWarnings("rawtypes") final MockProcessorSupplier processor,
            final KeyValueTimestamp<Integer, Integer>... expected) {
        // Compare sizes first so a missing or extra record fails with a clear message
        // instead of an index error in the loop below.
        Assertions.assertEquals(expected.length, processor.theCapturedProcessor().processed().size());
        for (int i = 0; i < expected.length; i++) {
            Assertions.assertEquals(expected[i], processor.theCapturedProcessor().processed().get(i));
        }
    }
}

