/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
@Category(value={IntegrationTest.class})
public class EosIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(EosIntegrationTest.class);
    /** Number of brokers started for the shared embedded cluster. */
    private static final int NUM_BROKERS = 3;
    /** Poll-interval budget in milliseconds; the tests hand 5000 (or a multiple of it) to the streams factory. */
    private static final int MAX_POLL_INTERVAL_MS = 5000;
    /** Upper bound, in milliseconds, the tests use when waiting for a state change or condition. */
    private static final int MAX_WAIT_TIME_MS = 60000;
    /** Embedded cluster shared by all tests of this class; broker-side topic auto-creation is switched on. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
    /** Application id of the current test; reassigned in {@code createTopics()} before every test. */
    private String applicationId;
    /** Partition count of the multi-partition topics. */
    private static final int NUM_TOPIC_PARTITIONS = 2;
    /** Consumer group used when verifying output with a read-committed consumer. */
    private static final String CONSUMER_GROUP_ID = "readCommitted";
    private static final String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
    private static final String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
    private static final String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
    private static final String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
    /** Name of the key-value state store used by the stateful topology. */
    private final String storeName = "store";
    /** One-shot switch: when set, the next transformed record throws an injected exception. */
    private AtomicBoolean errorInjected;
    /** One-shot switch: when set, the next transformed record makes its thread stall. */
    private AtomicBoolean stallInjected;
    /** Host name of the instance that picked up the injected stall; {@code null} until a stall starts. */
    private AtomicReference<String> stallingHost;
    /** While {@code true}, a stalling transformer keeps sleeping; tests flip it to release the stall. */
    private volatile boolean doStall = true;
    /** Counts how often the transformer has requested a commit. */
    private AtomicInteger commitRequested;
    // NOTE(review): tests poll this field from the test thread, and it is presumably written by an
    // uncaught-exception handler on a stream thread (the writer is not visible here). It is not
    // volatile, unlike hasUnexpectedError - confirm whether visibility is guaranteed some other way.
    private Throwable uncaughtException;
    /** Source of the per-test suffix for {@link #applicationId}, so no two tests share an application id. */
    private static final AtomicInteger TEST_NUMBER = new AtomicInteger(0);
    /** Tests assert this is still {@code false} at the end, i.e. no additional unexpected failure was recorded. */
    private volatile boolean hasUnexpectedError = false;
    /** Value for {@code processing.guarantee}, injected by the {@link Parameterized} runner. */
    @Parameterized.Parameter
    public String eosConfig;

    @Parameterized.Parameters(name="{0}")
    public static Collection<String[]> data() {
        return Arrays.asList({"exactly_once"}, {"exactly_once_beta"});
    }

    /**
     * Gives the test a fresh application id and recreates all six input/through/output topics.
     *
     * <p>The three "single partition" topics are created with the cluster helper's defaults; the
     * three "multi partition" topics are created with {@link #NUM_TOPIC_PARTITIONS} partitions
     * and a replication factor of 1.
     *
     * @throws Exception if deleting or creating a topic fails
     */
    @Before
    public void createTopics() throws Exception {
        applicationId = "appId-" + TEST_NUMBER.getAndIncrement();

        // Start from a clean slate: leftovers from the previous test would pollute the results.
        CLUSTER.deleteTopicsAndWait(
            SINGLE_PARTITION_INPUT_TOPIC,
            MULTI_PARTITION_INPUT_TOPIC,
            SINGLE_PARTITION_THROUGH_TOPIC,
            MULTI_PARTITION_THROUGH_TOPIC,
            SINGLE_PARTITION_OUTPUT_TOPIC,
            MULTI_PARTITION_OUTPUT_TOPIC);

        CLUSTER.createTopics(
            SINGLE_PARTITION_INPUT_TOPIC,
            SINGLE_PARTITION_THROUGH_TOPIC,
            SINGLE_PARTITION_OUTPUT_TOPIC);

        final String[] multiPartitionTopics = {
            MULTI_PARTITION_INPUT_TOPIC,
            MULTI_PARTITION_THROUGH_TOPIC,
            MULTI_PARTITION_OUTPUT_TOPIC
        };
        for (final String topic : multiPartitionTopics) {
            CLUSTER.createTopic(topic, NUM_TOPIC_PARTITIONS, 1);
        }
    }

    /**
     * Smoke test: a single run that copies from the single-partition input topic straight to the
     * single-partition output topic, with a non-transactional input producer.
     */
    @Test
    public void shouldBeAbleToRunWithEosEnabled() throws Exception {
        final int restarts = 1;
        final String noThroughTopic = null;
        final boolean transactionalInput = false;
        runSimpleCopyTest(restarts, SINGLE_PARTITION_INPUT_TOPIC, noThroughTopic, SINGLE_PARTITION_OUTPUT_TOPIC, transactionalInput, eosConfig);
    }

    /**
     * Runs the simple copy topology against an input topic written by a transactional producer,
     * then checks the offset the application committed for partition 0 of that topic.
     *
     * <p>Once the application's consumer group is empty, the committed offset must equal both the
     * position a fresh consumer in the same group resolves to and the partition's end offset.
     *
     * @throws Exception if the copy run, the admin lookup or the consumer calls fail
     */
    @Test
    public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception {
        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true, eosConfig);

        // Explicit Map<String, Object> targets let the compiler infer the entry types, so no raw
        // types or unchecked casts are needed.
        final Map<String, Object> adminConfig = Utils.mkMap(
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()));
        final Map<String, Object> consumerConfig = Utils.mkMap(
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("group.id", applicationId),
            Utils.mkEntry("key.deserializer", ByteArrayDeserializer.class),
            Utils.mkEntry("value.deserializer", ByteArrayDeserializer.class));

        try (Admin adminClient = Admin.create(adminConfig);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            // The streams instance was closed by the copy run; wait until its group has no members.
            IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, applicationId, 25000L);

            final TopicPartition topicPartition = new TopicPartition(SINGLE_PARTITION_INPUT_TOPIC, 0);
            final Set<TopicPartition> topicPartitions = Collections.singleton(topicPartition);

            final Map<TopicPartition, OffsetAndMetadata> committedOffsets =
                adminClient.listConsumerGroupOffsets(applicationId).partitionsToOffsetAndMetadata().get();
            final long committedOffset = committedOffsets.get(topicPartition).offset();

            consumer.assign(topicPartitions);
            final long consumerPosition = consumer.position(topicPartition);
            final long endOffset = consumer.endOffsets(topicPartitions).get(topicPartition);

            MatcherAssert.assertThat(committedOffset, CoreMatchers.equalTo(consumerPosition));
            MatcherAssert.assertThat(committedOffset, CoreMatchers.equalTo(endOffset));
        }
    }

    /**
     * Runs the single-partition copy scenario twice in a row; each round starts a new
     * {@code KafkaStreams} instance after the previous one has been closed.
     */
    @Test
    public void shouldBeAbleToRestartAfterClose() throws Exception {
        final int restarts = 2;
        final String noThroughTopic = null;
        final boolean transactionalInput = false;
        runSimpleCopyTest(restarts, SINGLE_PARTITION_INPUT_TOPIC, noThroughTopic, SINGLE_PARTITION_OUTPUT_TOPIC, transactionalInput, eosConfig);
    }

    /**
     * Copies from the single-partition input topic into the multi-partition output topic in a
     * single run.
     */
    @Test
    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
        final int restarts = 1;
        final String noThroughTopic = null;
        final boolean transactionalInput = false;
        runSimpleCopyTest(restarts, SINGLE_PARTITION_INPUT_TOPIC, noThroughTopic, MULTI_PARTITION_OUTPUT_TOPIC, transactionalInput, eosConfig);
    }

    /**
     * Copies from the multi-partition input topic into the single-partition output topic in a
     * single run.
     */
    @Test
    public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
        final int restarts = 1;
        final String noThroughTopic = null;
        final boolean transactionalInput = false;
        runSimpleCopyTest(restarts, MULTI_PARTITION_INPUT_TOPIC, noThroughTopic, SINGLE_PARTITION_OUTPUT_TOPIC, transactionalInput, eosConfig);
    }

    /**
     * Routes the copy through an intermediate topic (input -> through -> output), all of them
     * single-partition, in a single run.
     */
    @Test
    public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
        final int restarts = 1;
        final boolean transactionalInput = false;
        runSimpleCopyTest(restarts, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, transactionalInput, eosConfig);
    }

    /**
     * Routes the copy through an intermediate topic (input -> through -> output) using the
     * multi-partition variant of every topic, in a single run.
     */
    @Test
    public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
        final int restarts = 1;
        final boolean transactionalInput = false;
        runSimpleCopyTest(restarts, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, transactionalInput, eosConfig);
    }

    /**
     * Builds a pass-through topology and verifies, for each requested run, that everything
     * written to {@code inputTopic} becomes visible to a read-committed consumer on
     * {@code outputTopic}.
     *
     * <p>Every run starts a new {@code KafkaStreams} instance with the same application id and
     * closes it before the next run begins. Run {@code i} produces values
     * {@code [i * 100, i * 100 + 10)} for keys 0 and 1.
     *
     * @param numberOfRestarts        how many start/verify/close rounds to execute
     * @param inputTopic              topic the test data is produced to
     * @param throughTopic            optional intermediate topic; when non-null the data is written
     *                                there first and read back before going to {@code outputTopic}
     * @param outputTopic             topic the results are verified on
     * @param inputTopicTransactional whether the input producer is given a transactional id and
     *                                asked to produce transactionally
     * @param eosConfig               value for the {@code processing.guarantee} config
     * @throws Exception if starting streams, producing, or waiting for the output fails
     */
    private void runSimpleCopyTest(int numberOfRestarts, String inputTopic, String throughTopic, String outputTopic, boolean inputTopicTransactional, String eosConfig) throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, Long> input = builder.stream(inputTopic);
        KStream<Long, Long> output = input;
        if (throughTopic != null) {
            input.to(throughTopic);
            output = builder.stream(throughTopic);
        }
        output.to(outputTopic);

        Properties properties = new Properties();
        properties.put("processing.guarantee", eosConfig);
        properties.put("cache.max.bytes.buffering", 0);
        properties.put("commit.interval.ms", 100);
        properties.put(StreamsConfig.consumerPrefix("max.poll.records"), 1);
        properties.put(StreamsConfig.consumerPrefix("metadata.max.age.ms"), "1000");
        properties.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");

        for (int i = 0; i < numberOfRestarts; ++i) {
            Properties config = StreamsTestUtils.getStreamsConfig(
                applicationId,
                CLUSTER.bootstrapServers(),
                Serdes.LongSerde.class.getName(),
                Serdes.LongSerde.class.getName(),
                properties);

            try (KafkaStreams streams = new KafkaStreams(builder.build(), config)) {
                StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);

                // Each run gets its own value range so output from different runs is distinguishable.
                long firstValue = i * 100L;
                List<KeyValue<Long, Long>> inputData = prepareData(firstValue, firstValue + 10L, 0L, 1L);

                Properties producerConfigs = new Properties();
                if (inputTopicTransactional) {
                    producerConfigs.setProperty("transactional.id", applicationId + "-input-producer");
                }
                IntegrationTestUtils.produceKeyValuesSynchronously(
                    inputTopic,
                    inputData,
                    TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, producerConfigs),
                    CLUSTER.time,
                    inputTopicTransactional);

                // Read with isolation.level=read_committed so only committed output is counted.
                Properties readCommitted = Utils.mkProperties(
                    Collections.singletonMap("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
                List<KeyValue<Long, Long>> committedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                    TestUtils.consumerConfig(CLUSTER.bootstrapServers(), CONSUMER_GROUP_ID, LongDeserializer.class, LongDeserializer.class, readCommitted),
                    outputTopic,
                    inputData.size());

                checkResultPerKey(committedRecords, inputData, "The committed records do not match what expected");
            }
        }
    }

    /**
     * Asserts that, for every key present in either list, the records carrying that key are
     * equal (same values, same order) in {@code result} and {@code expectedResult}.
     *
     * @param result         records that were actually observed
     * @param expectedResult records that should have been observed
     * @param reason         message attached to a failing assertion
     */
    private void checkResultPerKey(List<KeyValue<Long, Long>> result, List<KeyValue<Long, Long>> expectedResult, String reason) {
        Set<Long> keysToCheck = new HashSet<>();
        addAllKeys(keysToCheck, result);
        addAllKeys(keysToCheck, expectedResult);

        for (Long key : keysToCheck) {
            List<KeyValue<Long, Long>> actualForKey = getAllRecordPerKey(key, result);
            List<KeyValue<Long, Long>> expectedForKey = getAllRecordPerKey(key, expectedResult);
            MatcherAssert.assertThat(reason, actualForKey, CoreMatchers.equalTo(expectedForKey));
        }
    }

    /**
     * Adds the key of every record in {@code records} to {@code allKeys}.
     *
     * @param allKeys set that receives the keys; modified in place
     * @param records records whose keys are collected
     */
    private void addAllKeys(Set<Long> allKeys, List<KeyValue<Long, Long>> records) {
        records.forEach(entry -> allKeys.add(entry.key));
    }

    /**
     * Returns the records whose key equals {@code key}, in their original order.
     *
     * @param key     key to select
     * @param records records to filter; not modified
     * @return a new list holding the matching records
     */
    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long key, List<KeyValue<Long, Long>> records) {
        List<KeyValue<Long, Long>> matches = new ArrayList<>(records.size());
        for (KeyValue<Long, Long> candidate : records) {
            if (candidate.key.equals(key)) {
                matches.add(candidate);
            }
        }
        return matches;
    }

    /**
     * Sends two separate bursts of records for key 0 through a pass-through topology and checks
     * that each burst becomes visible, in order, to a read-committed consumer before the test
     * moves on.
     *
     * <p>The second read asks only for as many records as the second burst holds, using the same
     * consumer group as the first read.
     *
     * @throws Exception if starting streams, producing, or waiting for the output fails
     */
    @Test
    public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);

        Properties properties = new Properties();
        properties.put("processing.guarantee", eosConfig);
        properties.put("cache.max.bytes.buffering", 0);
        properties.put("commit.interval.ms", 100);
        properties.put("metadata.max.age.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        Properties config = StreamsTestUtils.getStreamsConfig(
            applicationId,
            CLUSTER.bootstrapServers(),
            Serdes.LongSerde.class.getName(),
            Serdes.LongSerde.class.getName(),
            properties);

        try (KafkaStreams streams = new KafkaStreams(builder.build(), config)) {
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);

            List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
            List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);
            String readCommitted = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT);

            IntegrationTestUtils.produceKeyValuesSynchronously(
                SINGLE_PARTITION_INPUT_TOPIC,
                firstBurstOfData,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time);
            List<KeyValue<Long, Long>> firstCommittedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    Utils.mkProperties(Collections.singletonMap("isolation.level", readCommitted))),
                SINGLE_PARTITION_OUTPUT_TOPIC,
                firstBurstOfData.size());
            MatcherAssert.assertThat(firstCommittedRecords, CoreMatchers.equalTo(firstBurstOfData));

            IntegrationTestUtils.produceKeyValuesSynchronously(
                SINGLE_PARTITION_INPUT_TOPIC,
                secondBurstOfData,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time);
            List<KeyValue<Long, Long>> secondCommittedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    Utils.mkProperties(Collections.singletonMap("isolation.level", readCommitted))),
                SINGLE_PARTITION_OUTPUT_TOPIC,
                secondBurstOfData.size());
            MatcherAssert.assertThat(secondCommittedRecords, CoreMatchers.equalTo(secondBurstOfData));
        }
    }

    /**
     * Injects a one-off exception into a stateless, two-threaded topology and checks that the
     * committed output still contains every input record exactly as expected.
     *
     * <p>Phases:
     * <ol>
     *   <li>write values 0..9 for keys 0 and 1, wait for two commit requests, verify the
     *       committed output;</li>
     *   <li>write values 10..14 and verify they show up for a reader with no consumer group
     *       passed to {@code readResult};</li>
     *   <li>arm the error switch, write values 15..19 and wait for an uncaught exception;</li>
     *   <li>verify the full committed output, and the committed output after phase 1.</li>
     * </ol>
     *
     * @throws Exception if any wait times out or a helper fails
     */
    @Test
    public void shouldNotViolateEosIfOneTaskFails() throws Exception {
        try (KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig, MAX_POLL_INTERVAL_MS)) {
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);

            // Input batches and the expectations derived from them.
            List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
            List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
            List<KeyValue<Long, Long>> dataAfterFailure = prepareData(15L, 20L, 0L, 1L);

            List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(committedDataBeforeFailure);
            dataBeforeFailure.addAll(uncommittedDataBeforeFailure);

            List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>(uncommittedDataBeforeFailure);
            expectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);

            List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(dataBeforeFailure);
            allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);

            // Phase 1: data that gets committed.
            writeInputData(committedDataBeforeFailure);
            TestUtils.waitForCondition(() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, "StreamsTasks did not request commit.");
            List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
            checkResultPerKey(committedRecords, committedDataBeforeFailure, "The committed records before failure do not match what expected");

            // Phase 2: data that is processed but not yet committed.
            writeInputData(uncommittedDataBeforeFailure);
            List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
            checkResultPerKey(uncommittedRecords, dataBeforeFailure, "The uncommitted records before failure do not match what expected");

            // Phase 3: the next record to be transformed throws.
            errorInjected.set(true);
            writeInputData(dataAfterFailure);
            TestUtils.waitForCondition(() -> uncaughtException != null, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread.");

            // Phase 4: verify what is committed once processing has caught up again.
            List<KeyValue<Long, Long>> allCommittedRecords = readResult(allExpectedCommittedRecordsAfterRecovery.size(), CONSUMER_GROUP_ID + "_ALL");
            List<KeyValue<Long, Long>> committedRecordsAfterFailure = readResult(expectedCommittedRecordsAfterRecovery.size(), CONSUMER_GROUP_ID);

            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery, "The all committed records after recovery do not match what expected");
            checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery, "The committed records after recovery do not match what expected");

            MatcherAssert.assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, CoreMatchers.is(false));
        }
    }

    /**
     * Stateful variant of the task-failure test: the topology keeps a running sum per key in a
     * state store and emits that sum, so both the committed output and the store content are
     * verified before and after an injected exception.
     *
     * <p>Phases:
     * <ol>
     *   <li>write values 0..9 for keys 0 and 1, wait for two commit requests, verify the
     *       committed output;</li>
     *   <li>write values 10..14 for keys 0..3, verify the output seen by a reader with no
     *       consumer group passed to {@code readResult}, and verify the state store;</li>
     *   <li>arm the error switch, write values 15..19 for keys 0 and 1 and wait for an uncaught
     *       exception;</li>
     *   <li>verify the full committed output, the committed output after phase 1, and the state
     *       store.</li>
     * </ol>
     *
     * @throws Exception if any wait times out or a helper fails
     */
    @Test
    public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
        try (KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig, 3 * MAX_POLL_INTERVAL_MS)) {
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);

            List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
            List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
            List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size());
            dataBeforeFailure.addAll(committedDataBeforeFailure);
            dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
            List<KeyValue<Long, Long>> dataAfterFailure = prepareData(15L, 20L, 0L, 1L);

            // Phase 1: data that gets committed.
            writeInputData(committedDataBeforeFailure);
            TestUtils.waitForCondition(() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, "StreamsTasks did not request commit.");
            List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
            checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure), "The committed records before failure do not match what expected");

            // Phase 2: data that is processed but not yet committed.
            writeInputData(uncommittedDataBeforeFailure);
            List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
            List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
            checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure, "The uncommitted records before failure do not match what expected");
            verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure), "The state store content before failure do not match what expected");

            // Phase 3: the next record to be transformed throws.
            errorInjected.set(true);
            writeInputData(dataAfterFailure);
            TestUtils.waitForCondition(() -> uncaughtException != null, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread.");

            // Phase 4: verify committed output and state once processing has caught up again.
            int totalRecordCount = dataBeforeFailure.size() + dataAfterFailure.size();
            List<KeyValue<Long, Long>> allCommittedRecords = readResult(totalRecordCount, CONSUMER_GROUP_ID + "_ALL");
            List<KeyValue<Long, Long>> committedRecordsAfterFailure = readResult(uncommittedDataBeforeFailure.size() + dataAfterFailure.size(), CONSUMER_GROUP_ID);

            List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(totalRecordCount);
            allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
            allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
            allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
            List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);

            checkResultPerKey(allCommittedRecords, expectedResult, "The all committed records after recovery do not match what expected");
            // Skip the entries for the phase-1 data: the CONSUMER_GROUP_ID reader consumed those in phase 1.
            checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()), "The committed records after recovery do not match what expected");
            verifyStateStore(streams, getMaxPerKey(expectedResult), "The state store content after recovery do not match what expected");

            MatcherAssert.assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, CoreMatchers.is(false));
        }
    }

    /**
     * Runs two single-threaded application instances, stalls one of them in the middle of
     * processing, and checks that the committed output still contains every input record as
     * expected after the stall has been released.
     *
     * <p>Phases:
     * <ol>
     *   <li>write values 0..9 for keys 0 and 1, wait for two commit requests, verify the
     *       committed output;</li>
     *   <li>write values 10..14 and verify they show up for a reader with no consumer group
     *       passed to {@code readResult};</li>
     *   <li>arm the stall switch and write values 15..19; whichever instance transforms the next
     *       record starts stalling and reports its host name;</li>
     *   <li>wait until the non-stalling instance's metadata lists a single host owning both
     *       topic partitions, then verify the committed output;</li>
     *   <li>release the stall, wait until both instances report two hosts with two partitions in
     *       total, write values 20..29 and verify the complete committed output.</li>
     * </ol>
     *
     * @throws Exception if any wait times out or a helper fails
     */
    @Test
    public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
        try (KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig, MAX_POLL_INTERVAL_MS);
             KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig, MAX_POLL_INTERVAL_MS)) {
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(streams1, MAX_WAIT_TIME_MS);
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(streams2, MAX_WAIT_TIME_MS);

            List<KeyValue<Long, Long>> committedDataBeforeStall = prepareData(0L, 10L, 0L, 1L);
            List<KeyValue<Long, Long>> uncommittedDataBeforeStall = prepareData(10L, 15L, 0L, 1L);
            List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>(committedDataBeforeStall.size() + uncommittedDataBeforeStall.size());
            dataBeforeStall.addAll(committedDataBeforeStall);
            dataBeforeStall.addAll(uncommittedDataBeforeStall);
            List<KeyValue<Long, Long>> dataToTriggerFirstRebalance = prepareData(15L, 20L, 0L, 1L);
            List<KeyValue<Long, Long>> dataAfterSecondRebalance = prepareData(20L, 30L, 0L, 1L);

            // Phase 1: data that gets committed.
            writeInputData(committedDataBeforeStall);
            TestUtils.waitForCondition(() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, "StreamsTasks did not request commit.");
            List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeStall.size(), CONSUMER_GROUP_ID);
            checkResultPerKey(committedRecords, committedDataBeforeStall, "The committed records before stall do not match what expected");

            // Phase 2: data that is processed but not yet committed.
            writeInputData(uncommittedDataBeforeStall);
            List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
            checkResultPerKey(uncommittedRecords, dataBeforeStall, "The uncommitted records before stall do not match what expected");

            // Phase 3: make one instance stall while it is processing.
            LOG.info("Injecting Stall");
            stallInjected.set(true);
            writeInputData(dataToTriggerFirstRebalance);
            LOG.info("Input Data Written");
            TestUtils.waitForCondition(() -> stallingHost.get() != null, MAX_WAIT_TIME_MS, "Expected a host to start stalling");

            String observedStallingHost = stallingHost.get();
            final KafkaStreams stallingInstance;
            final KafkaStreams remainingInstance;
            if ("streams1".equals(observedStallingHost)) {
                stallingInstance = streams1;
                remainingInstance = streams2;
            } else if ("streams2".equals(observedStallingHost)) {
                stallingInstance = streams2;
                remainingInstance = streams1;
            } else {
                throw new IllegalArgumentException("unexpected host name: " + observedStallingHost);
            }

            // Phase 4: the stalled instance still sees both hosts, while the other instance sees
            // only one host that owns both partitions.
            TestUtils.waitForCondition(
                () -> stallingInstance.allMetadata().size() == 2
                    && remainingInstance.allMetadata().size() == 1
                    && remainingInstance.allMetadata().iterator().next().topicPartitions().size() == 2,
                MAX_WAIT_TIME_MS,
                () -> "Should have rebalanced.\nStreams1[" + streams1.allMetadata() + "]\nStreams2[" + streams2.allMetadata() + "]");

            List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size(), CONSUMER_GROUP_ID);
            List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>(uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size());
            expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeStall);
            expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
            checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance, "The all committed records after rebalance do not match what expected");

            // Phase 5: let the stalled instance continue and wait for both hosts to be listed again.
            LOG.info("Releasing Stall");
            doStall = false;
            TestUtils.waitForCondition(
                () -> streams1.allMetadata().size() == 2
                    && streams2.allMetadata().size() == 2
                    && streams1.allMetadata().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2L
                    && streams2.allMetadata().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2L,
                MAX_WAIT_TIME_MS,
                () -> "Should have rebalanced.\nStreams1[" + streams1.allMetadata() + "]\nStreams2[" + streams2.allMetadata() + "]");

            writeInputData(dataAfterSecondRebalance);

            int totalRecordCount = committedDataBeforeStall.size() + uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size();
            List<KeyValue<Long, Long>> allCommittedRecords = readResult(totalRecordCount, CONSUMER_GROUP_ID + "_ALL");
            List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(totalRecordCount);
            allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeStall);
            allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeStall);
            allExpectedCommittedRecordsAfterRecovery.addAll(dataToTriggerFirstRebalance);
            allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery, "The all committed records after recovery do not match what expected");
        }
    }

    /**
     * Builds test records: for every key, in the order given, one record per value in
     * {@code [fromInclusive, toExclusive)} in ascending order.
     *
     * @param fromInclusive first value to generate
     * @param toExclusive   value at which generation stops (not included)
     * @param keys          keys to generate records for
     * @return a new list with {@code keys.length * (toExclusive - fromInclusive)} records
     */
    private List<KeyValue<Long, Long>> prepareData(long fromInclusive, long toExclusive, Long ... keys) {
        long valuesPerKey = toExclusive - fromInclusive;
        // The total is computed in long arithmetic and narrowed to int for the initial capacity.
        int expectedSize = (int) (keys.length * valuesPerKey);
        List<KeyValue<Long, Long>> data = new ArrayList<>(expectedSize);
        for (Long key : keys) {
            long value = fromInclusive;
            while (value < toExclusive) {
                data.add(new KeyValue<>(key, value));
                value++;
            }
        }
        return data;
    }

    private KafkaStreams getKafkaStreams(final String dummyHostName, final boolean withState, String appDir, int numberOfStreamsThreads, String eosConfig, int maxPollIntervalMs) {
        this.commitRequested = new AtomicInteger(0);
        this.errorInjected = new AtomicBoolean(false);
        this.stallInjected = new AtomicBoolean(false);
        this.stallingHost = new AtomicReference();
        StreamsBuilder builder = new StreamsBuilder();
        String[] storeNames = new String[]{};
        if (withState) {
            storeNames = new String[]{"store"};
            StoreBuilder storeBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"store"), (Serde)Serdes.Long(), (Serde)Serdes.Long()).withCachingEnabled();
            builder.addStateStore(storeBuilder);
        }
        KStream input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
        input.transform((TransformerSupplier)new TransformerSupplier<Long, Long, KeyValue<Long, Long>>(){

            public Transformer<Long, Long, KeyValue<Long, Long>> get() {
                return new Transformer<Long, Long, KeyValue<Long, Long>>(){
                    ProcessorContext context;
                    KeyValueStore<Long, Long> state = null;

                    public void init(ProcessorContext context) {
                        this.context = context;
                        if (withState) {
                            this.state = (KeyValueStore)context.getStateStore("store");
                        }
                    }

                    public KeyValue<Long, Long> transform(Long key, Long value) {
                        if (EosIntegrationTest.this.stallInjected.compareAndSet(true, false)) {
                            LOG.info(dummyHostName + " is executing the injected stall");
                            EosIntegrationTest.this.stallingHost.set(dummyHostName);
                            while (EosIntegrationTest.this.doStall) {
                                StreamThread thread = (StreamThread)Thread.currentThread();
                                if (thread.isInterrupted() || !thread.isRunning()) {
                                    throw new RuntimeException("Detected we've been interrupted.");
                                }
                                try {
                                    Thread.sleep(100L);
                                }
                                catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        }
                        if ((value + 1L) % 10L == 0L) {
                            this.context.commit();
                            EosIntegrationTest.this.commitRequested.incrementAndGet();
                        }
                        if (this.state != null) {
                            Long sum = (Long)this.state.get((Object)key);
                            sum = sum == null ? value : Long.valueOf(sum + value);
                            this.state.put((Object)key, (Object)sum);
                            this.state.flush();
                        }
                        if (EosIntegrationTest.this.errorInjected.compareAndSet(true, false)) {
                            throw new RuntimeException("Injected test exception.");
                        }
                        if (this.state != null) {
                            return new KeyValue((Object)key, this.state.get((Object)key));
                        }
                        return new KeyValue((Object)key, (Object)value);
                    }

                    public void close() {
                    }
                };
            }
        }, storeNames).to(SINGLE_PARTITION_OUTPUT_TOPIC);
        Properties properties = new Properties();
        properties.put("processing.guarantee", eosConfig);
        properties.put("num.stream.threads", (Object)numberOfStreamsThreads);
        properties.put("commit.interval.ms", (Object)Long.MAX_VALUE);
        properties.put(StreamsConfig.consumerPrefix((String)"metadata.max.age.ms"), "1000");
        properties.put(StreamsConfig.consumerPrefix((String)"auto.offset.reset"), "earliest");
        properties.put(StreamsConfig.consumerPrefix((String)"request.timeout.ms"), (Object)5000);
        properties.put(StreamsConfig.consumerPrefix((String)"session.timeout.ms"), (Object)4999);
        properties.put(StreamsConfig.consumerPrefix((String)"max.poll.interval.ms"), (Object)maxPollIntervalMs);
        properties.put("cache.max.bytes.buffering", (Object)0);
        properties.put("state.dir", TestUtils.tempDirectory().getPath() + File.separator + appDir);
        properties.put("application.server", dummyHostName + ":2142");
        Properties config = StreamsTestUtils.getStreamsConfig(this.applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties);
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.setUncaughtExceptionHandler((t, e) -> {
            if (this.uncaughtException != null || !e.getMessage().contains("Injected test exception")) {
                e.printStackTrace(System.err);
                this.hasUnexpectedError = true;
            }
            this.uncaughtException = e;
        });
        return streams;
    }

    /**
     * Synchronously produces the given records to {@code MULTI_PARTITION_INPUT_TOPIC}.
     *
     * <p>The producer is configured with {@link LongSerializer} for both keys and values and
     * points at the bootstrap servers of the shared test {@code CLUSTER}; the cluster's own
     * {@code time} instance is handed to the produce helper.
     *
     * @param records the key/value pairs to write to the input topic
     */
    private void writeInputData(List<KeyValue<Long, Long>> records) {
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            LongSerializer.class,
            LongSerializer.class
        );
        final Time clusterTime = CLUSTER.time;
        IntegrationTestUtils.produceKeyValuesSynchronously(
            MULTI_PARTITION_INPUT_TOPIC,
            records,
            producerConfig,
            clusterTime
        );
    }

    /**
     * Waits until at least {@code numberOfRecords} records are available on
     * {@code SINGLE_PARTITION_OUTPUT_TOPIC} and returns them.
     *
     * <p>When {@code groupId} is non-null the consumer is created with that group id and with
     * {@code isolation.level} set to the lower-cased name of
     * {@link IsolationLevel#READ_COMMITTED}. When {@code groupId} is {@code null} the consumer
     * configuration is built without a group id and without an explicit isolation level.
     *
     * @param numberOfRecords minimum number of records to wait for
     * @param groupId         consumer group id, or {@code null} to use the configuration
     *                        variant that takes no group id
     * @return the records received from the output topic
     * @throws Exception if the underlying wait helper fails
     */
    private List<KeyValue<Long, Long>> readResult(int numberOfRecords, String groupId) throws Exception {
        final Properties consumerConfig;
        if (groupId == null) {
            consumerConfig = TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                LongDeserializer.class,
                LongDeserializer.class
            );
        } else {
            final String isolationLevel = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT);
            final Map<String, String> overrides = Collections.singletonMap("isolation.level", isolationLevel);
            consumerConfig = TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                groupId,
                LongDeserializer.class,
                LongDeserializer.class,
                Utils.mkProperties(overrides)
            );
        }
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            consumerConfig,
            SINGLE_PARTITION_OUTPUT_TOPIC,
            numberOfRecords
        );
    }

    /**
     * Computes the running per-key sum over {@code input}.
     *
     * <p>For every input record, in order, one output record is emitted carrying the same key
     * and the sum of all values seen so far for that key (including the current one). The first
     * value seen for a key is emitted unchanged.
     *
     * @param input the records to aggregate, in processing order
     * @return a new list with one running-sum record per input record, in input order
     */
    private List<KeyValue<Long, Long>> computeExpectedResult(List<KeyValue<Long, Long>> input) {
        final List<KeyValue<Long, Long>> runningTotals = new ArrayList<>(input.size());
        final Map<Long, Long> sumByKey = new HashMap<>();
        for (final KeyValue<Long, Long> record : input) {
            final Long previous = sumByKey.get(record.key);
            final Long updated;
            if (previous == null) {
                // No usable sum recorded for this key yet: the value itself becomes the sum.
                updated = record.value;
            } else {
                updated = previous + record.value;
            }
            sumByKey.put(record.key, updated);
            runningTotals.add(new KeyValue<>(record.key, updated));
        }
        return runningTotals;
    }

    /**
     * Reduces {@code input} to a single record per key holding the largest value seen for
     * that key.
     *
     * @param input the records to reduce
     * @return a new set containing one {@code KeyValue} per distinct key, paired with that
     *         key's maximum value
     */
    private Set<KeyValue<Long, Long>> getMaxPerKey(List<KeyValue<Long, Long>> input) {
        final Set<KeyValue<Long, Long>> maxRecords = new HashSet<>(input.size());
        final Map<Long, Long> largestByKey = new HashMap<>();
        for (final KeyValue<Long, Long> record : input) {
            final Long knownMax = largestByKey.get(record.key);
            // Store the value when nothing is recorded yet or when it is strictly larger.
            if (knownMax == null || record.value > knownMax) {
                largestByKey.put(record.key, record.value);
            }
        }
        largestByKey.forEach((key, max) -> maxRecords.add(new KeyValue<>(key, max)));
        return maxRecords;
    }

    /**
     * Asserts that the queryable key-value store named {@code "store"} holds exactly the
     * entries in {@code expectedStoreContent}.
     *
     * <p>The store is looked up through {@code IntegrationTestUtils.getStore} with a wait time
     * of 300000 ms. Every entry returned by the store must be present in
     * {@code expectedStoreContent}, and no expected entry may be left over afterwards.
     *
     * <p><b>Note:</b> {@code expectedStoreContent} is consumed by this method; matched entries
     * are removed from it, so the caller's set is empty after a successful verification.
     *
     * @param streams              the running Kafka Streams instance to query
     * @param expectedStoreContent the entries the store is expected to contain; mutated by
     *                             this method
     * @param reason               message attached to the assertions on failure
     * @throws Exception if the store lookup fails
     */
    private void verifyStateStore(KafkaStreams streams, Set<KeyValue<Long, Long>> expectedStoreContent, String reason) throws Exception {
        ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils.getStore(300000L, "store", streams, QueryableStoreTypes.<Long, Long>keyValueStore());
        Assert.assertNotNull(store);
        // Store iterators must be closed by the caller; try-with-resources also releases the
        // iterator when one of the assertions inside the loop fails.
        try (KeyValueIterator<Long, Long> it = store.all()) {
            while (it.hasNext()) {
                Assert.assertTrue(reason, expectedStoreContent.remove(it.next()));
            }
        }
        Assert.assertTrue(reason, expectedStoreContent.isEmpty());
    }
}

