/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Tag(value="integration")
@Timeout(value=600L)
public class KafkaStreamsCloseOptionsIntegrationTest {
    private static MockTime mockTime;
    protected static final String INPUT_TOPIC = "inputTopic";
    protected static final String OUTPUT_TOPIC = "outputTopic";
    protected Properties streamsConfig;
    protected static KafkaStreams streams;
    protected static Admin adminClient;
    protected Properties commonClientConfig;
    private Properties producerConfig;
    protected Properties resultConsumerConfig;
    private final File testFolder = TestUtils.tempDirectory();
    public static final EmbeddedKafkaCluster CLUSTER;

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        Utils.closeQuietly((AutoCloseable)adminClient, (String)"admin");
        CLUSTER.stop();
    }

    @BeforeEach
    public void before(TestInfo testName) throws Exception {
        mockTime = KafkaStreamsCloseOptionsIntegrationTest.CLUSTER.time;
        String appID = IntegrationTestUtils.safeUniqueTestName(testName);
        this.commonClientConfig = new Properties();
        this.commonClientConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfig = new Properties();
        this.streamsConfig.put("application.id", appID);
        this.streamsConfig.put("group.instance.id", "someGroupInstance");
        this.streamsConfig.put("state.dir", this.testFolder.getPath());
        this.streamsConfig.put("default.key.serde", Serdes.Long().getClass());
        this.streamsConfig.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfig.put("statestore.cache.max.bytes", (Object)0);
        this.streamsConfig.put("commit.interval.ms", (Object)100L);
        this.streamsConfig.put("heartbeat.interval.ms", (Object)100);
        this.streamsConfig.put("auto.offset.reset", "earliest");
        this.streamsConfig.put("session.timeout.ms", (Object)Integer.MAX_VALUE);
        this.streamsConfig.putAll((Map<?, ?>)this.commonClientConfig);
        this.producerConfig = new Properties();
        this.producerConfig.put("acks", "all");
        this.producerConfig.put("key.serializer", LongSerializer.class);
        this.producerConfig.put("value.serializer", StringSerializer.class);
        this.producerConfig.putAll((Map<?, ?>)this.commonClientConfig);
        this.resultConsumerConfig = new Properties();
        this.resultConsumerConfig.put("group.id", appID + "-result-consumer");
        this.resultConsumerConfig.put("auto.offset.reset", "earliest");
        this.resultConsumerConfig.put("key.deserializer", LongDeserializer.class);
        this.resultConsumerConfig.put("value.deserializer", StringDeserializer.class);
        this.resultConsumerConfig.putAll((Map<?, ?>)this.commonClientConfig);
        if (adminClient == null) {
            adminClient = Admin.create((Properties)this.commonClientConfig);
        }
        CLUSTER.deleteAllTopicsAndWait(120000L);
        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
        this.add10InputElements();
    }

    @AfterEach
    public void after() throws Exception {
        if (streams != null) {
            streams.close(Duration.ofSeconds(30L));
        }
        Utils.delete((File)this.testFolder);
    }

    @Test
    public void testCloseOptions() throws Exception {
        this.streamsConfig.put("num.stream.threads", (Object)2);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close(new KafkaStreams.CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30L)));
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, this.streamsConfig.getProperty("application.id"), 0L);
    }

    protected Topology setupTopologyWithoutIntermediateUserTopic() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream input = builder.stream(INPUT_TOPIC);
        input.to(OUTPUT_TOPIC, Produced.with((Serde)Serdes.Long(), (Serde)Serdes.String()));
        return builder.build();
    }

    private void add10InputElements() {
        List<KeyValue> records = Arrays.asList(KeyValue.pair((Object)0L, (Object)"aaa"), KeyValue.pair((Object)1L, (Object)"bbb"), KeyValue.pair((Object)0L, (Object)"ccc"), KeyValue.pair((Object)1L, (Object)"ddd"), KeyValue.pair((Object)0L, (Object)"eee"), KeyValue.pair((Object)1L, (Object)"fff"), KeyValue.pair((Object)0L, (Object)"ggg"), KeyValue.pair((Object)1L, (Object)"hhh"), KeyValue.pair((Object)0L, (Object)"iii"), KeyValue.pair((Object)1L, (Object)"jjj"));
        for (KeyValue record : records) {
            mockTime.sleep(10L);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), this.producerConfig, mockTime.milliseconds());
        }
    }

    static {
        Properties brokerProps = new Properties();
        brokerProps.setProperty("group.max.session.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
    }
}

