/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class PurgeRepartitionTopicIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String INPUT_TOPIC = "input-stream";
    private static final String APPLICATION_ID = "restore-test";
    private static final String REPARTITION_TOPIC = "restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition";
    private static Admin adminClient;
    private static KafkaStreams kafkaStreams;
    private static final Integer PURGE_INTERVAL_MS;
    private static final Integer PURGE_SEGMENT_BYTES;
    private static final Integer INITIAL_TASK_DELAY_MS;
    public static final EmbeddedKafkaCluster CLUSTER;
    private final Time time = new MockTime(1L);

    @BeforeAll
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void setup() {
        Properties adminConfig = new Properties();
        adminConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        adminClient = Admin.create((Properties)adminConfig);
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", APPLICATION_ID);
        streamsConfiguration.put("commit.interval.ms", PURGE_INTERVAL_MS);
        streamsConfiguration.put("repartition.purge.interval.ms", PURGE_INTERVAL_MS);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory((String)APPLICATION_ID).getPath());
        streamsConfiguration.put(StreamsConfig.topicPrefix((String)"segment.ms"), PURGE_INTERVAL_MS);
        streamsConfiguration.put(StreamsConfig.topicPrefix((String)"segment.bytes"), PURGE_SEGMENT_BYTES);
        streamsConfiguration.put(StreamsConfig.producerPrefix((String)"batch.size"), (Object)(PURGE_SEGMENT_BYTES / 2));
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC).groupBy(MockMapper.selectKeyKeyValueMapper()).count();
        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, this.time);
    }

    @AfterEach
    public void shutdown() {
        if (adminClient != null) {
            adminClient.close();
        }
        if (kafkaStreams != null) {
            kafkaStreams.close(Duration.ofSeconds(30L));
        }
    }

    @Test
    public void shouldRestoreState() throws Exception {
        ArrayList messages = new ArrayList();
        for (int i = 0; i < 1000; ++i) {
            messages.add(new KeyValue((Object)i, (Object)i));
        }
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, messages, TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), this.time.milliseconds());
        kafkaStreams.start();
        TestUtils.waitForCondition((TestCondition)new RepartitionTopicCreatedWithExpectedConfigs(), (long)60000L, (String)"Repartition topic restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition not created with the expected configs after 60000 ms.");
        TestUtils.waitForCondition((TestCondition)new RepartitionTopicVerified(currentSize -> currentSize > (long)PURGE_SEGMENT_BYTES.intValue()), (long)60000L, (String)("Repartition topic restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition not received more than " + PURGE_SEGMENT_BYTES + "B of data after 60000 ms."));
        long waitForPurgeMs = 60000L;
        TestUtils.waitForCondition((TestCondition)new RepartitionTopicVerified(currentSize -> currentSize <= (long)PURGE_SEGMENT_BYTES.intValue()), (long)60000L, (String)"Repartition topic restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition not purged data after 60000 ms.");
    }

    static {
        PURGE_INTERVAL_MS = 10;
        PURGE_SEGMENT_BYTES = 2000;
        INITIAL_TASK_DELAY_MS = 0;
        CLUSTER = new EmbeddedKafkaCluster(1, new Properties(){
            {
                this.put("log.retention.check.interval.ms", PURGE_INTERVAL_MS);
                this.put("log.initial.task.delay.ms", INITIAL_TASK_DELAY_MS);
                this.put("file.delete.delay.ms", (Object)0);
            }
        });
    }

    private class RepartitionTopicVerified
    implements TestCondition {
        private final TopicSizeVerifier verifier;

        RepartitionTopicVerified(TopicSizeVerifier verifier) {
            this.verifier = verifier;
        }

        public final boolean conditionMet() {
            PurgeRepartitionTopicIntegrationTest.this.time.sleep((long)PURGE_INTERVAL_MS.intValue());
            try {
                Collection logDirInfo = ((Map)((KafkaFuture)adminClient.describeLogDirs(Collections.singleton(0)).descriptions().get(0)).get()).values();
                for (LogDirDescription partitionInfo : logDirInfo) {
                    ReplicaInfo replicaInfo = (ReplicaInfo)partitionInfo.replicaInfos().get(new TopicPartition(PurgeRepartitionTopicIntegrationTest.REPARTITION_TOPIC, 0));
                    if (replicaInfo == null || !this.verifier.verify(replicaInfo.size())) continue;
                    return true;
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            return false;
        }
    }

    private static interface TopicSizeVerifier {
        public boolean verify(long var1);
    }

    private class RepartitionTopicCreatedWithExpectedConfigs
    implements TestCondition {
        private RepartitionTopicCreatedWithExpectedConfigs() {
        }

        public final boolean conditionMet() {
            try {
                Set topics = (Set)adminClient.listTopics().names().get();
                if (!topics.contains(PurgeRepartitionTopicIntegrationTest.REPARTITION_TOPIC)) {
                    return false;
                }
            }
            catch (Exception e) {
                return false;
            }
            try {
                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, PurgeRepartitionTopicIntegrationTest.REPARTITION_TOPIC);
                Config config = (Config)((KafkaFuture)adminClient.describeConfigs(Collections.singleton(resource)).values().get(resource)).get();
                return config.get("cleanup.policy").value().equals("delete") && config.get("segment.ms").value().equals(PURGE_INTERVAL_MS.toString()) && config.get("segment.bytes").value().equals(PURGE_SEGMENT_BYTES.toString());
            }
            catch (Exception e) {
                return false;
            }
        }
    }
}

