/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests joining a {@link KStream} against a {@link GlobalKTable} using both
 * {@code leftJoin} and {@code join}.
 *
 * <p>Each test wires a join into the topology, feeds a fixed set of records
 * through the {@link KStreamTestDriver}, and compares the records that reach
 * the terminal {@code foreach} with an expected key/value map.
 */
public class GlobalKTableJoinsTest {
    private final StreamsBuilder builder = new StreamsBuilder();
    /** Records observed by {@link #action}, keyed by record key. */
    private final Map<String, String> results = new HashMap<String, String>();
    /** Topic backing the stream side of the join. */
    private final String streamTopic = "stream";
    /** Topic backing the global table side of the join. */
    private final String globalTopic = "global";
    private File stateDir;
    private GlobalKTable<String, String> global;
    private KStream<String, String> stream;
    private KeyValueMapper<String, String, String> keyValueMapper;
    private ForeachAction<String, String> action;
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    /**
     * Creates the state directory, registers the global table and the stream on
     * the builder (both with String key/value serdes), and prepares the mapper
     * and the result-collecting action shared by the tests.
     */
    @Before
    public void setUp() {
        stateDir = TestUtils.tempDirectory();
        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
        global = builder.globalTable(globalTopic, consumed);
        stream = builder.stream(streamTopic, consumed);
        // Handed to the join as its KeyValueMapper: it returns the stream
        // record's value, ignoring the record's key.
        keyValueMapper = new KeyValueMapper<String, String, String>() {
            @Override
            public String apply(final String key, final String value) {
                return value;
            }
        };
        // Terminal action: remember every record emitted by the join.
        action = new ForeachAction<String, String>() {
            @Override
            public void apply(final String key, final String value) {
                results.put(key, value);
            }
        };
    }

    /**
     * A left join is expected to emit a result for every stream record,
     * including key "3" whose value "c" was never written to the global topic
     * (expected joined value: {@code "c+null"}).
     */
    @Test
    public void shouldLeftJoinWithStream() {
        stream.leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
            .foreach(action);

        final Map<String, String> expected = new HashMap<String, String>();
        expected.put("1", "a+A");
        expected.put("2", "b+B");
        expected.put("3", "c+null");

        verifyJoin(expected, streamTopic);
    }

    /**
     * An inner join is expected to emit results only for keys "1" and "2";
     * no result is expected for key "3".
     */
    @Test
    public void shouldInnerJoinWithStream() {
        stream.join(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
            .foreach(action);

        final Map<String, String> expected = new HashMap<String, String>();
        expected.put("1", "a+A");
        expected.put("2", "b+B");

        verifyJoin(expected, streamTopic);
    }

    /**
     * Runs the topology built so far and asserts on the collected join output.
     *
     * <p>The two global-topic records are processed first, then three records
     * are sent to {@code joinInput}, then state is flushed before comparing.
     *
     * @param expected  the key/value pairs the join is expected to have emitted
     * @param joinInput the topic to which the stream-side records are sent
     */
    private void verifyJoin(final Map<String, String> expected, final String joinInput) {
        driver.setUp(builder, stateDir);
        driver.setTime(0L);

        // Write the global topic's records before any stream record is processed.
        driver.process(globalTopic, "a", "A");
        driver.process(globalTopic, "b", "B");

        // Stream side: the record value ("a", "b", "c") is what keyValueMapper returns.
        driver.process(joinInput, "1", "a");
        driver.process(joinInput, "2", "b");
        driver.process(joinInput, "3", "c");
        driver.flushState();

        Assert.assertEquals(expected, results);
    }
}

