/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;

public class KTableAggregateTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final Consumed<String, String> consumed = Consumed.with(this.stringSerde, this.stringSerde);
    private final Grouped<String, String> stringSerialized = Grouped.with(this.stringSerde, this.stringSerde);
    private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier();

    @Test
    public void testAggBasic() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table("topic1", this.consumed);
        KTable table2 = table1.groupBy(MockMapper.noOpKeyValueMapper(), this.stringSerialized).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, Materialized.as((String)"topic1-Canonized").withValueSerde(this.stringSerde));
        table2.toStream().process(this.supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy"), Utils.mkEntry((Object)"application.id", (Object)"test"), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory((String)"kafka-test").getAbsolutePath())})), 0L);){
            ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer(), 0L, 0L);
            driver.pipeInput(recordFactory.create("topic1", (Object)"A", (Object)"1", 10L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"B", (Object)"2", 15L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"A", (Object)"3", 20L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"B", (Object)"4", 18L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"C", (Object)"5", 5L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"D", (Object)"6", 25L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"B", (Object)"7", 15L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"C", (Object)"8", 10L));
            Assert.assertEquals(Arrays.asList("A:0+1 (ts: 10)", "B:0+2 (ts: 15)", "A:0+1-1 (ts: 20)", "A:0+1-1+3 (ts: 20)", "B:0+2-2 (ts: 18)", "B:0+2-2+4 (ts: 18)", "C:0+5 (ts: 5)", "D:0+6 (ts: 25)", "B:0+2-2+4-4 (ts: 18)", "B:0+2-2+4-4+7 (ts: 18)", "C:0+5-5 (ts: 10)", "C:0+5-5+8 (ts: 10)"), this.supplier.theCapturedProcessor().processed);
        }
    }

    @Test
    public void testAggRepartition() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table("topic1", this.consumed);
        KTable table2 = table1.groupBy((key, value) -> {
            switch (key) {
                case "null": {
                    return KeyValue.pair(null, (Object)value);
                }
                case "NULL": {
                    return null;
                }
            }
            return KeyValue.pair((Object)value, (Object)value);
        }, this.stringSerialized).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, Materialized.as((String)"topic1-Canonized").withValueSerde(this.stringSerde));
        table2.toStream().process(this.supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy"), Utils.mkEntry((Object)"application.id", (Object)"test"), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory((String)"kafka-test").getAbsolutePath())})), 0L);){
            ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer(), 0L, 0L);
            driver.pipeInput(recordFactory.create("topic1", (Object)"A", (Object)"1", 10L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"A", (Object)null, 15L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"A", (Object)"1", 12L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"B", (Object)"2", 20L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"null", (Object)"3", 25L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"B", (Object)"4", 23L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"NULL", (Object)"5", 24L));
            driver.pipeInput(recordFactory.create("topic1", (Object)"B", (Object)"7", 22L));
            Assert.assertEquals(Arrays.asList("1:0+1 (ts: 10)", "1:0+1-1 (ts: 15)", "1:0+1-1+1 (ts: 15)", "2:0+2 (ts: 20)", "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)", "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"), this.supplier.theCapturedProcessor().processed);
        }
    }

    private static void testCountHelper(StreamsBuilder builder, String input, MockProcessorSupplier<String, Object> supplier) {
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy"), Utils.mkEntry((Object)"application.id", (Object)"test"), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory((String)"kafka-test").getAbsolutePath())})), 0L);){
            ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer(), 0L, 0L);
            driver.pipeInput(recordFactory.create(input, (Object)"A", (Object)"green", 10L));
            driver.pipeInput(recordFactory.create(input, (Object)"B", (Object)"green", 9L));
            driver.pipeInput(recordFactory.create(input, (Object)"A", (Object)"blue", 12L));
            driver.pipeInput(recordFactory.create(input, (Object)"C", (Object)"yellow", 15L));
            driver.pipeInput(recordFactory.create(input, (Object)"D", (Object)"green", 11L));
            Assert.assertEquals(Arrays.asList("green:1 (ts: 10)", "green:2 (ts: 10)", "green:1 (ts: 12)", "blue:1 (ts: 12)", "yellow:1 (ts: 15)", "green:2 (ts: 12)"), supplier.theCapturedProcessor().processed);
        }
    }

    @Test
    public void testCount() {
        StreamsBuilder builder = new StreamsBuilder();
        String input = "count-test-input";
        builder.table("count-test-input", this.consumed).groupBy(MockMapper.selectValueKeyValueMapper(), this.stringSerialized).count(Materialized.as((String)"count")).toStream().process(this.supplier, new String[0]);
        KTableAggregateTest.testCountHelper(builder, "count-test-input", this.supplier);
    }

    @Test
    public void testCountWithInternalStore() {
        StreamsBuilder builder = new StreamsBuilder();
        String input = "count-test-input";
        builder.table("count-test-input", this.consumed).groupBy(MockMapper.selectValueKeyValueMapper(), this.stringSerialized).count().toStream().process(this.supplier, new String[0]);
        KTableAggregateTest.testCountHelper(builder, "count-test-input", this.supplier);
    }

    @Test
    public void testRemoveOldBeforeAddNew() {
        StreamsBuilder builder = new StreamsBuilder();
        String input = "count-test-input";
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        builder.table("count-test-input", this.consumed).groupBy((key, value) -> KeyValue.pair((Object)String.valueOf(key.charAt(0)), (Object)String.valueOf(key.charAt(1))), this.stringSerialized).aggregate(() -> "", (aggKey, value, aggregate) -> aggregate + value, (key, value, aggregate) -> aggregate.replaceAll((String)value, ""), Materialized.as((String)"someStore").withValueSerde(Serdes.String())).toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy"), Utils.mkEntry((Object)"application.id", (Object)"test"), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory((String)"kafka-test").getAbsolutePath())})), 0L);){
            ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer(), 0L, 0L);
            MockProcessor proc = supplier.theCapturedProcessor();
            driver.pipeInput(recordFactory.create("count-test-input", (Object)"11", (Object)"A", 10L));
            driver.pipeInput(recordFactory.create("count-test-input", (Object)"12", (Object)"B", 8L));
            driver.pipeInput(recordFactory.create("count-test-input", (Object)"11", (Object)null, 12L));
            driver.pipeInput(recordFactory.create("count-test-input", (Object)"12", (Object)"C", 6L));
            Assert.assertEquals(Arrays.asList("1:1 (ts: 10)", "1:12 (ts: 10)", "1:2 (ts: 12)", "1: (ts: 12)", "1:2 (ts: 12)"), proc.processed);
        }
    }
}

