/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests that join a {@link KStream} against a {@link GlobalKTable} and that
 * read the global table's backing store through the interactive-query API.
 *
 * <p>Each test gets its own pair of topics and its own application id (both derived from
 * {@code testNo}), so tests sharing the class-level {@link #CLUSTER} do not see each
 * other's records.
 */
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
    private static final int NUM_BROKERS = 1;
    /** Upper bound, in milliseconds, for every polling wait in this class. */
    private static final long WAIT_TIMEOUT_MS = 30000L;
    private static final Properties BROKER_CONFIG = new Properties();

    static {
        // Must run before CLUSTER is constructed (static initializers execute in textual order).
        // Both transaction-log settings are 1, matching the single-broker cluster.
        BROKER_CONFIG.put("transaction.state.log.replication.factor", 1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
    }

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);

    /** Counter used to give every test unique topic names and a unique application id. */
    private static volatile int testNo = 0;

    private final MockTime mockTime = CLUSTER.time;

    /** Maps a stream record to the global-table key to look up: the record's value. */
    private final KeyValueMapper<String, Long, Long> keyMapper = new KeyValueMapper<String, Long, Long>() {
        @Override
        public Long apply(final String key, final Long value) {
            return value;
        }
    };

    /** Joins a stream value with the table value as {@code "<streamValue>+<tableValue>"}. */
    private final ValueJoiner<Long, String, String> joiner = new ValueJoiner<Long, String, String>() {
        @Override
        public String apply(final Long value1, final String value2) {
            return value1 + "+" + value2;
        }
    };

    private final String globalStore = "globalStore";

    /**
     * Latest join output per stream key. Written by {@link #foreachAction} and read by the
     * polling test thread; a concurrent map is used so that {@code results.equals(expected)}
     * cannot fail with a {@code ConcurrentModificationException} while records are still arriving.
     * The joiner never yields {@code null}, so the map's null-hostility is not an issue.
     */
    private final Map<String, String> results = new ConcurrentHashMap<String, String>();

    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String globalTableTopic;
    private String streamTopic;
    private GlobalKTable<Long, String> globalTable;
    private KStream<String, Long> stream;
    private ForeachAction<String, String> foreachAction;

    /**
     * Creates fresh topics, builds the streams configuration and declares the global table
     * and the input stream on a new {@link StreamsBuilder}. The topology is not started here;
     * each test calls {@link #startStreams()} once it has finished wiring its join.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        builder = new StreamsBuilder();
        // Topics are named with the current testNo; the application id below uses the incremented one.
        createTopics();

        streamsConfiguration = new Properties();
        final String applicationId = "globalTableTopic-table-test-" + ++testNo;
        streamsConfiguration.put("application.id", applicationId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("cache.max.bytes.buffering", 0);
        streamsConfiguration.put("internal.leave.group.on.close", true);
        streamsConfiguration.put("commit.interval.ms", 100);

        // The explicit type witness is required: without it K and V are inferred as Object
        // and the withKeySerde/withValueSerde calls do not type-check.
        globalTable = builder.globalTable(
            globalTableTopic,
            Consumed.with(Serdes.Long(), Serdes.String()),
            Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
                .withKeySerde(Serdes.Long())
                .withValueSerde(Serdes.String()));

        final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
        stream = builder.stream(streamTopic, stringLongConsumed);

        foreachAction = new ForeachAction<String, String>() {
            @Override
            public void apply(final String key, final String value) {
                results.put(key, value);
            }
        };
    }

    /**
     * Stops the streams instance (if a test started one) and removes its local state.
     *
     * @throws IOException if purging the local state directory fails
     */
    @After
    public void whenShuttingDown() throws IOException {
        try {
            if (kafkaStreams != null) {
                kafkaStreams.close();
            }
        } finally {
            // Purge even if close() throws so state does not leak into later tests.
            IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        }
    }

    /**
     * Left join: key {@code e} has no table entry at first and must be emitted with a
     * {@code null} table side; after the table is updated every key joins.
     */
    @Test
    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
        final KStream<String, String> streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner);
        streamTableJoin.foreach(foreachAction);
        produceInitialGlobalTableValues();
        startStreams();
        produceTopicValues(streamTopic);

        final Map<String, String> expected = new HashMap<String, String>();
        expected.put("a", "1+A");
        expected.put("b", "2+B");
        expected.put("c", "3+C");
        expected.put("d", "4+D");
        expected.put("e", "5+null");
        waitForResults(expected, "waiting for initial values");

        verifyJoinAfterGlobalTableUpdate(expected);
    }

    /**
     * Inner join: key {@code e} has no table entry at first and must produce no output;
     * after the table is updated every key joins.
     */
    @Test
    public void shouldKStreamGlobalKTableJoin() throws Exception {
        final KStream<String, String> streamTableJoin = stream.join(globalTable, keyMapper, joiner);
        streamTableJoin.foreach(foreachAction);
        produceInitialGlobalTableValues();
        startStreams();
        produceTopicValues(streamTopic);

        final Map<String, String> expected = new HashMap<String, String>();
        expected.put("a", "1+A");
        expected.put("b", "2+B");
        expected.put("c", "3+C");
        expected.put("d", "4+D");
        waitForResults(expected, "waiting for initial values");

        verifyJoinAfterGlobalTableUpdate(expected);
    }

    /**
     * Produces the initial table records inside a transaction and checks that the global
     * store ends up containing exactly those four records.
     */
    @Test
    public void shouldRestoreTransactionalMessages() throws Exception {
        produceInitialGlobalTableValues(true);
        startStreams();

        final Map<Long, String> expected = new HashMap<Long, String>();
        expected.put(1L, "A");
        expected.put(2L, "B");
        expected.put(3L, "C");
        expected.put(4L, "D");

        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                final ReadOnlyKeyValueStore<Long, String> store;
                try {
                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
                } catch (final InvalidStateStoreException storeNotAvailable) {
                    // Treated as "not ready yet": report false so the condition is polled again.
                    return false;
                }
                final Map<Long, String> contents = new HashMap<Long, String>();
                // The iterator must be closed explicitly to release store resources.
                try (KeyValueIterator<Long, String> it = store.all()) {
                    while (it.hasNext()) {
                        final KeyValue<Long, String> kv = it.next();
                        contents.put(kv.key, kv.value);
                    }
                }
                return contents.equals(expected);
            }
        }, WAIT_TIMEOUT_MS, "waiting for initial values");
        System.out.println("no failed test");
    }

    /**
     * Shared second half of both join tests: rewrites the global table (now including key 5),
     * waits until the update is visible in the store, replays the stream records and waits
     * for every key to be joined with the new table values.
     *
     * @param expected map of expected results; updated in place with the post-update values
     */
    private void verifyJoinAfterGlobalTableUpdate(final Map<String, String> expected) throws Exception {
        produceGlobalTableValues();
        waitForReplicatedStoreUpdate();
        produceTopicValues(streamTopic);

        expected.put("a", "1+F");
        expected.put("b", "2+G");
        expected.put("c", "3+H");
        expected.put("d", "4+I");
        expected.put("e", "5+J");
        waitForResults(expected, "waiting for final values");
    }

    /** Polls until the collected join output equals {@code expected}. */
    private void waitForResults(final Map<String, String> expected, final String conditionDetails)
            throws InterruptedException {
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                return results.equals(expected);
            }
        }, WAIT_TIMEOUT_MS, conditionDetails);
    }

    /** Polls the global store until key 5 maps to "J", the last record of the table update. */
    private void waitForReplicatedStoreUpdate() throws InterruptedException {
        final ReadOnlyKeyValueStore<Long, String> replicatedStore =
            kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                return "J".equals(replicatedStore.get(5L));
            }
        }, WAIT_TIMEOUT_MS, "waiting for data in replicated store");
    }

    /** Derives this test's topic names from {@code testNo} and creates both topics. */
    private void createTopics() throws InterruptedException {
        streamTopic = "stream-" + testNo;
        globalTableTopic = "globalTable-" + testNo;
        CLUSTER.createTopics(streamTopic);
        CLUSTER.createTopic(globalTableTopic, 2, 1);
    }

    /** Builds the topology declared so far and starts the streams instance. */
    private void startStreams() {
        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.start();
    }

    /** Produces stream records a..e whose values 1..5 are the table keys to join on. */
    private void produceTopicValues(final String topic) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            Arrays.asList(
                new KeyValue<String, Long>("a", 1L),
                new KeyValue<String, Long>("b", 2L),
                new KeyValue<String, Long>("c", 3L),
                new KeyValue<String, Long>("d", 4L),
                new KeyValue<String, Long>("e", 5L)),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()),
            mockTime);
    }

    /** Produces the initial table records (keys 1..4) without transactions. */
    private void produceInitialGlobalTableValues() throws Exception {
        produceInitialGlobalTableValues(false);
    }

    /**
     * Produces the initial table records (keys 1..4 mapped to A..D).
     *
     * @param enableTransactions when {@code true}, the producer is given a transactional id
     *                           and the flag is passed on to the produce helper
     */
    private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
        final Properties properties = new Properties();
        if (enableTransactions) {
            properties.put("transactional.id", "someid");
            properties.put("retries", 1);
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(
            globalTableTopic,
            Arrays.asList(
                new KeyValue<Long, String>(1L, "A"),
                new KeyValue<Long, String>(2L, "B"),
                new KeyValue<Long, String>(3L, "C"),
                new KeyValue<Long, String>(4L, "D")),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, properties),
            mockTime,
            enableTransactions);
    }

    /** Overwrites table keys 1..4 with F..I and adds the previously missing key 5 as J. */
    private void produceGlobalTableValues() throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            globalTableTopic,
            Arrays.asList(
                new KeyValue<Long, String>(1L, "F"),
                new KeyValue<Long, String>(2L, "G"),
                new KeyValue<Long, String>(3L, "H"),
                new KeyValue<Long, String>(4L, "I"),
                new KeyValue<Long, String>(5L, "J")),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, new Properties()),
            mockTime);
    }
}

