/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class EmitOnChangeIntegrationTest {
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static String inputTopic;
    private static String inputTopic2;
    private static String outputTopic;
    private static String outputTopic2;
    private static String appId;

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void setup(TestInfo testInfo) {
        String testId = IntegrationTestUtils.safeUniqueTestName(this.getClass(), testInfo);
        appId = "appId_" + testId;
        inputTopic = "input" + testId;
        inputTopic2 = "input2" + testId;
        outputTopic = "output" + testId;
        outputTopic2 = "output2" + testId;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic, inputTopic2, outputTopic2);
    }

    @Test
    public void shouldEmitSameRecordAfterFailover() throws Exception {
        Properties properties = Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath()), Utils.mkEntry((Object)"num.stream.threads", (Object)1), Utils.mkEntry((Object)"statestore.cache.max.bytes", (Object)0), Utils.mkEntry((Object)"commit.interval.ms", (Object)300000L), Utils.mkEntry((Object)"default.key.serde", Serdes.IntegerSerde.class), Utils.mkEntry((Object)"default.value.serde", Serdes.StringSerde.class), Utils.mkEntry((Object)"session.timeout.ms", (Object)10000)}));
        AtomicBoolean shouldThrow = new AtomicBoolean(true);
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(inputTopic, Materialized.as((String)"test-store")).toStream().map((key, value) -> {
            if (shouldThrow.compareAndSet(true, false)) {
                throw new IllegalStateException("Kaboom");
            }
            return new KeyValue(key, value);
        }).to(outputTopic);
        builder.stream(inputTopic2).to(outputTopic2);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(inputTopic, Arrays.asList(new KeyValue((Object)1, (Object)"A"), new KeyValue((Object)1, (Object)"B")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), 0L);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(inputTopic2, Arrays.asList(new KeyValue((Object)1, (Object)"A"), new KeyValue((Object)1, (Object)"B")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), 0L);
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, StringDeserializer.class), outputTopic, Arrays.asList(new KeyValue((Object)1, (Object)"A"), new KeyValue((Object)1, (Object)"B")));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, StringDeserializer.class), outputTopic2, Arrays.asList(new KeyValue((Object)1, (Object)"A"), new KeyValue((Object)1, (Object)"B")));
        }
    }

    static {
        appId = "";
    }
}

