/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Ignore
@Category(value={IntegrationTest.class})
public abstract class AbstractResetIntegrationTest {
    static String testId;
    static EmbeddedKafkaCluster cluster;
    static Map<String, Object> sslConfig;
    private static KafkaStreams streams;
    private static MockTime mockTime;
    private static AdminClient adminClient;
    private static KafkaAdminClient kafkaAdminClient;
    private String appID;
    private Properties commonClientConfig;
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
    private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
    private static final int TIMEOUT_MULTIPLIER = 5;
    private final TestCondition consumerGroupInactiveCondition = new TestCondition(){

        public boolean conditionMet() {
            return adminClient.describeConsumerGroup(testId + "-result-consumer", 0L).consumers().get().isEmpty();
        }
    };
    private static final Properties STREAMS_CONFIG;
    private static final Properties PRODUCER_CONFIG;
    private static final Properties RESULT_CONSUMER_CONFIG;

    @AfterClass
    public static void afterClassCleanup() {
        if (adminClient != null) {
            adminClient.close();
            adminClient = null;
        }
        if (kafkaAdminClient != null) {
            kafkaAdminClient.close(10L, TimeUnit.SECONDS);
            kafkaAdminClient = null;
        }
    }

    private void prepareEnvironment() {
        if (adminClient == null) {
            adminClient = AdminClient.create(this.commonClientConfig);
        }
        if (kafkaAdminClient == null) {
            kafkaAdminClient = (KafkaAdminClient)org.apache.kafka.clients.admin.AdminClient.create(this.commonClientConfig);
        }
        long alignedTime = (System.currentTimeMillis() / 1000L + 1L) * 1000L;
        mockTime = AbstractResetIntegrationTest.cluster.time;
        mockTime.setCurrentTimeMs(alignedTime);
    }

    private void prepareConfigs() {
        this.commonClientConfig = new Properties();
        this.commonClientConfig.put("bootstrap.servers", cluster.bootstrapServers());
        if (sslConfig != null) {
            this.commonClientConfig.put("ssl.truststore.location", sslConfig.get("ssl.truststore.location"));
            this.commonClientConfig.put("ssl.truststore.password", ((Password)sslConfig.get("ssl.truststore.password")).value());
            this.commonClientConfig.put("security.protocol", "SSL");
        }
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("retries", (Object)0);
        PRODUCER_CONFIG.put("key.serializer", LongSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
        PRODUCER_CONFIG.putAll((Map<?, ?>)this.commonClientConfig);
        RESULT_CONSUMER_CONFIG.put("group.id", testId + "-result-consumer");
        RESULT_CONSUMER_CONFIG.put("auto.offset.reset", "earliest");
        RESULT_CONSUMER_CONFIG.put("key.deserializer", LongDeserializer.class);
        RESULT_CONSUMER_CONFIG.put("value.deserializer", LongDeserializer.class);
        RESULT_CONSUMER_CONFIG.putAll((Map<?, ?>)this.commonClientConfig);
        STREAMS_CONFIG.put("state.dir", this.testFolder.getRoot().getPath());
        STREAMS_CONFIG.put("default.key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("cache.max.bytes.buffering", (Object)0);
        STREAMS_CONFIG.put("commit.interval.ms", (Object)100);
        STREAMS_CONFIG.put("heartbeat.interval.ms", (Object)100);
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("session.timeout.ms", "2000");
        STREAMS_CONFIG.put("internal.leave.group.on.close", (Object)true);
        STREAMS_CONFIG.putAll((Map<?, ?>)this.commonClientConfig);
    }

    void prepareTest() throws Exception {
        this.prepareConfigs();
        this.prepareEnvironment();
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Test consumer group active even after waiting 10000 ms.");
        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
        this.add10InputElements();
    }

    void cleanupTest() throws Exception {
        if (streams != null) {
            streams.close(30L, TimeUnit.SECONDS);
        }
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
    }

    private void add10InputElements() throws ExecutionException, InterruptedException {
        List<KeyValue> records = Arrays.asList(KeyValue.pair(0L, "aaa"), KeyValue.pair(1L, "bbb"), KeyValue.pair(0L, "ccc"), KeyValue.pair(1L, "ddd"), KeyValue.pair(0L, "eee"), KeyValue.pair(1L, "fff"), KeyValue.pair(0L, "ggg"), KeyValue.pair(1L, "hhh"), KeyValue.pair(0L, "iii"), KeyValue.pair(1L, "jjj"));
        for (KeyValue record : records) {
            mockTime.sleep(10L);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), PRODUCER_CONFIG, mockTime.milliseconds());
        }
    }

