/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.lang.reflect.Field;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class KTableImplTest {
    // Serde used for both keys and values of the String-typed tables in these tests.
    private final Serde<String> stringSerde = Serdes.String();
    // Consumed configuration built from the String serde for keys and values.
    private final Consumed<String, String> consumed = Consumed.with(this.stringSerde, this.stringSerde);
    // JUnit rule: the driver on which topologies are set up and through which records are processed.
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();
    // State directory handed to the driver; assigned in setUp().
    private File stateDir = null;
    // Builder created in setUp(); several tests shadow it with a local builder of the same name.
    private StreamsBuilder builder;
    // Table over topic "test" created in setUp(); used by the argument-validation tests.
    private KTable<String, String> table;

    /**
     * Prepares the shared fixtures before each test: a state directory
     * obtained from {@code TestUtils.tempDirectory}, a fresh builder, and a
     * table reading from topic "test".
     */
    @Before
    public void setUp() {
        stateDir = TestUtils.tempDirectory("kafka-test");
        builder = new StreamsBuilder();
        table = builder.table("test");
    }

    /**
     * Processes four records on a source table and checks what each derived
     * table forwards downstream: the source itself, a mapValues() view that
     * parses the value as an Integer, a filter() on that view keeping even
     * numbers only, and a through() copy of the source.
     */
    @Test
    public void testKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        String topic2 = "topic2";
        String storeName2 = "storeName2";

        // Source table, observed directly.
        KTable<String, String> table1 = builder.table(topic1, consumed);
        MockProcessorSupplier proc1 = new MockProcessorSupplier();
        table1.toStream().process(proc1);

        // Values parsed into Integers ("01" -> 1).
        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
            @Override
            public Integer apply(String value) {
                // Integer.valueOf replaces the deprecated Integer(String) constructor.
                return Integer.valueOf(value);
            }
        });
        MockProcessorSupplier proc2 = new MockProcessorSupplier();
        table2.toStream().process(proc2);

        // Only even values pass the predicate.
        KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        MockProcessorSupplier proc3 = new MockProcessorSupplier();
        table3.toStream().process(proc3);

        // Copy of the source table routed through topic2.
        KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2, storeName2);
        MockProcessorSupplier proc4 = new MockProcessorSupplier();
        table4.toStream().process(proc4);

        driver.setUp(builder, stateDir);

        driver.process(topic1, "A", "01");
        driver.flushState();
        driver.process(topic1, "B", "02");
        driver.flushState();
        driver.process(topic1, "C", "03");
        driver.flushState();
        driver.process(topic1, "D", "04");
        driver.flushState();
        // NOTE(review): this second consecutive flush looks redundant; kept as-is - confirm before removing.
        driver.flushState();

        Assert.assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc1.processed);
        Assert.assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
        Assert.assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), proc3.processed);
        Assert.assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc4.processed);
    }

    /**
     * Checks the value getters of four tables - the source, a mapValues()
     * view parsing values as Integers, a filter() keeping even Integers, and
     * a through() copy - after each of several rounds of updates to the
     * source topic, including a final delete (null value) of key "A".
     */
    @Test
    public void testValueGetter() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        String topic2 = "topic2";
        String storeName2 = "storeName2";

        KTableImpl table1 = (KTableImpl) builder.table(topic1, consumed);
        KTableImpl table2 = (KTableImpl) table1.mapValues(new ValueMapper<String, Integer>() {
            @Override
            public Integer apply(String value) {
                // Integer.valueOf replaces the deprecated Integer(String) constructor.
                return Integer.valueOf(value);
            }
        });
        KTableImpl table3 = (KTableImpl) table2.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        KTableImpl table4 = (KTableImpl) table1.through(stringSerde, stringSerde, topic2, storeName2);

        // Suppliers are requested before the driver is set up.
        KTableValueGetterSupplier getterSupplier1 = table1.valueGetterSupplier();
        KTableValueGetterSupplier getterSupplier2 = table2.valueGetterSupplier();
        KTableValueGetterSupplier getterSupplier3 = table3.valueGetterSupplier();
        KTableValueGetterSupplier getterSupplier4 = table4.valueGetterSupplier();

        driver.setUp(builder, stateDir, null, null);
        Assert.assertEquals(2L, driver.allStateStores().size());

        // Each getter is initialised against the driver's context before any record is processed.
        KTableValueGetter getter1 = getterSupplier1.get();
        getter1.init(driver.context());
        KTableValueGetter getter2 = getterSupplier2.get();
        getter2.init(driver.context());
        KTableValueGetter getter3 = getterSupplier3.get();
        getter3.init(driver.context());
        KTableValueGetter getter4 = getterSupplier4.get();
        getter4.init(driver.context());

        // Round 1: every key holds "01" -> mapped to 1 -> odd, so the filtered view has nothing.
        driver.process(topic1, "A", "01");
        driver.process(topic1, "B", "01");
        driver.process(topic1, "C", "01");
        driver.flushState();

        Assert.assertEquals("01", getter1.get("A"));
        Assert.assertEquals("01", getter1.get("B"));
        Assert.assertEquals("01", getter1.get("C"));
        Assert.assertEquals(Integer.valueOf(1), getter2.get("A"));
        Assert.assertEquals(Integer.valueOf(1), getter2.get("B"));
        Assert.assertEquals(Integer.valueOf(1), getter2.get("C"));
        Assert.assertNull(getter3.get("A"));
        Assert.assertNull(getter3.get("B"));
        Assert.assertNull(getter3.get("C"));
        Assert.assertEquals("01", getter4.get("A"));
        Assert.assertEquals("01", getter4.get("B"));
        Assert.assertEquals("01", getter4.get("C"));

        // Round 2: A and B move to "02" (even, so visible through the filter); C is untouched.
        driver.process(topic1, "A", "02");
        driver.process(topic1, "B", "02");
        driver.flushState();

        Assert.assertEquals("02", getter1.get("A"));
        Assert.assertEquals("02", getter1.get("B"));
        Assert.assertEquals("01", getter1.get("C"));
        Assert.assertEquals(Integer.valueOf(2), getter2.get("A"));
        Assert.assertEquals(Integer.valueOf(2), getter2.get("B"));
        Assert.assertEquals(Integer.valueOf(1), getter2.get("C"));
        Assert.assertEquals(Integer.valueOf(2), getter3.get("A"));
        Assert.assertEquals(Integer.valueOf(2), getter3.get("B"));
        Assert.assertNull(getter3.get("C"));
        Assert.assertEquals("02", getter4.get("A"));
        Assert.assertEquals("02", getter4.get("B"));
        Assert.assertEquals("01", getter4.get("C"));

        // Round 3: A moves to "03" (odd again, so it drops out of the filtered view).
        driver.process(topic1, "A", "03");
        driver.flushState();

        Assert.assertEquals("03", getter1.get("A"));
        Assert.assertEquals("02", getter1.get("B"));
        Assert.assertEquals("01", getter1.get("C"));
        Assert.assertEquals(Integer.valueOf(3), getter2.get("A"));
        Assert.assertEquals(Integer.valueOf(2), getter2.get("B"));
        Assert.assertEquals(Integer.valueOf(1), getter2.get("C"));
        Assert.assertNull(getter3.get("A"));
        Assert.assertEquals(Integer.valueOf(2), getter3.get("B"));
        Assert.assertNull(getter3.get("C"));
        Assert.assertEquals("03", getter4.get("A"));
        Assert.assertEquals("02", getter4.get("B"));
        Assert.assertEquals("01", getter4.get("C"));

        // Round 4: a null value for A; every getter is expected to return null for that key.
        driver.process(topic1, "A", null);
        driver.flushState();

        Assert.assertNull(getter1.get("A"));
        Assert.assertEquals("02", getter1.get("B"));
        Assert.assertEquals("01", getter1.get("C"));
        Assert.assertNull(getter2.get("A"));
        Assert.assertEquals(Integer.valueOf(2), getter2.get("B"));
        Assert.assertEquals(Integer.valueOf(1), getter2.get("C"));
        Assert.assertNull(getter3.get("A"));
        Assert.assertEquals(Integer.valueOf(2), getter3.get("B"));
        Assert.assertNull(getter3.get("C"));
        Assert.assertNull(getter4.get("A"));
        Assert.assertEquals("02", getter4.get("B"));
        Assert.assertEquals("01", getter4.get("C"));
    }

    /**
     * Builds two source tables plus an unused mapValues()/filter() chain on
     * the first one, and asserts that the driver ends up with exactly two
     * state stores.
     */
    @Test
    public void testStateStoreLazyEval() {
        String topic1 = "topic1";
        String topic2 = "topic2";
        StreamsBuilder builder = new StreamsBuilder();

        KTableImpl table1 = (KTableImpl) builder.table(topic1, consumed);
        // Second source table; its handle is not needed by the test.
        builder.table(topic2, consumed);

        KTableImpl table1Mapped = (KTableImpl) table1.mapValues(new ValueMapper<String, Integer>() {
            @Override
            public Integer apply(String value) {
                // Integer.valueOf replaces the deprecated Integer(String) constructor.
                return Integer.valueOf(value);
            }
        });
        // The filtered table is deliberately left unused.
        table1Mapped.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });

        driver.setUp(builder, stateDir, null, null);
        driver.setTime(0L);

        Assert.assertEquals(2L, driver.allStateStores().size());
    }

    /**
     * Joins the second source table with a mapValues()/filter() view of the
     * first one, and asserts that the driver ends up with exactly two state
     * stores.
     */
    @Test
    public void testStateStore() {
        String topic1 = "topic1";
        String topic2 = "topic2";
        StreamsBuilder builder = new StreamsBuilder();

        KTableImpl table1 = (KTableImpl) builder.table(topic1, consumed);
        KTableImpl table2 = (KTableImpl) builder.table(topic2, consumed);

        KTableImpl table1Mapped = (KTableImpl) table1.mapValues(new ValueMapper<String, Integer>() {
            @Override
            public Integer apply(String value) {
                // Integer.valueOf replaces the deprecated Integer(String) constructor.
                return Integer.valueOf(value);
            }
        });
        KTableImpl table1MappedFiltered = (KTableImpl) table1Mapped.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });

        // The join result itself is not inspected; only the resulting store count is.
        table2.join(table1MappedFiltered, new ValueJoiner<String, Integer, String>() {
            @Override
            public String apply(String v1, Integer v2) {
                return v1 + v2;
            }
        });

        driver.setUp(builder, stateDir, null, null);
        driver.setTime(0L);

        Assert.assertEquals(2L, driver.allStateStores().size());
    }

    /**
     * Groups a materialized table twice (one aggregate, one reduce) and
     * verifies the resulting topology: three state stores, the expected
     * sink/source node names, and a non-null inner (de)serializer inside the
     * value (de)serializer of each of those nodes (read via reflection).
     */
    @Test
    public void testRepartition() throws NoSuchFieldException, IllegalAccessException {
        final String topic1 = "topic1";
        final String storeName1 = "storeName1";
        final String firstSink = "KSTREAM-SINK-0000000003";
        final String firstSource = "KSTREAM-SOURCE-0000000004";
        final String secondSink = "KSTREAM-SINK-0000000007";
        final String secondSource = "KSTREAM-SOURCE-0000000008";

        StreamsBuilder builder = new StreamsBuilder();
        KTableImpl table1 = (KTableImpl)builder.table(topic1, consumed, Materialized.as(storeName1).withKeySerde(stringSerde).withValueSerde(stringSerde));
        table1.groupBy(MockMapper.noOpKeyValueMapper())
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
        table1.groupBy(MockMapper.noOpKeyValueMapper())
            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");

        driver.setUp(builder, stateDir, stringSerde, stringSerde);
        driver.setTime(0L);

        Assert.assertEquals(3L, driver.allStateStores().size());
        Assert.assertTrue(driver.allProcessorNames().contains(firstSink));
        Assert.assertTrue(driver.allProcessorNames().contains(firstSource));
        Assert.assertTrue(driver.allProcessorNames().contains(secondSink));
        Assert.assertTrue(driver.allProcessorNames().contains(secondSource));

        // The value (de)serializer fields are not accessible directly, so open them up reflectively.
        Field valSerializerField = ((SinkNode)driver.processor(firstSink)).getClass().getDeclaredField("valSerializer");
        valSerializerField.setAccessible(true);
        Field valDeserializerField = ((SourceNode)driver.processor(firstSource)).getClass().getDeclaredField("valDeserializer");
        valDeserializerField.setAccessible(true);

        Object firstSinkSerializer = valSerializerField.get(driver.processor(firstSink));
        Assert.assertNotNull(((ChangedSerializer)firstSinkSerializer).inner());
        Object firstSourceDeserializer = valDeserializerField.get(driver.processor(firstSource));
        Assert.assertNotNull(((ChangedDeserializer)firstSourceDeserializer).inner());
        Object secondSinkSerializer = valSerializerField.get(driver.processor(secondSink));
        Assert.assertNotNull(((ChangedSerializer)secondSinkSerializer).inner());
        Object secondSourceDeserializer = valDeserializerField.get(driver.processor(secondSource));
        Assert.assertNotNull(((ChangedDeserializer)secondSourceDeserializer).inner());
    }

    /** toStream() with a null argument is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullSelectorOnToStream() {
        table.toStream(null);
    }

    /** to() with a null topic is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicOnTo() {
        table.to(null);
    }

    /** filter() with a null predicate is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilter() {
        table.filter(null);
    }

    /** filterNot() with a null predicate is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilterNot() {
        table.filterNot(null);
    }

    /**
     * mapValues() with a null ValueMapper is expected to throw
     * NullPointerException. The cast selects the ValueMapper overload.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValues() {
        table.mapValues((ValueMapper)null);
    }

    /**
     * mapValues() with a null ValueMapperWithKey is expected to throw
     * NullPointerException. The cast selects the ValueMapperWithKey overload.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValueWithKey() {
        table.mapValues((ValueMapperWithKey)null);
    }

    /** writeAsText() with a null file path is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullFilePathOnWriteAsText() {
        table.writeAsText(null);
    }

    /**
     * writeAsText() with a path made only of tabs and spaces is expected to
     * throw TopologyException.
     */
    @Test(expected=TopologyException.class)
    public void shouldNotAllowEmptyFilePathOnWriteAsText() {
        final String whitespaceOnlyPath = "\t  \t";
        table.writeAsText(whitespaceOnlyPath);
    }

    /** foreach() with a null action is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullActionOnForEach() {
        table.foreach(null);
    }

    /**
     * through() with a null topic is expected to throw NullPointerException.
     *
     * NOTE(review): the method name says "shouldAllow" but the test expects
     * a rejection; "shouldNotAllowNullTopicInThrough" would match the
     * behavior being asserted.
     */
    @Test(expected=NullPointerException.class)
    public void shouldAllowNullTopicInThrough() {
        final String nullTopic = null;
        table.through(nullTopic, "store");
    }

    /** through() with a null String as second argument must complete without throwing. */
    @Test
    public void shouldAllowNullStoreInThrough() {
        final String nullStore = null;
        table.through("topic", nullStore);
    }

    /** groupBy() with a null selector is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullSelectorOnGroupBy() {
        table.groupBy(null);
    }

    /** join() with a null other table is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnJoin() {
        table.join(null, MockValueJoiner.TOSTRING_JOINER);
    }

    /**
     * The four-argument join() called with null for its last two arguments
     * must complete without throwing.
     */
    @Test
    public void shouldAllowNullStoreInJoin() {
        table.join(table, MockValueJoiner.TOSTRING_JOINER, null, null);
    }

    /**
     * join() with a null StateStoreSupplier is expected to throw
     * NullPointerException. The cast selects the StateStoreSupplier overload.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullStoreSupplierInJoin() {
        table.join(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>)null);
    }

    /**
     * leftJoin() with a null StateStoreSupplier is expected to throw
     * NullPointerException. The cast selects the StateStoreSupplier overload.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullStoreSupplierInLeftJoin() {
        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>)null);
    }

    /**
     * outerJoin() with a null StateStoreSupplier is expected to throw
     * NullPointerException. The cast selects the StateStoreSupplier overload.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullStoreSupplierInOuterJoin() {
        table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>)null);
    }

    /** join() with a null joiner is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerJoin() {
        table.join(table, null);
    }

    /** outerJoin() with a null other table is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnOuterJoin() {
        table.outerJoin(null, MockValueJoiner.TOSTRING_JOINER);
    }

    /** outerJoin() with a null joiner is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerOnOuterJoin() {
        table.outerJoin(table, null);
    }

    /** leftJoin() with a null joiner is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerOnLeftJoin() {
        table.leftJoin(table, null);
    }

    /** leftJoin() with a null other table is expected to throw NullPointerException. */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnLeftJoin() {
        table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
    }

    /**
     * filter() with a valid predicate but a null Materialized is expected to
     * throw NullPointerException.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
        // The predicate's result is irrelevant here; it always answers false.
        final Predicate<String, String> rejectAll = new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                return false;
            }
        };
        table.filter(rejectAll, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>)null);
    }

    /**
     * filterNot() with a valid predicate but a null Materialized is expected
     * to throw NullPointerException.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
        // The predicate's result is irrelevant here; it always answers false.
        final Predicate<String, String> rejectAll = new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                return false;
            }
        };
        table.filterNot(rejectAll, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>)null);
    }

    /**
     * join() with a null Materialized is expected to throw
     * NullPointerException. The cast selects the Materialized overload.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
        table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized)null);
    }

    /**
     * leftJoin() with a null Materialized is expected to throw
     * NullPointerException. The cast selects the Materialized overload.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized)null);
    }

    /**
     * outerJoin() with a null Materialized is expected to throw
     * NullPointerException. The cast selects the Materialized overload.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
        // Must call outerJoin, not leftJoin: the left-join variant has its own
        // test, and calling it here would leave outerJoin's null check untested.
        this.table.outerJoin(this.table, MockValueJoiner.TOSTRING_JOINER, (Materialized)null);
    }
}

