/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link KTableImpl}, driven through the {@link KTable} DSL.
 *
 * <p>The tests cover: record propagation through chained table operators, which serdes each
 * operator exposes, how many state stores a topology ends up with, the source/sink nodes
 * created around {@code groupBy}, and rejection of {@code null} arguments.
 */
public class KTableImplTest {
    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
    private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
    private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private final ConsumerRecordFactory<String, String> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L);
    // A dedicated serde instance so assertions can tell it apart from the serdes held by the Consumed fields.
    private final Serde<String> mySerde = new Serdes.StringSerde();

    // Fresh table per test; used by the null-argument tests at the bottom of the class.
    private KTable<String, String> table;

    @Before
    public void setUp() {
        table = new StreamsBuilder().table("test");
    }

    /**
     * Pipes six records into {@code topic1} and checks what each of four attached processors saw:
     * the source table, the table mapped to integers, the table filtered to even values, and a
     * second table read back from {@code topic2}.
     */
    @Test
    public void testKTable() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";
        final String topic2 = "topic2";

        final KTable<String, String> table1 = builder.table(topic1, consumed);

        // The same supplier is attached four times; capturedProcessors(4) below returns the
        // processors indexed in the order of these process() calls.
        final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
        table1.toStream().process(supplier);

        final KTable<String, Integer> table2 = table1.mapValues(Integer::new);
        table2.toStream().process(supplier);

        final KTable<String, Integer> table3 = table2.filter((key, value) -> value % 2 == 0);
        table3.toStream().process(supplier);

        // Round-trip table1 through topic2 and read it back as a new table.
        table1.toStream().to(topic2, produced);
        final KTable<String, String> table4 = builder.table(topic2, consumed);
        table4.toStream().process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create(topic1, "A", "01", 5L));
            driver.pipeInput(recordFactory.create(topic1, "B", "02", 100L));
            driver.pipeInput(recordFactory.create(topic1, "C", "03", 0L));
            driver.pipeInput(recordFactory.create(topic1, "D", "04", 0L));
            driver.pipeInput(recordFactory.create(topic1, "A", "05", 10L));
            driver.pipeInput(recordFactory.create(topic1, "A", "06", 8L));
        }

        final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
        Assert.assertEquals(
            Arrays.asList("A:01 (ts: 5)", "B:02 (ts: 100)", "C:03 (ts: 0)", "D:04 (ts: 0)", "A:05 (ts: 10)", "A:06 (ts: 8)"),
            processors.get(0).processed);
        Assert.assertEquals(
            Arrays.asList("A:1 (ts: 5)", "B:2 (ts: 100)", "C:3 (ts: 0)", "D:4 (ts: 0)", "A:5 (ts: 10)", "A:6 (ts: 8)"),
            processors.get(1).processed);
        // Odd values fail the filter and are expected to surface as null for that key.
        Assert.assertEquals(
            Arrays.asList("A:null (ts: 5)", "B:2 (ts: 100)", "C:null (ts: 0)", "D:4 (ts: 0)", "A:null (ts: 10)", "A:6 (ts: 8)"),
            processors.get(2).processed);
        Assert.assertEquals(
            Arrays.asList("A:01 (ts: 5)", "B:02 (ts: 100)", "C:03 (ts: 0)", "D:04 (ts: 0)", "A:05 (ts: 10)", "A:06 (ts: 8)"),
            processors.get(3).processed);
    }

    /**
     * Checks, operator by operator, which key/value serdes the resulting stream or table exposes:
     * the upstream {@link Consumed} serdes, {@code null}, or the serdes passed explicitly via
     * {@link Materialized} / {@link Grouped}.
     */
    @Test
    public void shouldPreserveSerdesForOperators() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> table1 = builder.table("topic-2", stringConsumed);
        final ConsumedInternal<String, String> consumedInternal = new ConsumedInternal<>(stringConsumed);

        final KeyValueMapper<String, String, String> selector = (key, value) -> key;
        final ValueMapper<String, String> mapper = value -> value;
        final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1;
        final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier =
            () -> new ValueTransformerWithKey<String, String, String>() {
                @Override
                public void init(final ProcessorContext context) {
                }

                @Override
                public String transform(final String key, final String value) {
                    return value;
                }

                @Override
                public void close() {
                }
            };

        // filter / filterNot
        Assert.assertEquals(consumedInternal.keySerde(), keySerdeOf(table1.filter((key, value) -> false)));
        Assert.assertEquals(consumedInternal.valueSerde(), valueSerdeOf(table1.filter((key, value) -> false)));
        Assert.assertEquals(mySerde, keySerdeOf(table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(mySerde, valueSerdeOf(table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(consumedInternal.keySerde(), keySerdeOf(table1.filterNot((key, value) -> false)));
        Assert.assertEquals(consumedInternal.valueSerde(), valueSerdeOf(table1.filterNot((key, value) -> false)));
        Assert.assertEquals(mySerde, keySerdeOf(table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(mySerde, valueSerdeOf(table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))));

        // mapValues
        Assert.assertEquals(consumedInternal.keySerde(), keySerdeOf(table1.mapValues(mapper)));
        Assert.assertNull(valueSerdeOf(table1.mapValues(mapper)));
        Assert.assertEquals(mySerde, keySerdeOf(table1.mapValues(mapper, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(mySerde, valueSerdeOf(table1.mapValues(mapper, Materialized.with(mySerde, mySerde))));

        // toStream
        Assert.assertEquals(consumedInternal.keySerde(), keySerdeOf(table1.toStream()));
        Assert.assertEquals(consumedInternal.valueSerde(), valueSerdeOf(table1.toStream()));
        Assert.assertNull(keySerdeOf(table1.toStream(selector)));
        Assert.assertEquals(consumedInternal.valueSerde(), valueSerdeOf(table1.toStream(selector)));

        // transformValues
        Assert.assertEquals(consumedInternal.keySerde(), keySerdeOf(table1.transformValues(valueTransformerWithKeySupplier)));
        Assert.assertNull(valueSerdeOf(table1.transformValues(valueTransformerWithKeySupplier)));
        Assert.assertEquals(mySerde, keySerdeOf(table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(mySerde, valueSerdeOf(table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))));

        // groupBy
        Assert.assertNull(keySerdeOf(table1.groupBy(KeyValue::new)));
        Assert.assertNull(valueSerdeOf(table1.groupBy(KeyValue::new)));
        Assert.assertEquals(mySerde, keySerdeOf(table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))));
        Assert.assertEquals(mySerde, valueSerdeOf(table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))));

        // join / leftJoin / outerJoin
        Assert.assertEquals(consumedInternal.keySerde(), keySerdeOf(table1.join(table1, joiner)));
        Assert.assertNull(valueSerdeOf(table1.join(table1, joiner)));
        Assert.assertEquals(mySerde, keySerdeOf(table1.join(table1, joiner, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(mySerde, valueSerdeOf(table1.join(table1, joiner, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(consumedInternal.keySerde(), keySerdeOf(table1.leftJoin(table1, joiner)));
        Assert.assertNull(valueSerdeOf(table1.leftJoin(table1, joiner)));
        Assert.assertEquals(mySerde, keySerdeOf(table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(mySerde, valueSerdeOf(table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(consumedInternal.keySerde(), keySerdeOf(table1.outerJoin(table1, joiner)));
        Assert.assertNull(valueSerdeOf(table1.outerJoin(table1, joiner)));
        Assert.assertEquals(mySerde, keySerdeOf(table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))));
        Assert.assertEquals(mySerde, valueSerdeOf(table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))));
    }

    /**
     * Builds a topology with two source tables plus an unconsumed mapValues/filter chain and
     * asserts that it contains no state stores.
     */
    @Test
    public void testStateStoreLazyEval() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";
        final String topic2 = "topic2";

        final KTableImpl<String, String, String> table1 =
            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
        builder.table(topic2, consumed);

        final KTableImpl<String, String, Integer> table1Mapped =
            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
        table1Mapped.filter((key, value) -> value % 2 == 0);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            Assert.assertEquals(0, driver.getAllStateStores().size());
        }
    }

    /**
     * Same shape as {@link #testStateStoreLazyEval()} but joins the second table with the
     * mapped-and-filtered first table, and asserts that the topology then has two state stores.
     */
    @Test
    public void testStateStore() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";
        final String topic2 = "topic2";

        final KTableImpl<String, String, String> table1 =
            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
        final KTableImpl<String, String, String> table2 =
            (KTableImpl<String, String, String>) builder.table(topic2, consumed);

        final KTableImpl<String, String, Integer> table1Mapped =
            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
        final KTableImpl<String, Integer, Integer> table1MappedFiltered =
            (KTableImpl<String, Integer, Integer>) table1Mapped.filter((key, value) -> value % 2 == 0);
        table2.join(table1MappedFiltered, (v1, v2) -> v1 + v2);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            Assert.assertEquals(2, driver.getAllStateStores().size());
        }
    }

    /** Returns the key serde exposed by a DSL object, which must be an {@link AbstractStream}. */
    private static Serde<?> keySerdeOf(final Object stream) {
        return ((AbstractStream<?, ?>) stream).keySerde();
    }

    /** Returns the value serde exposed by a DSL object, which must be an {@link AbstractStream}. */
    private static Serde<?> valueSerdeOf(final Object stream) {
        return ((AbstractStream<?, ?>) stream).valueSerde();
    }

    /**
     * Fails unless some node in any sub-topology of {@code topology} is named {@code processorName}.
     *
     * @param topology      the topology whose description is searched
     * @param processorName the exact node name to look for
     * @throws AssertionError if no node with that name exists
     */
    private void assertTopologyContainsProcessor(final Topology topology, final String processorName) {
        for (final TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
            for (final TopologyDescription.Node node : subtopology.nodes()) {
                if (node.name().equals(processorName)) {
                    return;
                }
            }
        }
        throw new AssertionError("No processor named '" + processorName + "' found in the provided Topology:\n" + topology.describe());
    }

    /**
     * Groups a materialized table twice (aggregate and reduce) and verifies that the expected
     * sink/source node pairs exist and that their value (de)serializers wrap a non-null inner
     * (de)serializer.
     */
    @Test
    public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";
        final String storeName1 = "storeName1";

        // The explicit type witness is required: the chained with*Serde calls give the compiler
        // no target type from which to infer Materialized's type arguments.
        final KTableImpl<String, String, String> table1 =
            (KTableImpl<String, String, String>) builder.table(
                topic1,
                consumed,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String()));

        table1.groupBy(MockMapper.noOpKeyValueMapper())
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                MockAggregator.TOSTRING_REMOVER,
                Materialized.as("mock-result1"));
        table1.groupBy(MockMapper.noOpKeyValueMapper())
            .reduce(
                MockReducer.STRING_ADDER,
                MockReducer.STRING_REMOVER,
                Materialized.as("mock-result2"));

        final String aggregateSink = "KSTREAM-SINK-0000000003";
        final String aggregateSource = "KSTREAM-SOURCE-0000000004";
        final String reduceSink = "KSTREAM-SINK-0000000007";
        final String reduceSource = "KSTREAM-SOURCE-0000000008";

        final Topology topology = builder.build();
        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
            Assert.assertEquals(3, driver.getAllStateStores().size());

            assertTopologyContainsProcessor(topology, aggregateSink);
            assertTopologyContainsProcessor(topology, aggregateSource);
            assertTopologyContainsProcessor(topology, reduceSink);
            assertTopologyContainsProcessor(topology, reduceSource);

            // The fields are private, so they are read reflectively. The Field handles are
            // looked up on the first sink/source and reused for the second pair.
            final Field valSerializerField = ((SinkNode<?, ?>) driver.getProcessor(aggregateSink))
                .getClass()
                .getDeclaredField("valSerializer");
            final Field valDeserializerField = ((SourceNode<?, ?>) driver.getProcessor(aggregateSource))
                .getClass()
                .getDeclaredField("valDeserializer");
            valSerializerField.setAccessible(true);
            valDeserializerField.setAccessible(true);

            Assert.assertNotNull(((ChangedSerializer<?>) valSerializerField.get(driver.getProcessor(aggregateSink))).inner());
            Assert.assertNotNull(((ChangedDeserializer<?>) valDeserializerField.get(driver.getProcessor(aggregateSource))).inner());
            Assert.assertNotNull(((ChangedSerializer<?>) valSerializerField.get(driver.getProcessor(reduceSink))).inner());
            Assert.assertNotNull(((ChangedDeserializer<?>) valDeserializerField.get(driver.getProcessor(reduceSource))).inner());
        }
    }

    // ---------------------------------------------------------------------------------------
    // Null-argument handling. Casts on null literals exist only to select the intended overload.
    // ---------------------------------------------------------------------------------------

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSelectorOnToStream() {
        table.toStream(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilter() {
        table.filter(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilterNot() {
        table.filterNot(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValues() {
        table.mapValues((ValueMapper<String, String>) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValueWithKey() {
        table.mapValues((ValueMapperWithKey<String, String, String>) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSelectorOnGroupBy() {
        table.groupBy(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnJoin() {
        table.join(null, MockValueJoiner.TOSTRING_JOINER);
    }

    /** Joining without supplying a {@link Materialized} must not throw. */
    @Test
    public void shouldAllowNullStoreInJoin() {
        table.join(table, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerJoin() {
        table.join(table, null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnOuterJoin() {
        table.outerJoin(null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerOnOuterJoin() {
        table.outerJoin(table, null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerOnLeftJoin() {
        table.leftJoin(table, null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnLeftJoin() {
        table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
        table.filter((key, value) -> false, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
        table.filterNot((key, value) -> false, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
        table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
        table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplierIsNull() {
        table.transformValues((ValueTransformerWithKeySupplier<String, String, String>) null);
    }

    // The mock is created from a raw Class literal, and a raw Materialized cast picks the overload.
    @SuppressWarnings("unchecked")
    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
        final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier =
            EasyMock.mock(ValueTransformerWithKeySupplier.class);
        table.transformValues(valueTransformerSupplier, (Materialized) null);
    }

    // The mock is created from a raw Class literal; the String[] cast passes a null varargs array.
    @SuppressWarnings("unchecked")
    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
        final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier =
            EasyMock.mock(ValueTransformerWithKeySupplier.class);
        table.transformValues(valueTransformerSupplier, (String[]) null);
    }
}

