/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Category(value={IntegrationTest.class})
@RunWith(value=Parameterized.class)
public abstract class AbstractJoinIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
    static String appID;
    private static final Long COMMIT_INTERVAL;
    static final Properties STREAMS_CONFIG;
    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
    static final String OUTPUT_TOPIC = "outputTopic";
    private final long anyUniqueKey = 0L;
    private static final Properties PRODUCER_CONFIG;
    private static final Properties RESULT_CONSUMER_CONFIG;
    private KafkaProducer<Long, String> producer;
    private KafkaStreams streams;
    StreamsBuilder builder;
    int numRecordsExpected = 0;
    AtomicBoolean finalResultReached = new AtomicBoolean(false);
    private final List<Input<String>> input = Arrays.asList(new Input<String>("inputTopicLeft", null), new Input<String>("inputTopicRight", null), new Input<String>("inputTopicLeft", "A"), new Input<String>("inputTopicRight", "a"), new Input<String>("inputTopicLeft", "B"), new Input<String>("inputTopicRight", "b"), new Input<String>("inputTopicLeft", null), new Input<String>("inputTopicRight", null), new Input<String>("inputTopicLeft", "C"), new Input<String>("inputTopicRight", "c"), new Input<String>("inputTopicRight", null), new Input<String>("inputTopicLeft", null), new Input<String>("inputTopicRight", null), new Input<String>("inputTopicRight", "d"), new Input<String>("inputTopicLeft", "D"));
    final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>(){

        @Override
        public String apply(String value1, String value2) {
            return value1 + "-" + value2;
        }
    };
    final boolean cacheEnabled;

    @Parameterized.Parameters(name="caching enabled = {0}")
    public static Collection<Object[]> data() {
        ArrayList<Object[]> values = new ArrayList<Object[]>();
        for (boolean cacheEnabled : Arrays.asList(true, false)) {
            values.add(new Object[]{cacheEnabled});
        }
        return values;
    }

    AbstractJoinIntegrationTest(boolean cacheEnabled) {
        this.cacheEnabled = cacheEnabled;
    }

    @BeforeClass
    public static void setupConfigsAndUtils() {
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("retries", (Object)0);
        PRODUCER_CONFIG.put("key.serializer", LongSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
        RESULT_CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        RESULT_CONSUMER_CONFIG.put("group.id", appID + "-result-consumer");
        RESULT_CONSUMER_CONFIG.put("auto.offset.reset", "earliest");
        RESULT_CONSUMER_CONFIG.put("key.deserializer", LongDeserializer.class);
        RESULT_CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("internal.leave.group.on.close", (Object)true);
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
    }

    void prepareEnvironment() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
        if (!this.cacheEnabled) {
            STREAMS_CONFIG.put("cache.max.bytes.buffering", (Object)0);
        }
        STREAMS_CONFIG.put("state.dir", this.testFolder.getRoot().getPath());
        this.producer = new KafkaProducer(PRODUCER_CONFIG);
    }

    @After
    public void cleanup() throws InterruptedException {
        CLUSTER.deleteTopicsAndWait(120000L, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
    }

    private void checkResult(String outputTopic, List<String> expectedResult) throws InterruptedException {
        List result2 = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30000L);
        MatcherAssert.assertThat(result2, (Matcher)Is.is(expectedResult));
    }

    private void checkResult(String outputTopic, String expectedFinalResult, int expectedTotalNumRecords) throws InterruptedException {
        List result2 = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30000L);
        MatcherAssert.assertThat(result2.get(result2.size() - 1), (Matcher)Is.is((Object)expectedFinalResult));
    }

    void runTest(List<List<String>> expectedResult) throws Exception {
        this.runTest(expectedResult, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void runTest(List<List<String>> expectedResult, String storeName) throws Exception {
        assert (expectedResult.size() == this.input.size());
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
        this.streams = new KafkaStreams(this.builder.build(), new StreamsConfig(STREAMS_CONFIG));
        String expectedFinalResult = null;
        try {
            this.streams.start();
            long ts = System.currentTimeMillis();
            Iterator<List<String>> resultIterator = expectedResult.iterator();
            for (Input<String> singleInput : this.input) {
                this.producer.send(new ProducerRecord(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
                List<String> expected = resultIterator.next();
                if (expected == null) continue;
                this.checkResult(OUTPUT_TOPIC, expected);
                expectedFinalResult = expected.get(expected.size() - 1);
            }
            if (storeName != null) {
                this.checkQueryableStore(storeName, expectedFinalResult);
            }
        }
        finally {
            this.streams.close();
        }
    }

    void runTest(String expectedFinalResult) throws Exception {
        this.runTest(expectedFinalResult, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void runTest(String expectedFinalResult, String storeName) throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
        this.streams = new KafkaStreams(this.builder.build(), new StreamsConfig(STREAMS_CONFIG));
        try {
            this.streams.start();
            long ts = System.currentTimeMillis();
            for (Input<String> singleInput : this.input) {
                this.producer.send(new ProducerRecord(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
            }
            TestUtils.waitForCondition((TestCondition)new TestCondition(){

                public boolean conditionMet() {
                    return AbstractJoinIntegrationTest.this.finalResultReached.get();
                }
            }, (String)"Never received expected final result.");
            this.checkResult(OUTPUT_TOPIC, expectedFinalResult, this.numRecordsExpected);
            if (storeName != null) {
                this.checkQueryableStore(storeName, expectedFinalResult);
            }
        }
        finally {
            this.streams.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkQueryableStore(String queryableName, String expectedFinalResult) {
        ReadOnlyKeyValueStore store = this.streams.store(queryableName, QueryableStoreTypes.keyValueStore());
        KeyValueIterator all = store.all();
        KeyValue onlyEntry = (KeyValue)all.next();
        try {
            MatcherAssert.assertThat(onlyEntry.key, (Matcher)Is.is((Object)0L));
            MatcherAssert.assertThat(onlyEntry.value, (Matcher)Is.is((Object)expectedFinalResult));
            MatcherAssert.assertThat((Object)all.hasNext(), (Matcher)Is.is((Object)false));
        }
        finally {
            all.close();
        }
    }

    static {
        COMMIT_INTERVAL = 100L;
        STREAMS_CONFIG = new Properties();
        PRODUCER_CONFIG = new Properties();
        RESULT_CONSUMER_CONFIG = new Properties();
    }

    private final class Input<V> {
        String topic;
        KeyValue<Long, V> record;

        Input(String topic, V value) {
            this.topic = topic;
            this.record = KeyValue.pair(0L, value);
        }
    }
}

