/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="integration")
@Timeout(value=600L)
public class EosV2UpgradeIntegrationTest {
    private static final int NUM_BROKERS = 3;
    private static final int MAX_POLL_INTERVAL_MS = (int)Duration.ofSeconds(100L).toMillis();
    private static final long MAX_WAIT_TIME_MS = Duration.ofMinutes(1L).toMillis();
    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CLOSE = Collections.unmodifiableList(Arrays.asList(KeyValue.pair((Object)KafkaStreams.State.RUNNING, (Object)KafkaStreams.State.PENDING_SHUTDOWN), KeyValue.pair((Object)KafkaStreams.State.PENDING_SHUTDOWN, (Object)KafkaStreams.State.NOT_RUNNING)));
    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CRASH = Collections.singletonList(KeyValue.pair((Object)KafkaStreams.State.PENDING_ERROR, (Object)KafkaStreams.State.ERROR));
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false")));
    private static String applicationId;
    private static final int NUM_TOPIC_PARTITIONS = 4;
    private static final String CONSUMER_GROUP_ID = "readCommitted";
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
    private static final String APP_DIR_1 = "appDir1";
    private static final String APP_DIR_2 = "appDir2";
    private static final String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception, or there are too many exceptions thrown, please check standard error log for more info.";
    private final String storeName = "store";
    private final IntegrationTestUtils.StableAssignmentListener assignmentListener = new IntegrationTestUtils.StableAssignmentListener();
    private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false);
    private final AtomicBoolean errorInjectedClient2 = new AtomicBoolean(false);
    private final AtomicBoolean commitErrorInjectedClient1 = new AtomicBoolean(false);
    private final AtomicBoolean commitErrorInjectedClient2 = new AtomicBoolean(false);
    private final AtomicInteger commitCounterClient1 = new AtomicInteger(-1);
    private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
    private final AtomicInteger commitRequested = new AtomicInteger(0);
    private int testNumber = 0;
    private final Map<String, Integer> exceptionCounts = new HashMap<String, Integer>(){
        {
            this.put(EosV2UpgradeIntegrationTest.APP_DIR_1, 0);
            this.put(EosV2UpgradeIntegrationTest.APP_DIR_2, 0);
        }
    };
    private volatile boolean hasUnexpectedError = false;

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics() throws Exception {
        applicationId = "appId-" + ++this.testNumber;
        CLUSTER.deleteTopicsAndWait(MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, applicationId + "-" + "store" + "-changelog");
        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, 4, 1);
        CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, 4, 1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldUpgradeFromEosAlphaToEosV2(boolean injectError) throws Exception {
        LinkedList<KeyValue<KafkaStreams.State, KafkaStreams.State>> stateTransitions1 = new LinkedList<KeyValue<KafkaStreams.State, KafkaStreams.State>>();
        KafkaStreams streams1Alpha = null;
        KafkaStreams streams1V2 = null;
        KafkaStreams streams1V2Two = null;
        LinkedList<KeyValue<KafkaStreams.State, KafkaStreams.State>> stateTransitions2 = new LinkedList<KeyValue<KafkaStreams.State, KafkaStreams.State>>();
        KafkaStreams streams2Alpha = null;
        KafkaStreams streams2AlphaTwo = null;
        KafkaStreams streams2V2 = null;
        try {
            List<KeyValue<Long, Long>> expectedCommittedResultAfterFailure;
            List<KeyValue<Long, Long>> dataFailingKey;
            Long failingKey;
            Object otherKey;
            Iterator<Long> it;
            List<KeyValue<Long, Long>> expectedCommittedResultBeforeFailure;
            Set<Long> keysFirstClientV2;
            Set newlyCommittedKeys;
            streams1Alpha = this.getKafkaStreams(APP_DIR_1, "exactly_once", injectError);
            streams1Alpha.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair((Object)oldState, (Object)newState)));
            this.assignmentListener.prepareForRebalance();
            streams1Alpha.cleanUp();
            streams1Alpha.start();
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            this.waitForRunning(stateTransitions1);
            streams2Alpha = this.getKafkaStreams(APP_DIR_2, "exactly_once", injectError);
            streams2Alpha.setStateListener((newState, oldState) -> stateTransitions2.add(KeyValue.pair((Object)oldState, (Object)newState)));
            stateTransitions1.clear();
            this.assignmentListener.prepareForRebalance();
            streams2Alpha.cleanUp();
            streams2Alpha.start();
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            this.waitForRunning(stateTransitions1);
            this.waitForRunning(stateTransitions2);
            List<KeyValue<Long, Long>> committedInputDataBeforeUpgrade = this.prepareData(0L, 10L, 0L, 1L, 2L, 3L);
            this.writeInputData(committedInputDataBeforeUpgrade);
            TestUtils.waitForCondition(() -> this.commitRequested.get() == 4, (long)MAX_WAIT_TIME_MS, (String)"SteamsTasks did not request commit.");
            HashMap<Long, Long> committedState = new HashMap<Long, Long>();
            List<KeyValue<Long, Long>> expectedUncommittedResult = this.computeExpectedResult(committedInputDataBeforeUpgrade, committedState);
            this.verifyCommitted(expectedUncommittedResult);
            Set cleanKeys = Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L});
            Set<Long> keysFirstClientAlpha = this.keysFromInstance(streams1Alpha);
            long firstFailingKeyForCrashCase = keysFirstClientAlpha.iterator().next();
            cleanKeys.remove(firstFailingKeyForCrashCase);
            LinkedList<KeyValue<Long, Long>> uncommittedInputDataBeforeFirstUpgrade = new LinkedList<KeyValue<Long, Long>>();
            HashMap<Long, Long> uncommittedState = new HashMap<Long, Long>(committedState);
            if (!injectError) {
                uncommittedInputDataBeforeFirstUpgrade.addAll(this.prepareData(10L, 15L, 0L, 1L, 2L, 3L));
                this.writeInputData(uncommittedInputDataBeforeFirstUpgrade);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
            } else {
                LinkedList<KeyValue<Long, Long>> uncommittedInputDataWithoutFailingKey = new LinkedList<KeyValue<Long, Long>>();
                Iterator iterator = cleanKeys.iterator();
                while (iterator.hasNext()) {
                    long key = (Long)iterator.next();
                    uncommittedInputDataWithoutFailingKey.addAll(this.prepareData(10L, 15L, key));
                }
                uncommittedInputDataWithoutFailingKey.addAll(this.prepareData(10L, 14L, firstFailingKeyForCrashCase));
                uncommittedInputDataBeforeFirstUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
                this.writeInputData(uncommittedInputDataWithoutFailingKey);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataWithoutFailingKey, new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
            }
            stateTransitions2.clear();
            this.assignmentListener.prepareForRebalance();
            if (!injectError) {
                stateTransitions1.clear();
                streams1Alpha.close();
                this.waitForStateTransition(stateTransitions1, CLOSE);
            } else {
                this.errorInjectedClient1.set(true);
                List<KeyValue<Long, Long>> dataPotentiallyFirstFailingKey = this.prepareData(14L, 15L, firstFailingKeyForCrashCase);
                uncommittedInputDataBeforeFirstUpgrade.addAll(dataPotentiallyFirstFailingKey);
                this.writeInputData(dataPotentiallyFirstFailingKey);
            }
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            this.waitForRunning(stateTransitions2);
            if (!injectError) {
                List<KeyValue<Long, Long>> committedInputDataDuringFirstUpgrade = uncommittedInputDataBeforeFirstUpgrade.stream().filter(pair -> keysFirstClientAlpha.contains(pair.key)).collect(Collectors.toList());
                List<KeyValue<Long, Long>> expectedCommittedResult = this.computeExpectedResult(committedInputDataDuringFirstUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResult);
            } else {
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade.stream().filter(pair -> keysFirstClientAlpha.contains(pair.key)).collect(Collectors.toList()), new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
                this.waitForStateTransitionContains(stateTransitions1, CRASH);
                this.errorInjectedClient1.set(false);
                stateTransitions1.clear();
                streams1Alpha.close();
                Assertions.assertFalse((boolean)this.hasUnexpectedError, (String)UNEXPECTED_EXCEPTION_MSG);
            }
            this.commitRequested.set(0);
            stateTransitions1.clear();
            stateTransitions2.clear();
            streams1V2 = this.getKafkaStreams(APP_DIR_1, "exactly_once_v2", injectError);
            streams1V2.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair((Object)oldState, (Object)newState)));
            this.assignmentListener.prepareForRebalance();
            streams1V2.start();
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            this.waitForRunning(stateTransitions1);
            this.waitForRunning(stateTransitions2);
            if (!injectError) {
                newlyCommittedKeys = this.keysFromInstance(streams1V2);
                newlyCommittedKeys.removeAll(keysFirstClientAlpha);
            } else {
                newlyCommittedKeys = Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L});
            }
            List<KeyValue<Long, Long>> expectedCommittedResultAfterRestartFirstClient = this.computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade.stream().filter(pair -> newlyCommittedKeys.contains(pair.key)).collect(Collectors.toList()), committedState);
            this.verifyCommitted(expectedCommittedResultAfterRestartFirstClient);
            this.commitCounterClient1.set(0);
            if (!injectError) {
                List<KeyValue<Long, Long>> finishSecondBatch = this.prepareData(15L, 20L, 0L, 1L, 2L, 3L);
                this.writeInputData(finishSecondBatch);
                List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade.stream().filter(pair -> !keysFirstClientAlpha.contains(pair.key)).filter(pair -> !newlyCommittedKeys.contains(pair.key)).collect(Collectors.toList());
                committedInputDataDuringUpgrade.addAll(finishSecondBatch);
                expectedUncommittedResult.addAll(this.computeExpectedResult(finishSecondBatch, uncommittedState));
                List<KeyValue<Long, Long>> expectedCommittedResult = this.computeExpectedResult(committedInputDataDuringUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResult);
            } else {
                keysFirstClientV2 = this.keysFromInstance(streams1V2);
                Set<Long> keysSecondClientAlpha = this.keysFromInstance(streams2Alpha);
                List<KeyValue<Long, Long>> committedInputDataAfterFirstUpgrade = this.prepareData(15L, 20L, keysFirstClientV2.toArray(new Long[0]));
                this.writeInputData(committedInputDataAfterFirstUpgrade);
                expectedCommittedResultBeforeFailure = this.computeExpectedResult(committedInputDataAfterFirstUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResultBeforeFailure);
                expectedUncommittedResult.addAll(expectedCommittedResultBeforeFailure);
                this.commitCounterClient2.set(0);
                it = keysSecondClientAlpha.iterator();
                otherKey = it.next();
                failingKey = it.next();
                List<KeyValue<Long, Long>> uncommittedInputDataAfterFirstUpgrade = this.prepareData(15L, 19L, keysSecondClientAlpha.toArray(new Long[0]));
                uncommittedInputDataAfterFirstUpgrade.addAll(this.prepareData(19L, 20L, new Long[]{otherKey}));
                this.writeInputData(uncommittedInputDataAfterFirstUpgrade);
                uncommittedState.putAll(committedState);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
                stateTransitions1.clear();
                stateTransitions2.clear();
                this.assignmentListener.prepareForRebalance();
                this.commitCounterClient1.set(0);
                this.commitErrorInjectedClient2.set(true);
                dataFailingKey = this.prepareData(19L, 20L, failingKey);
                uncommittedInputDataAfterFirstUpgrade.addAll(dataFailingKey);
                this.writeInputData(dataFailingKey);
                expectedUncommittedResult.addAll(this.computeExpectedResult(dataFailingKey, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
                this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
                this.waitForStateTransitionContains(stateTransitions2, CRASH);
                this.commitErrorInjectedClient2.set(false);
                stateTransitions2.clear();
                streams2Alpha.close();
                Assertions.assertFalse((boolean)this.hasUnexpectedError, (String)UNEXPECTED_EXCEPTION_MSG);
                expectedCommittedResultAfterFailure = this.computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResultAfterFailure);
                expectedUncommittedResult.addAll(expectedCommittedResultAfterFailure);
            }
            if (!injectError) {
                streams2AlphaTwo = streams2Alpha;
            } else {
                this.commitCounterClient1.set(0);
                this.commitCounterClient2.set(-1);
                stateTransitions1.clear();
                stateTransitions2.clear();
                streams2AlphaTwo = this.getKafkaStreams(APP_DIR_2, "exactly_once", true);
                streams2AlphaTwo.setStateListener((newState, oldState) -> stateTransitions2.add(KeyValue.pair((Object)oldState, (Object)newState)));
                this.assignmentListener.prepareForRebalance();
                streams2AlphaTwo.start();
                this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
                this.waitForRunning(stateTransitions1);
                this.waitForRunning(stateTransitions2);
                keysFirstClientV2 = this.keysFromInstance(streams1V2);
                Set<Long> keysSecondClientAlphaTwo = this.keysFromInstance(streams2AlphaTwo);
                List<KeyValue<Long, Long>> committedInputDataBetweenUpgrades = this.prepareData(20L, 30L, keysSecondClientAlphaTwo.toArray(new Long[0]));
                this.writeInputData(committedInputDataBetweenUpgrades);
                expectedCommittedResultBeforeFailure = this.computeExpectedResult(committedInputDataBetweenUpgrades, committedState);
                this.verifyCommitted(expectedCommittedResultBeforeFailure);
                expectedUncommittedResult.addAll(expectedCommittedResultBeforeFailure);
                this.commitCounterClient2.set(0);
                it = keysFirstClientV2.iterator();
                otherKey = it.next();
                failingKey = it.next();
                List<KeyValue<Long, Long>> uncommittedInputDataBetweenUpgrade = this.prepareData(20L, 29L, keysFirstClientV2.toArray(new Long[0]));
                uncommittedInputDataBetweenUpgrade.addAll(this.prepareData(29L, 30L, new Long[]{otherKey}));
                this.writeInputData(uncommittedInputDataBetweenUpgrade);
                uncommittedState.putAll(committedState);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBetweenUpgrade, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
                stateTransitions1.clear();
                stateTransitions2.clear();
                this.assignmentListener.prepareForRebalance();
                this.commitCounterClient2.set(0);
                this.commitErrorInjectedClient1.set(true);
                dataFailingKey = this.prepareData(29L, 30L, failingKey);
                uncommittedInputDataBetweenUpgrade.addAll(dataFailingKey);
                this.writeInputData(dataFailingKey);
                expectedUncommittedResult.addAll(this.computeExpectedResult(dataFailingKey, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
                this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
                this.waitForStateTransitionContains(stateTransitions1, CRASH);
                this.commitErrorInjectedClient1.set(false);
                stateTransitions1.clear();
                streams1V2.close();
                Assertions.assertFalse((boolean)this.hasUnexpectedError, (String)UNEXPECTED_EXCEPTION_MSG);
                expectedCommittedResultAfterFailure = this.computeExpectedResult(uncommittedInputDataBetweenUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResultAfterFailure);
                expectedUncommittedResult.addAll(expectedCommittedResultAfterFailure);
                stateTransitions1.clear();
                stateTransitions2.clear();
                streams1V2Two = this.getKafkaStreams(APP_DIR_1, "exactly_once_v2", true);
                streams1V2Two.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair((Object)oldState, (Object)newState)));
                this.assignmentListener.prepareForRebalance();
                streams1V2Two.start();
                this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
                this.waitForRunning(stateTransitions1);
                this.waitForRunning(stateTransitions2);
            }
            cleanKeys.addAll(Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L}));
            Set<Long> keysSecondClientAlphaTwo = this.keysFromInstance(streams2AlphaTwo);
            long secondFailingKeyForCrashCase = keysSecondClientAlphaTwo.iterator().next();
            cleanKeys.remove(secondFailingKeyForCrashCase);
            LinkedList<KeyValue<Long, Long>> uncommittedInputDataBeforeSecondUpgrade = new LinkedList<KeyValue<Long, Long>>();
            if (!injectError) {
                uncommittedInputDataBeforeSecondUpgrade.addAll(this.prepareData(30L, 35L, 0L, 1L, 2L, 3L));
                this.writeInputData(uncommittedInputDataBeforeSecondUpgrade);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBeforeSecondUpgrade, new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
            } else {
                LinkedList<KeyValue<Long, Long>> uncommittedInputDataWithoutFailingKey = new LinkedList<KeyValue<Long, Long>>();
                otherKey = cleanKeys.iterator();
                while (otherKey.hasNext()) {
                    long key = (Long)otherKey.next();
                    uncommittedInputDataWithoutFailingKey.addAll(this.prepareData(30L, 35L, key));
                }
                uncommittedInputDataWithoutFailingKey.addAll(this.prepareData(30L, 34L, secondFailingKeyForCrashCase));
                uncommittedInputDataBeforeSecondUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
                this.writeInputData(uncommittedInputDataWithoutFailingKey);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataWithoutFailingKey, new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
            }
            stateTransitions1.clear();
            this.assignmentListener.prepareForRebalance();
            if (!injectError) {
                stateTransitions2.clear();
                streams2AlphaTwo.close();
                this.waitForStateTransition(stateTransitions2, CLOSE);
            } else {
                this.errorInjectedClient2.set(true);
                List<KeyValue<Long, Long>> dataPotentiallySecondFailingKey = this.prepareData(34L, 35L, secondFailingKeyForCrashCase);
                uncommittedInputDataBeforeSecondUpgrade.addAll(dataPotentiallySecondFailingKey);
                this.writeInputData(dataPotentiallySecondFailingKey);
            }
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            this.waitForRunning(stateTransitions1);
            if (!injectError) {
                List<KeyValue<Long, Long>> committedInputDataDuringSecondUpgrade = uncommittedInputDataBeforeSecondUpgrade.stream().filter(pair -> keysSecondClientAlphaTwo.contains(pair.key)).collect(Collectors.toList());
                List<KeyValue<Long, Long>> expectedCommittedResult = this.computeExpectedResult(committedInputDataDuringSecondUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResult);
            } else {
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBeforeSecondUpgrade.stream().filter(pair -> keysSecondClientAlphaTwo.contains(pair.key)).collect(Collectors.toList()), new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
                this.waitForStateTransitionContains(stateTransitions2, CRASH);
                this.errorInjectedClient2.set(false);
                stateTransitions2.clear();
                streams2AlphaTwo.close();
                Assertions.assertFalse((boolean)this.hasUnexpectedError, (String)UNEXPECTED_EXCEPTION_MSG);
            }
            this.commitRequested.set(0);
            stateTransitions1.clear();
            stateTransitions2.clear();
            streams2V2 = this.getKafkaStreams(APP_DIR_2, "exactly_once_v2", injectError);
            streams2V2.setStateListener((newState, oldState) -> stateTransitions2.add(KeyValue.pair((Object)oldState, (Object)newState)));
            this.assignmentListener.prepareForRebalance();
            streams2V2.start();
            this.assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
            this.waitForRunning(stateTransitions1);
            this.waitForRunning(stateTransitions2);
            newlyCommittedKeys.clear();
            if (!injectError) {
                newlyCommittedKeys.addAll(this.keysFromInstance(streams2V2));
                newlyCommittedKeys.removeAll(keysSecondClientAlphaTwo);
            } else {
                newlyCommittedKeys.addAll(Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L}));
            }
            List<KeyValue<Long, Long>> expectedCommittedResultAfterRestartSecondClient = this.computeExpectedResult(uncommittedInputDataBeforeSecondUpgrade.stream().filter(pair -> newlyCommittedKeys.contains(pair.key)).collect(Collectors.toList()), committedState);
            this.verifyCommitted(expectedCommittedResultAfterRestartSecondClient);
            this.commitCounterClient1.set(-1);
            this.commitCounterClient2.set(-1);
            List<KeyValue<Long, Long>> finishLastBatch = this.prepareData(35L, 40L, 0L, 1L, 2L, 3L);
            this.writeInputData(finishLastBatch);
            Set uncommittedKeys = Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L});
            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
            uncommittedKeys.removeAll(newlyCommittedKeys);
            List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade.stream().filter(pair -> uncommittedKeys.contains(pair.key)).collect(Collectors.toList());
            committedInputDataDuringUpgrade.addAll(finishLastBatch);
            List<KeyValue<Long, Long>> expectedCommittedResult = this.computeExpectedResult(committedInputDataDuringUpgrade, committedState);
            this.verifyCommitted(expectedCommittedResult);
        }
        finally {
            if (streams1Alpha != null) {
                streams1Alpha.close();
            }
            if (streams1V2 != null) {
                streams1V2.close();
            }
            if (streams1V2Two != null) {
                streams1V2Two.close();
            }
            if (streams2Alpha != null) {
                streams2Alpha.close();
            }
            if (streams2AlphaTwo != null) {
                streams2AlphaTwo.close();
            }
            if (streams2V2 != null) {
                streams2V2.close();
            }
        }
    }

    private KafkaStreams getKafkaStreams(String appDir, String processingGuarantee, boolean injectError) {
        StreamsBuilder builder = new StreamsBuilder();
        String[] storeNames = new String[]{"store"};
        StoreBuilder storeBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"store"), (Serde)Serdes.Long(), (Serde)Serdes.Long()).withCachingEnabled();
        builder.addStateStore(storeBuilder);
        KStream input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
        input.transform((TransformerSupplier)new TransformerSupplier<Long, Long, KeyValue<Long, Long>>(){

            public Transformer<Long, Long, KeyValue<Long, Long>> get() {
                return new Transformer<Long, Long, KeyValue<Long, Long>>(){
                    ProcessorContext context;
                    KeyValueStore<Long, Long> state = null;
                    AtomicBoolean crash;
                    AtomicInteger sharedCommit;

                    public void init(ProcessorContext context) {
                        this.context = context;
                        this.state = (KeyValueStore)context.getStateStore("store");
                        String clientId = context.appConfigs().get("client.id").toString();
                        if (EosV2UpgradeIntegrationTest.APP_DIR_1.equals(clientId)) {
                            this.crash = EosV2UpgradeIntegrationTest.this.errorInjectedClient1;
                            this.sharedCommit = EosV2UpgradeIntegrationTest.this.commitCounterClient1;
                        } else {
                            this.crash = EosV2UpgradeIntegrationTest.this.errorInjectedClient2;
                            this.sharedCommit = EosV2UpgradeIntegrationTest.this.commitCounterClient2;
                        }
                    }

                    public KeyValue<Long, Long> transform(Long key, Long value) {
                        Long sum;
                        if ((value + 1L) % 10L == 0L) {
                            if (this.sharedCommit.get() < 0 || this.sharedCommit.incrementAndGet() == 2) {
                                this.context.commit();
                            }
                            EosV2UpgradeIntegrationTest.this.commitRequested.incrementAndGet();
                        }
                        sum = (sum = (Long)this.state.get((Object)key)) == null ? value : Long.valueOf(sum + value);
                        this.state.put((Object)key, (Object)sum);
                        this.state.flush();
                        if (value % 10L == 4L && this.crash != null && this.crash.compareAndSet(true, false)) {
                            throw new RuntimeException("Injected test exception.");
                        }
                        return new KeyValue((Object)key, this.state.get((Object)key));
                    }

                    public void close() {
                    }
                };
            }
        }, storeNames).to(MULTI_PARTITION_OUTPUT_TOPIC);
        Properties properties = new Properties();
        properties.put("client.id", appDir);
        properties.put("processing.guarantee", processingGuarantee);
        long commitInterval = Duration.ofMinutes(5L).toMillis();
        properties.put("commit.interval.ms", (Object)commitInterval);
        properties.put(StreamsConfig.consumerPrefix((String)"metadata.max.age.ms"), (Object)Duration.ofSeconds(1L).toMillis());
        properties.put(StreamsConfig.consumerPrefix((String)"auto.offset.reset"), "earliest");
        properties.put(StreamsConfig.consumerPrefix((String)"request.timeout.ms"), (Object)((int)Duration.ofSeconds(5L).toMillis()));
        properties.put(StreamsConfig.consumerPrefix((String)"session.timeout.ms"), (Object)((int)Duration.ofSeconds(5L).minusMillis(1L).toMillis()));
        properties.put(StreamsConfig.consumerPrefix((String)"max.poll.interval.ms"), (Object)MAX_POLL_INTERVAL_MS);
        properties.put(StreamsConfig.producerPrefix((String)"transaction.timeout.ms"), (Object)((int)commitInterval));
        properties.put(StreamsConfig.producerPrefix((String)"partitioner.class"), KeyPartitioner.class);
        properties.put("statestore.cache.max.bytes", (Object)0);
        properties.put("state.dir", TestUtils.tempDirectory().getPath() + File.separator + appDir);
        properties.put("__assignment.listener__", this.assignmentListener);
        Properties config = StreamsTestUtils.getStreamsConfig(applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties);
        KafkaStreams streams = new KafkaStreams(builder.build(), config, (KafkaClientSupplier)new TestKafkaClientSupplier());
        streams.setUncaughtExceptionHandler(e -> {
            if (!injectError) {
                e.printStackTrace(System.err);
                this.hasUnexpectedError = true;
            } else {
                int exceptionCount = this.exceptionCounts.get(appDir);
                if (++exceptionCount > 2 || !(e instanceof StreamsException) || !e.getCause().getMessage().endsWith(" test exception.")) {
                    e.printStackTrace(System.err);
                    this.hasUnexpectedError = true;
                }
                this.exceptionCounts.put(appDir, exceptionCount);
            }
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        return streams;
    }

    private void waitForRunning(List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed) throws Exception {
        TestUtils.waitForCondition(() -> !observed.isEmpty() && ((KafkaStreams.State)((KeyValue)observed.get((int)(observed.size() - 1))).value).equals((Object)KafkaStreams.State.RUNNING), (long)MAX_WAIT_TIME_MS, () -> "Client did not startup on time. Observers transitions: " + observed);
    }

    private void waitForStateTransition(List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed, List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected) throws Exception {
        TestUtils.waitForCondition(() -> observed.equals(expected), (long)MAX_WAIT_TIME_MS, () -> "Client did not have the expected state transition on time. Observers transitions: " + observed + "Expected transitions: " + expected);
    }

    private void waitForStateTransitionContains(List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed, List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected) throws Exception {
        TestUtils.waitForCondition(() -> observed.containsAll(expected), (long)MAX_WAIT_TIME_MS, () -> "Client did not have the expected state transition on time. Observers transitions: " + observed + "Expected transitions: " + expected);
    }

    private List<KeyValue<Long, Long>> prepareData(long fromInclusive, long toExclusive, Long ... keys) {
        ArrayList<KeyValue<Long, Long>> data = new ArrayList<KeyValue<Long, Long>>();
        for (Long k : keys) {
            for (long v = fromInclusive; v < toExclusive; ++v) {
                data.add((KeyValue<Long, Long>)new KeyValue((Object)k, (Object)v));
            }
        }
        return data;
    }

    private void writeInputData(List<KeyValue<Long, Long>> records) {
        Properties config = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class);
        config.setProperty("partitioner.class", KeyPartitioner.class.getName());
        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, records, config, (Time)EosV2UpgradeIntegrationTest.CLUSTER.time);
    }

    private void verifyCommitted(List<KeyValue<Long, Long>> expectedResult) throws Exception {
        List<KeyValue<Long, Long>> committedOutput = this.readResult(expectedResult.size(), true);
        this.checkResultPerKey(committedOutput, expectedResult);
    }

    private void verifyUncommitted(List<KeyValue<Long, Long>> expectedResult) throws Exception {
        List<KeyValue<Long, Long>> uncommittedOutput = this.readResult(expectedResult.size(), false);
        this.checkResultPerKey(uncommittedOutput, expectedResult);
    }

    private List<KeyValue<Long, Long>> readResult(int numberOfRecords, boolean readCommitted) throws Exception {
        if (readCommitted) {
            return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), (String)CONSUMER_GROUP_ID, LongDeserializer.class, LongDeserializer.class, (Properties)Utils.mkProperties(Collections.singletonMap("isolation.level", IsolationLevel.READ_COMMITTED.toString()))), MULTI_PARTITION_OUTPUT_TOPIC, numberOfRecords, MAX_WAIT_TIME_MS);
        }
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), MULTI_PARTITION_OUTPUT_TOPIC, numberOfRecords);
    }

    private void checkResultPerKey(List<KeyValue<Long, Long>> result, List<KeyValue<Long, Long>> expectedResult) {
        HashSet<Long> allKeys = new HashSet<Long>();
        this.addAllKeys(allKeys, result);
        this.addAllKeys(allKeys, expectedResult);
        for (Long key : allKeys) {
            try {
                MatcherAssert.assertThat(this.getAllRecordPerKey(key, result), (Matcher)CoreMatchers.equalTo(this.getAllRecordPerKey(key, expectedResult)));
            }
            catch (AssertionError error) {
                throw new AssertionError("expected result: " + expectedResult.stream().map(KeyValue::toString).collect(Collectors.joining(", ")) + "\nreceived records: " + result.stream().map(KeyValue::toString).collect(Collectors.joining(", ")), (Throwable)((Object)error));
            }
        }
    }

    private void addAllKeys(Set<Long> allKeys, List<KeyValue<Long, Long>> records) {
        for (KeyValue<Long, Long> record : records) {
            allKeys.add((Long)record.key);
        }
    }

    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long key, List<KeyValue<Long, Long>> records) {
        ArrayList<KeyValue<Long, Long>> recordsPerKey = new ArrayList<KeyValue<Long, Long>>(records.size());
        for (KeyValue<Long, Long> record : records) {
            if (!((Long)record.key).equals(key)) continue;
            recordsPerKey.add(record);
        }
        return recordsPerKey;
    }

    private List<KeyValue<Long, Long>> computeExpectedResult(List<KeyValue<Long, Long>> input, Map<Long, Long> currentState) {
        ArrayList<KeyValue<Long, Long>> expectedResult = new ArrayList<KeyValue<Long, Long>>(input.size());
        for (KeyValue<Long, Long> record : input) {
            long sum = currentState.getOrDefault(record.key, 0L);
            currentState.put((Long)record.key, sum + (Long)record.value);
            expectedResult.add((KeyValue<Long, Long>)new KeyValue(record.key, (Object)(sum + (Long)record.value)));
        }
        return expectedResult;
    }

    private Set<Long> keysFromInstance(KafkaStreams streams) throws Exception {
        HashSet<Long> keys = new HashSet<Long>();
        TestUtils.waitForCondition(() -> {
            ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)streams.store(StoreQueryParameters.fromNameAndType((String)"store", (QueryableStoreType)QueryableStoreTypes.keyValueStore()));
            keys.clear();
            try (KeyValueIterator it = store.all();){
                while (it.hasNext()) {
                    KeyValue row = (KeyValue)it.next();
                    keys.add((Long)row.key);
                }
            }
            return true;
        }, (long)MAX_WAIT_TIME_MS, (String)"Could not get keys from store: store");
        return keys;
    }

    private class ErrorInjector
    extends KafkaProducer<byte[], byte[]> {
        private final AtomicBoolean crash;

        public ErrorInjector(Map<String, Object> configs) {
            super(configs, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
            String clientId = configs.get("client.id").toString();
            this.crash = clientId.contains(EosV2UpgradeIntegrationTest.APP_DIR_1) ? EosV2UpgradeIntegrationTest.this.commitErrorInjectedClient1 : EosV2UpgradeIntegrationTest.this.commitErrorInjectedClient2;
        }

        public void commitTransaction() {
            super.flush();
            if (this.crash.compareAndSet(true, false)) {
                throw new RuntimeException("Injected producer commit test exception.");
            }
            super.commitTransaction();
        }
    }

    private class TestKafkaClientSupplier
    extends DefaultKafkaClientSupplier {
        private TestKafkaClientSupplier() {
        }

        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            return new ErrorInjector(config);
        }
    }

    public static class KeyPartitioner
    implements Partitioner {
        private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % 4;
        }

        public void close() {
        }

        public void configure(Map<String, ?> configs) {
        }
    }
}

