/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.log.LogConfig;
import kafka.utils.MockTime;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.collection.JavaConversions;

@Category(value={IntegrationTest.class})
public class InternalTopicIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final String APP_ID = "internal-topics-integration-test";
    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    private final MockTime mockTime;
    private Properties streamsProp;

    public InternalTopicIntegrationTest() {
        this.mockTime = InternalTopicIntegrationTest.CLUSTER.time;
    }

    @BeforeClass
    public static void startKafkaCluster() throws InterruptedException {
        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
    }

    @Before
    public void before() {
        this.streamsProp = new Properties();
        this.streamsProp.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsProp.put("default.key.serde", Serdes.String().getClass().getName());
        this.streamsProp.put("default.value.serde", Serdes.String().getClass().getName());
        this.streamsProp.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsProp.put("auto.offset.reset", "earliest");
        this.streamsProp.put("internal.leave.group.on.close", (Object)true);
        this.streamsProp.put("commit.interval.ms", (Object)100);
        this.streamsProp.put("cache.max.bytes.buffering", (Object)0);
    }

    @After
    public void after() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsProp);
    }

    private void produceData(List<String> inputValues) throws Exception {
        Properties producerProp = new Properties();
        producerProp.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProp.put("acks", "all");
        producerProp.put("retries", (Object)0);
        producerProp.put("key.serializer", StringSerializer.class);
        producerProp.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerProp, (Time)this.mockTime);
    }

    private Properties getTopicProperties(String changelog) {
        try (KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false, 10000, 8000, Integer.MAX_VALUE, Time.SYSTEM, "testMetricGroup", "testMetricType");){
            AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
            Map<String, Properties> topicConfigs = JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
            for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
                if (!topicConfig.getKey().equals(changelog)) continue;
                Properties properties = topicConfig.getValue();
                return properties;
            }
            Properties properties = new Properties();
            return properties;
        }
    }

    @Test
    public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
        String appID = "internal-topics-integration-test-compact";
        this.streamsProp.put("application.id", "internal-topics-integration-test-compact");
        StreamsBuilder builder = new StreamsBuilder();
        KStream textLines = builder.stream(DEFAULT_INPUT_TOPIC);
        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>(){

            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        }).groupBy(MockMapper.selectValueMapper()).count(Materialized.as("Counts"));
        KafkaStreams streams = new KafkaStreams(builder.build(), this.streamsProp);
        streams.start();
        this.produceData(Arrays.asList("hello", "world", "world", "hello world"));
        streams.close();
        Properties changelogProps = this.getTopicProperties(ProcessorStateManager.storeChangelogTopic("internal-topics-integration-test-compact", "Counts"));
        Assert.assertEquals((Object)LogConfig.Compact(), (Object)changelogProps.getProperty(LogConfig.CleanupPolicyProp()));
        Properties repartitionProps = this.getTopicProperties("internal-topics-integration-test-compact-Counts-repartition");
        Assert.assertEquals((Object)LogConfig.Delete(), (Object)repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
        Assert.assertEquals((long)4L, (long)repartitionProps.size());
    }

    @Test
    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception {
        String appID = "internal-topics-integration-test-compact-delete";
        this.streamsProp.put("application.id", "internal-topics-integration-test-compact-delete");
        StreamsBuilder builder = new StreamsBuilder();
        KStream textLines = builder.stream(DEFAULT_INPUT_TOPIC);
        int durationMs = 2000;
        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>(){

            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        }).groupBy(MockMapper.selectValueMapper()).windowedBy(TimeWindows.of(1000L).until(2000L)).count(Materialized.as("CountWindows"));
        KafkaStreams streams = new KafkaStreams(builder.build(), this.streamsProp);
        streams.start();
        this.produceData(Arrays.asList("hello", "world", "world", "hello world"));
        streams.close();
        Properties properties = this.getTopicProperties(ProcessorStateManager.storeChangelogTopic("internal-topics-integration-test-compact-delete", "CountWindows"));
        List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
        Assert.assertEquals((long)2L, (long)policies.size());
        Assert.assertTrue((boolean)policies.contains(LogConfig.Compact()));
        Assert.assertTrue((boolean)policies.contains(LogConfig.Delete()));
        long retention = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS) + 2000L;
        Assert.assertEquals((long)retention, (long)Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp())));
        Properties repartitionProps = this.getTopicProperties("internal-topics-integration-test-compact-delete-CountWindows-repartition");
        Assert.assertEquals((Object)LogConfig.Delete(), (Object)repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
        Assert.assertEquals((long)4L, (long)repartitionProps.size());
    }
}

