/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for the per-store lag information reported by
 * {@link KafkaStreams#allLocalStorePartitionLags()}, run against an embedded Kafka cluster.
 *
 * <p>Each test writes five records to a fresh input topic, materializes them into a state
 * store through a {@link KTable}, and then checks the reported current offset, end offset and
 * offset lag at specific points of the instance life cycle.
 */
@Timeout(value = 600L)
@Tag(value = "integration")
public class LagFetchIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    /** Upper bound, in milliseconds, used for every polling wait in this class. */
    private static final long WAIT_TIMEOUT_MS = 120000L;
    private static final Logger LOG = LoggerFactory.getLogger(LagFetchIntegrationTest.class);

    private final MockTime mockTime;
    private Properties streamsConfiguration;
    private Properties consumerConfiguration;
    private String inputTopicName;
    private String outputTopicName;
    private String stateStoreName;

    public LagFetchIntegrationTest() {
        // Reuse the cluster's clock so produced records are stamped consistently with the brokers.
        this.mockTime = CLUSTER.time;
    }

    /** Starts the shared embedded cluster once for all tests in this class. */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded cluster after the last test has run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Derives per-test topic, store, application and consumer-group names from the test name
     * and builds the base Streams and consumer configurations.
     *
     * @param testInfo JUnit metadata of the test about to run
     */
    @BeforeEach
    public void before(final TestInfo testInfo) {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo);
        inputTopicName = "input-topic-" + safeTestName;
        outputTopicName = "output-topic-" + safeTestName;
        stateStoreName = "lagfetch-test-store" + safeTestName;

        streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "app-" + safeTestName);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("commit.interval.ms", 100L);

        consumerConfiguration = new Properties();
        consumerConfiguration.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfiguration.setProperty("group.id", "group-" + safeTestName);
        consumerConfiguration.setProperty("auto.offset.reset", "earliest");
        consumerConfiguration.setProperty("key.deserializer", StringDeserializer.class.getName());
        consumerConfiguration.setProperty("value.deserializer", LongDeserializer.class.getName());
    }

    /** Purges the local Streams state that belongs to the base configuration. */
    @AfterEach
    public void shutdown() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Polls {@code streams} until it reports a non-empty lag map and returns a copy of it.
     *
     * @param streams instance to query
     * @return a mutable copy of the first non-empty result of
     *         {@link KafkaStreams#allLocalStorePartitionLags()}
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private Map<String, Map<Integer, LagInfo>> getFirstNonEmptyLagMap(final KafkaStreams streams) throws InterruptedException {
        final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = new HashMap<>();
        TestUtils.waitForCondition(() -> {
            final Map<String, Map<Integer, LagInfo>> lagMap = streams.allLocalStorePartitionLags();
            if (lagMap.isEmpty()) {
                return false;
            }
            offsetLagInfoMap.putAll(lagMap);
            return true;
        }, WAIT_TIMEOUT_MS, "Should obtain non-empty lag information eventually");
        return offsetLagInfoMap;
    }

    /** Synchronously writes the five records {@code k1=1 .. k5=5} to the input topic. */
    private void produceInputRecords() throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            inputTopicName,
            Utils.mkSet(
                new KeyValue<>("k1", 1L),
                new KeyValue<>("k2", 2L),
                new KeyValue<>("k3", 3L),
                new KeyValue<>("k4", 4L),
                new KeyValue<>("k5", 5L)),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()),
            mockTime);
    }

    /**
     * Verifies the lag reported by a first instance once it has caught up, and the lag reported
     * by a second instance while its stream thread is held in the
     * {@code PARTITIONS_ASSIGNED -> RUNNING} transition.
     *
     * <p>The second instance's thread-state listener blocks on a barrier during that transition;
     * the test asserts the lag (position 0, end offset 5, lag 5) and only then releases the
     * barrier and waits for the lag to drop to zero.
     *
     * @param optimization value for the {@code topology.optimization} config
     */
    private void shouldFetchLagsDuringRebalancing(final String optimization) throws Exception {
        final CountDownLatch latchTillActiveIsRunning = new CountDownLatch(1);
        // Counted down by the standby listener below; nothing in this method awaits it.
        final CountDownLatch latchTillStandbyIsRunning = new CountDownLatch(1);
        final CountDownLatch latchTillStandbyHasPartitionsAssigned = new CountDownLatch(1);
        // Two parties: the standby's stream thread (inside its state listener) and this test thread.
        final CyclicBarrier lagCheckBarrier = new CyclicBarrier(2);
        final List<KafkaStreamsWrapper> streamsList = new ArrayList<>();

        produceInputRecords();

        for (int i = 0; i < 2; ++i) {
            final Properties props = (Properties) streamsConfiguration.clone();
            props.put("internal.task.assignor.class", FallbackPriorTaskAssignor.class.getName());
            props.put("application.server", "localhost:" + i);
            props.put("client.id", "instance-" + i);
            props.put("topology.optimization", optimization);
            props.put("num.standby.replicas", 1);
            props.put("state.dir", TestUtils.tempDirectory(stateStoreName + i).getAbsolutePath());

            final StreamsBuilder builder = new StreamsBuilder();
            final KTable<String, Long> table = builder.table(inputTopicName, Materialized.as(stateStoreName));
            table.toStream().to(outputTopicName);
            streamsList.add(new KafkaStreamsWrapper(builder.build(props), props));
        }

        final KafkaStreamsWrapper activeStreams = streamsList.get(0);
        final KafkaStreamsWrapper standbyStreams = streamsList.get(1);

        activeStreams.setStreamThreadStateListener((thread, newState, oldState) -> {
            if (newState == StreamThread.State.RUNNING) {
                latchTillActiveIsRunning.countDown();
            }
        });
        standbyStreams.setStreamThreadStateListener((thread, newState, oldState) -> {
            if (oldState == StreamThread.State.PARTITIONS_ASSIGNED && newState == StreamThread.State.RUNNING) {
                latchTillStandbyHasPartitionsAssigned.countDown();
                try {
                    // Hold the stream thread here until the test has checked the lag.
                    lagCheckBarrier.await(60L, TimeUnit.SECONDS);
                } catch (final Exception e) {
                    throw new RuntimeException(e);
                }
            } else if (newState == StreamThread.State.RUNNING) {
                latchTillStandbyIsRunning.countDown();
            }
        });

        try {
            TestUtils.waitForCondition(
                () -> activeStreams.allLocalStorePartitionLags().size() == 0,
                WAIT_TIMEOUT_MS,
                "Should see empty lag map before streams is started.");

            // Bring up the first instance and let it process all five records.
            activeStreams.start();
            Assertions.assertTrue(
                latchTillActiveIsRunning.await(60L, TimeUnit.SECONDS),
                "First instance did not reach RUNNING within timeout");
            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfiguration, outputTopicName, 5, WAIT_TIMEOUT_MS);

            Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = getFirstNonEmptyLagMap(activeStreams);
            MatcherAssert.assertThat(offsetLagInfoMap.size(), IsEqual.equalTo(1));
            MatcherAssert.assertThat(offsetLagInfoMap.keySet(), IsEqual.equalTo(Utils.mkSet(stateStoreName)));
            MatcherAssert.assertThat(offsetLagInfoMap.get(stateStoreName).size(), IsEqual.equalTo(1));
            final LagInfo activeLagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
            MatcherAssert.assertThat(activeLagInfo.currentOffsetPosition(), IsEqual.equalTo(5L));
            MatcherAssert.assertThat(activeLagInfo.endOffsetPosition(), IsEqual.equalTo(5L));
            MatcherAssert.assertThat(activeLagInfo.offsetLag(), IsEqual.equalTo(0L));

            // Bring up the second instance; its listener parks the stream thread on the barrier.
            standbyStreams.start();
            Assertions.assertTrue(
                latchTillStandbyHasPartitionsAssigned.await(60L, TimeUnit.SECONDS),
                "Second instance did not transition from PARTITIONS_ASSIGNED to RUNNING within timeout");

            offsetLagInfoMap = getFirstNonEmptyLagMap(standbyStreams);
            MatcherAssert.assertThat(offsetLagInfoMap.size(), IsEqual.equalTo(1));
            MatcherAssert.assertThat(offsetLagInfoMap.keySet(), IsEqual.equalTo(Utils.mkSet(stateStoreName)));
            MatcherAssert.assertThat(offsetLagInfoMap.get(stateStoreName).size(), IsEqual.equalTo(1));
            final LagInfo standbyLagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
            MatcherAssert.assertThat(standbyLagInfo.currentOffsetPosition(), IsEqual.equalTo(0L));
            MatcherAssert.assertThat(standbyLagInfo.endOffsetPosition(), IsEqual.equalTo(5L));
            MatcherAssert.assertThat(standbyLagInfo.offsetLag(), IsEqual.equalTo(5L));

            // Release the parked stream thread and wait for the second instance to catch up.
            lagCheckBarrier.await(60L, TimeUnit.SECONDS);
            TestUtils.waitForCondition(
                () -> standbyStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0L,
                WAIT_TIMEOUT_MS,
                "Standby should eventually catchup and have zero lag.");
        } finally {
            for (final KafkaStreams kafkaStreams : streamsList) {
                kafkaStreams.close();
            }
        }
    }

    /** Runs the rebalancing lag check with {@code topology.optimization=all}. */
    @Test
    public void shouldFetchLagsDuringRebalancingWithOptimization() throws Exception {
        shouldFetchLagsDuringRebalancing("all");
    }

    /** Runs the rebalancing lag check with {@code topology.optimization=none}. */
    @Test
    public void shouldFetchLagsDuringRebalancingWithNoOptimization() throws Exception {
        shouldFetchLagsDuringRebalancing("none");
    }

    /**
     * Verifies the lag reported at the start and at the end of a state restoration.
     *
     * <p>A first instance processes all five records and is closed; its state directory is then
     * deleted. A second instance built from the same topology and configuration records the lag
     * map from inside a {@link StateRestoreListener}: at restore start the store is expected to
     * be at position 0 with lag 5, and at restore end it is expected to equal the zero-lag
     * information observed on the first instance.
     */
    @Test
    public void shouldFetchLagsDuringRestoration() throws Exception {
        produceInputRecords();

        final Properties props = (Properties) streamsConfiguration.clone();
        final File stateDir = TestUtils.tempDirectory(stateStoreName + "0");
        props.put("application.server", "localhost:0");
        props.put("client.id", "instance-0");
        props.put("state.dir", stateDir.getAbsolutePath());

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> t1 = builder.table(inputTopicName, Materialized.as(stateStoreName));
        t1.toStream().to(outputTopicName);
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        try {
            TestUtils.waitForCondition(
                () -> streams.allLocalStorePartitionLags().size() == 0,
                WAIT_TIMEOUT_MS,
                "Should see empty lag map before streams is started.");

            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfiguration, outputTopicName, 5, WAIT_TIMEOUT_MS);

            // Capture the fully-caught-up lag info; the assertions are re-evaluated until they hold.
            final AtomicReference<LagInfo> zeroLagRef = new AtomicReference<>();
            TestUtils.waitForCondition(() -> {
                final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = streams.allLocalStorePartitionLags();
                MatcherAssert.assertThat(offsetLagInfoMap.size(), IsEqual.equalTo(1));
                MatcherAssert.assertThat(offsetLagInfoMap.keySet(), IsEqual.equalTo(Utils.mkSet(stateStoreName)));
                MatcherAssert.assertThat(offsetLagInfoMap.get(stateStoreName).size(), IsEqual.equalTo(1));
                final LagInfo zeroLagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
                MatcherAssert.assertThat(zeroLagInfo.currentOffsetPosition(), IsEqual.equalTo(5L));
                MatcherAssert.assertThat(zeroLagInfo.endOffsetPosition(), IsEqual.equalTo(5L));
                MatcherAssert.assertThat(zeroLagInfo.offsetLag(), IsEqual.equalTo(0L));
                zeroLagRef.set(zeroLagInfo);
                return true;
            }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");

            // Stop the instance and wipe its state so the next instance has to restore from scratch.
            MatcherAssert.assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60L)));
            IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
            // Files.walk holds open directory handles until the stream is closed.
            try (Stream<Path> stateFiles = Files.walk(stateDir.toPath())) {
                // Reverse order deletes children before their parent directories.
                stateFiles.sorted(Comparator.reverseOrder())
                    .map(Path::toFile)
                    .forEach(f -> Assertions.assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
            }

            // try-with-resources guarantees the restarted instance is closed before the outer
            // finally block cleans up the shared state directory.
            try (KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props)) {
                final CountDownLatch restorationEndLatch = new CountDownLatch(1);
                final Map<String, Map<Integer, LagInfo>> restoreStartLagInfo = new HashMap<>();
                final Map<String, Map<Integer, LagInfo>> restoreEndLagInfo = new HashMap<>();

                restartedStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
                    @Override
                    public void onRestoreStart(final TopicPartition topicPartition,
                                               final String storeName,
                                               final long startingOffset,
                                               final long endingOffset) {
                        try {
                            restoreStartLagInfo.putAll(getFirstNonEmptyLagMap(restartedStreams));
                        } catch (final Exception e) {
                            LOG.error("Exception while trying to obtain lag map", e);
                        }
                    }

                    @Override
                    public void onBatchRestored(final TopicPartition topicPartition,
                                                final String storeName,
                                                final long batchEndOffset,
                                                final long numRestored) {
                        // Only the start and end of the restoration are of interest.
                    }

                    @Override
                    public void onRestoreEnd(final TopicPartition topicPartition,
                                             final String storeName,
                                             final long totalRestored) {
                        try {
                            restoreEndLagInfo.putAll(getFirstNonEmptyLagMap(restartedStreams));
                        } catch (final Exception e) {
                            LOG.error("Exception while trying to obtain lag map", e);
                        }
                        // Counted down even on failure so the test thread is not left waiting.
                        restorationEndLatch.countDown();
                    }
                });

                restartedStreams.start();
                Assertions.assertTrue(
                    restorationEndLatch.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS),
                    "Restoration did not finish within timeout");
                TestUtils.waitForCondition(
                    () -> restartedStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0L,
                    WAIT_TIMEOUT_MS,
                    "Restarted instance should eventually catch up and have zero lag.");

                final LagInfo fullLagInfo = restoreStartLagInfo.get(stateStoreName).get(0);
                MatcherAssert.assertThat(fullLagInfo.currentOffsetPosition(), IsEqual.equalTo(0L));
                MatcherAssert.assertThat(fullLagInfo.endOffsetPosition(), IsEqual.equalTo(5L));
                MatcherAssert.assertThat(fullLagInfo.offsetLag(), IsEqual.equalTo(5L));

                MatcherAssert.assertThat(restoreEndLagInfo.get(stateStoreName).get(0), IsEqual.equalTo(zeroLagRef.get()));
            }
        } finally {
            streams.close();
            streams.cleanUp();
        }
    }
}