    void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
        this.appID = testId + "-not-reset-during-runtime";
        String[] parameters = new String[]{"--application-id", this.appID, "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", "2000");
        STREAMS_CONFIG.put("application.id", this.appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.start();
        int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals((long)1L, (long)exitCode);
        streams.close();
    }

    public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
        this.appID = testId + "-not-reset-without-input-topic";
        String[] parameters = new String[]{"--application-id", this.appID, "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", "2000");
        int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals((long)1L, (long)exitCode);
    }

    public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception {
        this.appID = testId + "-not-reset-without-intermediate-topic";
        String[] parameters = new String[]{"--application-id", this.appID, "--bootstrap-servers", cluster.bootstrapServers(), "--intermediate-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", "2000");
        int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals((long)1L, (long)exitCode);
    }

    void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
        this.appID = testId + "-from-scratch";
        STREAMS_CONFIG.put("application.id", this.appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.start();
        List result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Streams Application consumer group did not time out after 10000 ms.");
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.cleanUp();
        this.cleanGlobal(false, null, null);
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.assertInternalTopicsGotDeleted(null);
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result2));
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.cleanGlobal(false, null, null);
    }

    void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
        cluster.createTopic(INTERMEDIATE_USER_TOPIC);
        this.appID = testId + "-from-scratch-with-intermediate-topic";
        STREAMS_CONFIG.put("application.id", this.appID);
        streams = new KafkaStreams(this.setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), STREAMS_CONFIG);
        streams.start();
        List result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        List result22 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2, 40);
        streams.close();
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Streams Application consumer group did not time out after 10000 ms.");
        mockTime.sleep(1L);
        KeyValue<Long, String> badMessage = new KeyValue<Long, String>(-1L, "badRecord-ShouldBeSkipped");
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INTERMEDIATE_USER_TOPIC, Collections.singleton(badMessage), PRODUCER_CONFIG, mockTime.milliseconds());
        streams = new KafkaStreams(this.setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), STREAMS_CONFIG);
        streams.cleanUp();
        this.cleanGlobal(true, null, null);
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        List resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2_RERUN, 40);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result2));
        MatcherAssert.assertThat(resultRerun2, (Matcher)CoreMatchers.equalTo(result22));
        Properties props = TestUtils.consumerConfig((String)cluster.bootstrapServers(), (String)(testId + "-result-consumer"), LongDeserializer.class, StringDeserializer.class, (Properties)this.commonClientConfig);
        List resultIntermediate = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props, INTERMEDIATE_USER_TOPIC, 21);
        for (int i = 0; i < 10; ++i) {
            MatcherAssert.assertThat(resultIntermediate.get(i), (Matcher)CoreMatchers.equalTo(resultIntermediate.get(i + 11)));
        }
        MatcherAssert.assertThat(resultIntermediate.get(10), (Matcher)CoreMatchers.equalTo(badMessage));
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.cleanGlobal(true, null, null);
        cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
    }

    void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
        this.appID = testId + "-from-file";
        STREAMS_CONFIG.put("application.id", this.appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.start();
        List result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Streams Application consumer group did not time out after 10000 ms.");
        File resetFile = File.createTempFile("reset", ".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
            writer.close();
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.cleanUp();
        this.cleanGlobal(false, "--from-file", resetFile.getAbsolutePath());
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 5);
        streams.close();
        result2.remove(0);
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result2));
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.cleanGlobal(false, null, null);
    }

    void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
        this.appID = testId + "-from-datetime";
        STREAMS_CONFIG.put("application.id", this.appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.start();
        List result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Streams Application consumer group did not time out after 10000 ms.");
        File resetFile = File.createTempFile("reset", ".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
            writer.close();
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.cleanUp();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        Calendar calendar = Calendar.getInstance();
        calendar.add(5, -1);
        this.cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()));
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result2));
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.cleanGlobal(false, null, null);
    }

    void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
        this.appID = testId + "-from-duration";
        STREAMS_CONFIG.put("application.id", this.appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.start();
        List result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Streams Application consumer group did not time out after 10000 ms.");
        File resetFile = File.createTempFile("reset", ".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
            writer.close();
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
        streams.cleanUp();
        this.cleanGlobal(false, "--by-duration", "PT1M");
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result2));
        TestUtils.waitForCondition((TestCondition)this.consumerGroupInactiveCondition, (long)10000L, (String)"Reset Tool consumer group did not time out after 10000 ms.");
        this.cleanGlobal(false, null, null);
    }

    private Topology setupTopologyWithIntermediateUserTopic(String outputTopic2) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, String> input = builder.stream(INPUT_TOPIC);
        input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>(){

            @Override
            public KeyValue<Long, String> apply(Long key, String value) {
                return new KeyValue<Long, String>(key, value);
            }
        }).groupByKey().count().toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
        input.through(INTERMEDIATE_USER_TOPIC).groupByKey().windowedBy(TimeWindows.of(35L).advanceBy(10L)).count().toStream().map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>(){

            @Override
            public KeyValue<Long, Long> apply(Windowed<Long> key, Long value) {
                return new KeyValue<Long, Long>(key.window().start() + key.window().end(), value);
            }
        }).to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long()));
        return builder.build();
    }

    private Topology setupTopologyWithoutIntermediateUserTopic() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, String> input = builder.stream(INPUT_TOPIC);
        input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>(){

            @Override
            public KeyValue<Long, Long> apply(Long key, String value) {
                return new KeyValue<Long, Long>(key, key);
            }
        }).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
        return builder.build();
    }

    private void cleanGlobal(boolean withIntermediateTopics, String resetScenario, String resetScenarioArg) throws Exception {
        ArrayList<String> parameterList = new ArrayList<String>(Arrays.asList("--application-id", this.appID, "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", INPUT_TOPIC, "--execute"));
        if (withIntermediateTopics) {
            parameterList.add("--intermediate-topics");
            parameterList.add(INTERMEDIATE_USER_TOPIC);
        }
        if (sslConfig != null) {
            File configFile = TestUtils.tempFile();
            BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
            writer.write("security.protocol=SSL\n");
            writer.write("ssl.truststore.location=" + sslConfig.get("ssl.truststore.location") + "\n");
            writer.write("ssl.truststore.password=" + ((Password)sslConfig.get("ssl.truststore.password")).value() + "\n");
            writer.close();
            parameterList.add("--config-file");
            parameterList.add(configFile.getAbsolutePath());
        }
        if (resetScenario != null) {
            parameterList.add(resetScenario);
        }
        if (resetScenarioArg != null) {
            parameterList.add(resetScenarioArg);
        }
        String[] parameters = parameterList.toArray(new String[parameterList.size()]);
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", "2000");
        int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals((long)0L, (long)exitCode);
    }

    private void assertInternalTopicsGotDeleted(String intermediateUserTopic) throws Exception {
        if (intermediateUserTopic != null) {
            cluster.waitForRemainingTopics(30000L, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, "__consumer_offsets", intermediateUserTopic);
        } else {
            cluster.waitForRemainingTopics(30000L, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, "__consumer_offsets");
        }
    }

    static {
        sslConfig = null;
        adminClient = null;
        kafkaAdminClient = null;
        STREAMS_CONFIG = new Properties();
        PRODUCER_CONFIG = new Properties();
        RESULT_CONSUMER_CONFIG = new Properties();
    }
}

