/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.tests.StreamsUpgradeTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Tag(value="integration")
@Timeout(value=600L)
public class StreamsUpgradeTestIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 1, "data");
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Test
    public void testVersionProbingUpgrade() throws InterruptedException {
        KafkaStreams kafkaStreams1 = StreamsUpgradeTest.buildStreams(Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers())})));
        KafkaStreams kafkaStreams2 = StreamsUpgradeTest.buildStreams(Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers())})));
        KafkaStreams kafkaStreams3 = StreamsUpgradeTest.buildStreams(Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers())})));
        StreamsUpgradeTestIntegrationTest.startSync(kafkaStreams1, kafkaStreams2, kafkaStreams3);
        kafkaStreams1.close();
        AtomicInteger usedVersion4 = new AtomicInteger();
        KafkaStreams kafkaStreams4 = StreamsUpgradeTestIntegrationTest.buildFutureStreams(usedVersion4);
        StreamsUpgradeTestIntegrationTest.startSync(kafkaStreams4);
        MatcherAssert.assertThat((Object)usedVersion4.get(), (Matcher)CoreMatchers.is((Object)11));
        kafkaStreams2.close();
        AtomicInteger usedVersion5 = new AtomicInteger();
        KafkaStreams kafkaStreams5 = StreamsUpgradeTestIntegrationTest.buildFutureStreams(usedVersion5);
        StreamsUpgradeTestIntegrationTest.startSync(kafkaStreams5);
        MatcherAssert.assertThat((Object)usedVersion5.get(), (Matcher)CoreMatchers.is((Object)11));
        kafkaStreams3.close();
        AtomicInteger usedVersion6 = new AtomicInteger();
        KafkaStreams kafkaStreams6 = StreamsUpgradeTestIntegrationTest.buildFutureStreams(usedVersion6);
        StreamsUpgradeTestIntegrationTest.startSync(kafkaStreams6);
        TestUtils.retryOnExceptionWithTimeout(() -> MatcherAssert.assertThat((Object)usedVersion6.get(), (Matcher)CoreMatchers.is((Object)12)));
        TestUtils.retryOnExceptionWithTimeout(() -> MatcherAssert.assertThat((Object)usedVersion5.get(), (Matcher)CoreMatchers.is((Object)12)));
        TestUtils.retryOnExceptionWithTimeout(() -> MatcherAssert.assertThat((Object)usedVersion4.get(), (Matcher)CoreMatchers.is((Object)12)));
        kafkaStreams4.close(Duration.ZERO);
        kafkaStreams5.close(Duration.ZERO);
        kafkaStreams6.close(Duration.ZERO);
        kafkaStreams4.close();
        kafkaStreams5.close();
        kafkaStreams6.close();
    }

    private static KafkaStreams buildFutureStreams(AtomicInteger usedVersion4) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("test.future.metadata", usedVersion4);
        return StreamsUpgradeTest.buildStreams(properties);
    }

    private static void startSync(KafkaStreams ... kafkaStreams) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(kafkaStreams.length);
        for (KafkaStreams streams : kafkaStreams) {
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING) {
                    latch.countDown();
                }
            });
        }
        for (KafkaStreams streams : kafkaStreams) {
            streams.start();
        }
        latch.await();
    }
}

