/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration test that runs a pass-through topology ({@code input -> output-raw}) against an
 * embedded single-broker cluster, stops the {@link KafkaStreams} instance, starts a new one with
 * the same configuration, and checks which {@code partitionTime} value is handed to the configured
 * {@link TimestampExtractor} before and after the restart.
 */
@Tag("integration")
@Timeout(600)
public class ResetPartitionTimeIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final Properties BROKER_CONFIG = new Properties();

    // Must run before CLUSTER is constructed: static initializers execute in textual order and
    // the cluster is created from BROKER_CONFIG.
    static {
        BROKER_CONFIG.put("transaction.state.log.replication.factor", 1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
    }

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG, 0L);
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final int DEFAULT_TIMEOUT = 100;

    /**
     * Last {@code partitionTime} passed to {@link MaxTimestampExtractor#extract}; {@code -2} is the
     * "not yet observed" sentinel used by this test.
     *
     * <p>Declared {@code volatile} because the extractor is invoked by Kafka Streams on its own
     * stream thread while the assertions read the value on the test thread; without it the read
     * has no visibility guarantee and a 64-bit write is not guaranteed to be atomic.
     */
    private static volatile long lastRecordedTimestamp = -2L;

    /** Starts the shared embedded cluster once for all parameterized runs. */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded cluster after all runs have finished. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Produces one record, restarts the streams application, produces a second record with a
     * smaller timestamp, and asserts the {@code partitionTime} seen by the extractor each time:
     * {@code -1} for the very first record and {@code 5000} (the first record's timestamp) for the
     * first record processed after the restart.
     *
     * @param processingGuarantee value used for the {@code processing.guarantee} config
     * @param testInfo            used to derive a unique {@code application.id}
     */
    @ParameterizedTest
    @ValueSource(strings = {"at_least_once", "exactly_once", "exactly_once_v2"})
    public void shouldPreservePartitionTimeOnKafkaStreamRestart(String processingGuarantee, TestInfo testInfo) {
        final String appId = "app-" + IntegrationTestUtils.safeUniqueTestName(testInfo);
        final String input = "input";
        final String outputRaw = "output-raw";

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, input, outputRaw);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(input, Consumed.with(STRING_SERDE, STRING_SERDE)).to(outputRaw);

        final Properties streamsConfig = new Properties();
        streamsConfig.put("default.timestamp.extractor", MaxTimestampExtractor.class);
        streamsConfig.put("application.id", appId);
        streamsConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfig.put("poll.ms", Integer.toString(DEFAULT_TIMEOUT));
        // Stored as a Long, exactly as before; only the literal was replaced by the constant.
        streamsConfig.put("commit.interval.ms", (long) DEFAULT_TIMEOUT);
        streamsConfig.put("processing.guarantee", processingGuarantee);
        streamsConfig.put("state.dir", TestUtils.tempDirectory().getPath());

        KafkaStreams kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            // First run: the extractor is expected to be called with partitionTime == -1.
            produceSynchronouslyToPartitionZero(
                input,
                Collections.singletonList(new KeyValueTimestamp<>("k3", "v3", 5000L)));
            verifyOutput(
                outputRaw,
                Collections.singletonList(new KeyValueTimestamp<>("k3", "v3", 5000L)));
            MatcherAssert.assertThat(lastRecordedTimestamp, CoreMatchers.is(-1L));

            // Reset the sentinel so the post-restart assertion cannot pass on a stale value.
            lastRecordedTimestamp = -2L;

            kafkaStreams.close();
            MatcherAssert.assertThat(kafkaStreams.state(), CoreMatchers.is(KafkaStreams.State.NOT_RUNNING));

            // Second run with the same config: the record's own timestamp (4999) is lower than the
            // previous one, so a partitionTime of 5000 can only come from the first run.
            kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
            produceSynchronouslyToPartitionZero(
                input,
                Collections.singletonList(new KeyValueTimestamp<>("k5", "v5", 4999L)));
            verifyOutput(
                outputRaw,
                Collections.singletonList(new KeyValueTimestamp<>("k5", "v5", 4999L)));
            MatcherAssert.assertThat(lastRecordedTimestamp, CoreMatchers.is(5000L));
        } finally {
            kafkaStreams.close();
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);
        }
    }

    /**
     * Verifies that {@code topic} yields the given records, using a string-deserializing consumer
     * in the {@code test-group} consumer group.
     *
     * @param topic              topic to read from
     * @param keyValueTimestamps expected key/value/timestamp triples
     */
    private void verifyOutput(String topic, List<KeyValueTimestamp<String, String>> keyValueTimestamps) {
        final Properties properties = Utils.mkProperties(
            Utils.mkMap(
                Utils.mkEntry("group.id", "test-group"),
                Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
                Utils.mkEntry("key.deserializer", STRING_DESERIALIZER.getClass().getName()),
                Utils.mkEntry("value.deserializer", STRING_DESERIALIZER.getClass().getName())
            )
        );
        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
    }

    /**
     * Synchronously produces the given records to partition 0 of {@code topic} using a
     * string-serializing producer.
     *
     * @param topic     destination topic
     * @param toProduce records (key, value, timestamp) to send
     */
    private static void produceSynchronouslyToPartitionZero(String topic, List<KeyValueTimestamp<String, String>> toProduce) {
        final Properties producerConfig = Utils.mkProperties(
            Utils.mkMap(
                Utils.mkEntry("client.id", "anything"),
                Utils.mkEntry("key.serializer", STRING_SERIALIZER.getClass().getName()),
                Utils.mkEntry("value.serializer", STRING_SERIALIZER.getClass().getName()),
                Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())
            )
        );
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.of(0), toProduce);
    }

    /**
     * Timestamp extractor that records the {@code partitionTime} it was called with in
     * {@link #lastRecordedTimestamp} and returns the record's own timestamp unchanged.
     */
    public static final class MaxTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            lastRecordedTimestamp = partitionTime;
            return record.timestamp();
        }
    }
}

