/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="integration")
@Timeout(value=600L)
public class JoinGracePeriodDurabilityIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[0])), 0L);
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final long COMMIT_INTERVAL = 100L;

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ParameterizedTest
    @ValueSource(strings={"at_least_once", "exactly_once", "exactly_once_v2"})
    public void shouldRecoverBufferAfterShutdown(String processingGuarantee, TestInfo testInfo) {
        String testId = IntegrationTestUtils.safeUniqueTestName(testInfo);
        String appId = "appId_" + testId;
        String streamInput = "Streaminput" + testId;
        String tableInput = "Tableinput" + testId;
        String storeName = "grace";
        String output = "output" + testId;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, streamInput, tableInput, output);
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream(streamInput, Consumed.with(STRING_SERDE, STRING_SERDE));
        KTable table = builder.table(tableInput, Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.as((KeyValueBytesStoreSupplier)Stores.persistentVersionedKeyValueStore((String)"grace", (Duration)Duration.ofMillis(1000L))));
        KStream joinedStream = stream.join(table, MockValueJoiner.TOSTRING_JOINER, Joined.with((Serde)Serdes.String(), (Serde)Serdes.String(), (Serde)Serdes.String(), (String)"Grace", (Duration)Duration.ofMillis(5L)));
        AtomicInteger eventCount = new AtomicInteger(0);
        joinedStream.foreach((key, value) -> eventCount.incrementAndGet());
        joinedStream.to(output);
        Properties streamsConfig = Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"poll.ms", (Object)Long.toString(100L)), Utils.mkEntry((Object)"processing.guarantee", (Object)processingGuarantee), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath()), Utils.mkEntry((Object)"default.key.serde", Serdes.StringSerde.class), Utils.mkEntry((Object)"default.value.serde", Serdes.StringSerde.class)}));
        streamsConfig.put("commit.interval.ms", (Object)100L);
        KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            JoinGracePeriodDurabilityIntegrationTest.produceSynchronouslyToPartitionZero(tableInput, Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1", this.scaledTime(0L)), new KeyValueTimestamp<String, String>("k2", "v2", this.scaledTime(0L)), new KeyValueTimestamp<String, String>("k3", "v3", this.scaledTime(0L)), new KeyValueTimestamp<String, String>("k4", "v4", this.scaledTime(0L)), new KeyValueTimestamp<String, String>("k5", "v5", this.scaledTime(0L)), new KeyValueTimestamp<String, String>("k6", "v6", this.scaledTime(0L))));
            JoinGracePeriodDurabilityIntegrationTest.produceSynchronouslyToPartitionZero(streamInput, Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1", this.scaledTime(1L)), new KeyValueTimestamp<String, String>("k2", "v2", this.scaledTime(2L)), new KeyValueTimestamp<String, String>("k3", "v3", this.scaledTime(7L))));
            this.verifyOutput(output, Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1+v1", this.scaledTime(1L)), new KeyValueTimestamp<String, String>("k2", "v2+v2", this.scaledTime(2L))));
            MatcherAssert.assertThat((Object)eventCount.get(), (Matcher)CoreMatchers.is((Object)2));
            JoinGracePeriodDurabilityIntegrationTest.produceSynchronouslyToPartitionZero(streamInput, Arrays.asList(new KeyValueTimestamp<String, String>("k4", "v4", this.scaledTime(4L)), new KeyValueTimestamp<String, String>("k5", "v5", this.scaledTime(5L))));
            driver.close();
            MatcherAssert.assertThat((Object)driver.state(), (Matcher)CoreMatchers.is((Object)KafkaStreams.State.NOT_RUNNING));
            driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, false);
            JoinGracePeriodDurabilityIntegrationTest.produceSynchronouslyToPartitionZero(streamInput, Arrays.asList(new KeyValueTimestamp<String, String>("k6", "v6", this.scaledTime(20L))));
            this.verifyOutput(output, Arrays.asList(new KeyValueTimestamp<String, String>("k4", "v4+v4", this.scaledTime(4L)), new KeyValueTimestamp<String, String>("k5", "v5+v5", this.scaledTime(5L)), new KeyValueTimestamp<String, String>("k3", "v3+v3", this.scaledTime(7L))));
            MatcherAssert.assertThat((String)"There should only be 5 output events.", (Object)eventCount.get(), (Matcher)CoreMatchers.is((Object)5));
        }
        finally {
            driver.close();
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, driver);
        }
    }

    private void verifyOutput(String topic, List<KeyValueTimestamp<String, String>> keyValueTimestamps) {
        Properties properties = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"group.id", (Object)"test-group"), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"key.deserializer", (Object)STRING_DESERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.deserializer", (Object)STRING_DESERIALIZER.getClass().getName())}));
        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
    }

    private long scaledTime(long unscaledTime) {
        return 200L * unscaledTime;
    }

    private static void produceSynchronouslyToPartitionZero(String topic, List<KeyValueTimestamp<String, String>> toProduce) {
        Properties producerConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client.id", (Object)"anything"), Utils.mkEntry((Object)"key.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers())}));
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.of(0), toProduce);
    }
}

