/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.StoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(value="integration")
@Timeout(value=600L)
public class PositionRestartIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(PositionRestartIntegrationTest.class);
    // Random value that is only written to the log by data().
    private static final long SEED = new Random().nextLong();
    // Number of brokers in the embedded cluster shared by all test cases.
    private static final int NUM_BROKERS = 1;
    // Size used for the time windows, the session inactivity gap and the window store suppliers.
    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5L);
    // Incremented by streamsConfiguration() so each configuration gets a distinct application.server port.
    private static int port = 0;
    private static final String INPUT_TOPIC_NAME = "input-topic";
    // Starts empty; before() records the offset of every produced record into it.
    private static final Position INPUT_POSITION = Position.emptyPosition();
    private static final String STORE_NAME = "kv-store";
    // Timestamp attached to every produced record.
    private static final long RECORD_TIME = System.currentTimeMillis();
    // RECORD_TIME rounded down to a multiple of WINDOW_SIZE.
    private static final long WINDOW_START = RECORD_TIME / WINDOW_SIZE.toMillis() * WINDOW_SIZE.toMillis();
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    // The Streams instance of the currently running test case; closed by afterTest().
    private KafkaStreams kafkaStreams;

    /**
     * Supplies the argument tuples {@code (cacheEnabled, logEnabled, store, kind)} for
     * {@link #verifyStore}.
     *
     * <p>Every combination of caching on/off, logging on/off, {@link StoresToTest} value and
     * topology kind ("DSL" or "PAPI") is emitted, except that a store whose supplier reports
     * itself as non-persistent is skipped when logging is disabled.
     *
     * @return the test cases, in nested-loop order
     */
    public static Stream<Arguments> data() {
        LOG.info("Generating test cases according to random seed: {}", SEED);
        final Stream.Builder<Arguments> cases = Stream.builder();
        for (final boolean cacheEnabled : new boolean[] {true, false}) {
            for (final boolean logEnabled : new boolean[] {true, false}) {
                for (final StoresToTest toTest : StoresToTest.values()) {
                    // The store is only instantiated (to ask persistent()) when logging is off.
                    if (logEnabled || toTest.supplier().get().persistent()) {
                        for (final String kind : new String[] {"DSL", "PAPI"}) {
                            cases.add(Arguments.of(cacheEnabled, logEnabled, toTest, kind));
                        }
                    }
                }
            }
        }
        return cases.build();
    }

    /**
     * Starts the embedded cluster, creates the input topic and produces the fixed input data set.
     *
     * <p>Record {@code i} is written to partition {@code i % partitions} with key and value
     * {@code i} and timestamp {@link #RECORD_TIME}. The offset of every acknowledged record is
     * folded into {@link #INPUT_POSITION}, which the tests later use as their position bound.
     *
     * @throws InterruptedException if interrupted while waiting on the cluster or a send
     * @throws IOException          if the embedded cluster fails to start
     * @throws ExecutionException   if a send fails
     * @throws TimeoutException     if a send is not acknowledged within one minute
     */
    @BeforeAll
    public static void before() throws InterruptedException, IOException, ExecutionException, TimeoutException {
        CLUSTER.start();
        CLUSTER.deleteAllTopicsAndWait(60000L);
        final int partitions = 2;
        final int numRecords = 4;
        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);

        final Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);

        final ArrayList<Future<RecordMetadata>> pendingSends = new ArrayList<>();
        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < numRecords; ++i) {
                pendingSends.add(
                    producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, i % partitions, RECORD_TIME, i, i, null)));
                Time.SYSTEM.sleep(1L);
            }
            producer.flush();
            for (final Future<RecordMetadata> pending : pendingSends) {
                final RecordMetadata recordMetadata = pending.get(1L, TimeUnit.MINUTES);
                MatcherAssert.assertThat(recordMetadata.hasOffset(), Matchers.is(true));
                // withComponent's return value is ignored here, so this relies on it updating
                // INPUT_POSITION in place.
                INPUT_POSITION.withComponent(recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            }
        }

        // Four records over two partitions: the last offset on each partition is 1.
        MatcherAssert.assertThat(
            INPUT_POSITION,
            Matchers.equalTo(
                Position.emptyPosition()
                    .withComponent(INPUT_TOPIC_NAME, 0, 1L)
                    .withComponent(INPUT_TOPIC_NAME, 1, 1L)));
    }

    /**
     * Builds the topology for one test case.
     *
     * <p>The set-up method is chosen from the concrete supplier type of {@code storeToTest}
     * (key-value, window or session) combined with {@code kind} ("DSL" or "PAPI").
     *
     * @param cache       whether the store should have caching enabled
     * @param log         whether the store should have changelogging enabled
     * @param storeToTest the store flavour under test
     * @param kind        "DSL" or "PAPI"
     * @return a builder containing the topology
     * @throws AssertionError if {@code kind} or the supplier type is not one of the handled ones
     */
    public static StreamsBuilder getStreamBuilder(boolean cache, boolean log, StoresToTest storeToTest, String kind) {
        final StoreSupplier<?> supplier = storeToTest.supplier();
        final StreamsBuilder builder = new StreamsBuilder();
        final boolean viaDsl = Objects.equals(kind, "DSL");
        final boolean knownKind = viaDsl || Objects.equals(kind, "PAPI");

        if (knownKind && supplier instanceof KeyValueBytesStoreSupplier) {
            final KeyValueBytesStoreSupplier keyValueSupplier = (KeyValueBytesStoreSupplier) supplier;
            if (viaDsl) {
                PositionRestartIntegrationTest.setUpKeyValueDSLTopology(keyValueSupplier, builder, cache, log);
            } else {
                PositionRestartIntegrationTest.setUpKeyValuePAPITopology(keyValueSupplier, builder, cache, log, storeToTest);
            }
        } else if (knownKind && supplier instanceof WindowBytesStoreSupplier) {
            final WindowBytesStoreSupplier windowSupplier = (WindowBytesStoreSupplier) supplier;
            if (viaDsl) {
                PositionRestartIntegrationTest.setUpWindowDSLTopology(windowSupplier, builder, cache, log);
            } else {
                PositionRestartIntegrationTest.setUpWindowPAPITopology(windowSupplier, builder, cache, log, storeToTest);
            }
        } else if (knownKind && supplier instanceof SessionBytesStoreSupplier) {
            final SessionBytesStoreSupplier sessionSupplier = (SessionBytesStoreSupplier) supplier;
            if (viaDsl) {
                PositionRestartIntegrationTest.setUpSessionDSLTopology(sessionSupplier, builder, cache, log);
            } else {
                PositionRestartIntegrationTest.setUpSessionPAPITopology(sessionSupplier, builder, cache, log);
            }
        } else {
            throw new AssertionError("Store supplier is an unrecognized type.");
        }
        return builder;
    }

    /**
     * Closes the Streams instance left behind by the test case, if any, and then calls its
     * {@code cleanUp()}.
     */
    @AfterEach
    public void afterTest() {
        final KafkaStreams running = this.kafkaStreams;
        if (running == null) {
            return;
        }
        running.close();
        running.cleanUp();
    }

    /**
     * Stops the embedded Kafka cluster once all test cases have run.
     */
    @AfterAll
    public static void after() {
        CLUSTER.stop();
    }

    /**
     * Verifies that a store reports the expected position both after first start-up and after
     * the Streams instance has been closed and started again.
     *
     * <p>The query type is chosen from the store flavour: a no-bounds range query for key-value
     * stores, a window-key query for window stores and a window-range query for session stores.
     * Only the position attached to the result is asserted, not the returned data.
     *
     * @param cache       whether the store has caching enabled
     * @param log         whether the store has changelogging enabled
     * @param storeToTest the store flavour under test
     * @param kind        "DSL" or "PAPI"
     */
    @ParameterizedTest
    @MethodSource(value={"data"})
    public void verifyStore(boolean cache, boolean log, StoresToTest storeToTest, String kind) {
        final Properties streamsConfig = PositionRestartIntegrationTest.streamsConfiguration(cache, log, storeToTest.name(), kind);
        final StreamsBuilder streamsBuilder = PositionRestartIntegrationTest.getStreamBuilder(cache, log, storeToTest, kind);
        this.kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, streamsBuilder, true);

        // Declared as Query<?> because the three query classes share no narrower common type.
        final Query<?> query;
        if (storeToTest.keyValue()) {
            query = RangeQuery.withNoBounds();
        } else if (storeToTest.isWindowed()) {
            query = WindowKeyQuery.withKeyAndWindowStartRange(
                2, Instant.ofEpochMilli(WINDOW_START), Instant.ofEpochMilli(WINDOW_START));
        } else if (storeToTest.isSession()) {
            query = WindowRangeQuery.withKey(2);
        } else {
            throw new AssertionError("Unhandled store type: " + storeToTest);
        }
        this.shouldReachExpectedPosition(query);

        // Restart with the same configuration (and therefore the same state directory) and
        // check the position again. NOTE(review): the final 'false' presumably skips the local
        // state clean-up that 'true' requested above; confirm against IntegrationTestUtils.
        this.kafkaStreams.close();
        this.kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, streamsBuilder, false);
        this.shouldReachExpectedPosition(query);
    }

    /**
     * Runs {@code query} against partitions 0 and 1 of the store, bounded at
     * {@link #INPUT_POSITION}, and asserts that the result's position equals that bound.
     *
     * @param query the query to execute; only the position of its result is checked
     */
    private void shouldReachExpectedPosition(Query<?> query) {
        final StateQueryRequest<?> request = StateQueryRequest.inStore(STORE_NAME)
            .withQuery(query)
            .withPartitions(Utils.mkSet(0, 1))
            .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult<?> result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        MatcherAssert.assertThat(result.getPosition(), Matchers.is(INPUT_POSITION));
    }

    /**
     * Adds a DSL topology that sums the input values per key in session windows with an
     * inactivity gap of {@link #WINDOW_SIZE}, materialized in the supplied session store.
     *
     * @param supplier the session store supplier to materialize into
     * @param builder  the builder to add the topology to
     * @param cache    whether to enable caching on the store
     * @param log      whether to enable changelogging on the store
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void setUpSessionDSLTopology(SessionBytesStoreSupplier supplier, StreamsBuilder builder, boolean cache, boolean log) {
        // Kept raw: naming its store type parameter would need a type this file does not import.
        // The stream itself is typed <Integer, Integer> so the aggregation lambdas work on
        // Integer rather than Object.
        final Materialized materialized = Materialized.as(supplier);
        if (cache) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (log) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        builder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
            .aggregate(
                () -> 0,
                (key, value, aggregate) -> aggregate + value,
                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
                materialized);
    }

    /**
     * Adds a DSL topology that sums the input values per key in time windows of
     * {@link #WINDOW_SIZE}, materialized in the supplied window store.
     *
     * @param supplier the window store supplier to materialize into
     * @param builder  the builder to add the topology to
     * @param cache    whether to enable caching on the store
     * @param log      whether to enable changelogging on the store
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void setUpWindowDSLTopology(WindowBytesStoreSupplier supplier, StreamsBuilder builder, boolean cache, boolean log) {
        // Kept raw: naming its store type parameter would need a type this file does not import.
        // The stream itself is typed <Integer, Integer> so the aggregation lambda works on
        // Integer rather than Object.
        final Materialized materialized = Materialized.as(supplier);
        if (cache) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (log) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        builder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
            .aggregate(
                () -> 0,
                (key, value, aggregate) -> aggregate + value,
                materialized);
    }

    /**
     * Adds a DSL topology that reads the input topic as a table materialized in the supplied
     * key-value store.
     *
     * @param supplier the key-value store supplier to materialize into
     * @param builder  the builder to add the topology to
     * @param cache    whether to enable caching on the store
     * @param log      whether to enable changelogging on the store
     */
    private static void setUpKeyValueDSLTopology(KeyValueBytesStoreSupplier supplier, StreamsBuilder builder, boolean cache, boolean log) {
        Materialized storeConfig = Materialized.as((KeyValueBytesStoreSupplier)supplier);
        storeConfig = cache
            ? storeConfig.withCachingEnabled()
            : storeConfig.withCachingDisabled();
        storeConfig = log
            ? storeConfig.withLoggingEnabled(Collections.emptyMap())
            : storeConfig.withLoggingDisabled();
        final Consumed integerInput = Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer());
        builder.table(INPUT_TOPIC_NAME, integerInput, storeConfig);
    }

    /**
     * Adds a Processor API topology whose single processor writes every input record into a
     * key-value store built from {@code supplier}.
     *
     * <p>When {@code storeToTest.timestamped()} is true a timestamped store builder is used and
     * the value is stored together with the record timestamp; otherwise a plain key-value store
     * builder is used.
     *
     * @param supplier    the key-value store supplier backing the store
     * @param builder     the builder to add the store and topology to
     * @param cache       whether to enable caching on the store
     * @param log         whether to enable changelogging on the store
     * @param storeToTest the store flavour; decides between timestamped and plain store
     */
    private static void setUpKeyValuePAPITopology(KeyValueBytesStoreSupplier supplier, StreamsBuilder builder, boolean cache, boolean log, StoresToTest storeToTest) {
        final StoreBuilder<?> storeBuilder;
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
        if (storeToTest.timestamped()) {
            storeBuilder = Stores.timestampedKeyValueStoreBuilder(supplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(Record<Integer, Integer> record) {
                    final TimestampedKeyValueStore<Integer, Integer> stateStore =
                        this.context().getStateStore(storeBuilder.name());
                    stateStore.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
                }
            };
        } else {
            storeBuilder = Stores.keyValueStoreBuilder(supplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(Record<Integer, Integer> record) {
                    final KeyValueStore<Integer, Integer> stateStore =
                        this.context().getStateStore(storeBuilder.name());
                    stateStore.put(record.key(), record.value());
                }
            };
        }
        if (cache) {
            storeBuilder.withCachingEnabled();
        } else {
            storeBuilder.withCachingDisabled();
        }
        if (log) {
            storeBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            storeBuilder.withLoggingDisabled();
        }
        builder.addStateStore(storeBuilder);
        builder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .process(processorSupplier, storeBuilder.name());
    }

    /**
     * Adds a Processor API topology whose single processor writes every input record into a
     * window store built from {@code supplier}, always using {@link #WINDOW_START} as the
     * window start timestamp.
     *
     * <p>When {@code storeToTest.timestamped()} is true a timestamped window store builder is
     * used and the value is stored together with the record timestamp; otherwise a plain window
     * store builder is used.
     *
     * @param supplier    the window store supplier backing the store
     * @param builder     the builder to add the store and topology to
     * @param cache       whether to enable caching on the store
     * @param log         whether to enable changelogging on the store
     * @param storeToTest the store flavour; decides between timestamped and plain store
     */
    private static void setUpWindowPAPITopology(WindowBytesStoreSupplier supplier, StreamsBuilder builder, boolean cache, boolean log, StoresToTest storeToTest) {
        final StoreBuilder<?> storeBuilder;
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
        if (storeToTest.timestamped()) {
            storeBuilder = Stores.timestampedWindowStoreBuilder(supplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(Record<Integer, Integer> record) {
                    final TimestampedWindowStore<Integer, Integer> stateStore =
                        this.context().getStateStore(storeBuilder.name());
                    stateStore.put(
                        record.key(),
                        ValueAndTimestamp.make(record.value(), record.timestamp()),
                        WINDOW_START);
                }
            };
        } else {
            storeBuilder = Stores.windowStoreBuilder(supplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(Record<Integer, Integer> record) {
                    final WindowStore<Integer, Integer> stateStore =
                        this.context().getStateStore(storeBuilder.name());
                    stateStore.put(record.key(), record.value(), WINDOW_START);
                }
            };
        }
        if (cache) {
            storeBuilder.withCachingEnabled();
        } else {
            storeBuilder.withCachingDisabled();
        }
        if (log) {
            storeBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            storeBuilder.withLoggingDisabled();
        }
        builder.addStateStore(storeBuilder);
        builder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .process(processorSupplier, storeBuilder.name());
    }

    /**
     * Adds a Processor API topology whose single processor writes every input record into a
     * session store built from {@code supplier}, keyed by the record key and a session window
     * that both starts and ends at {@link #WINDOW_START}.
     *
     * @param supplier the session store supplier backing the store
     * @param builder  the builder to add the store and topology to
     * @param cache    whether to enable caching on the store
     * @param log      whether to enable changelogging on the store
     */
    private static void setUpSessionPAPITopology(SessionBytesStoreSupplier supplier, StreamsBuilder builder, boolean cache, boolean log) {
        final StoreBuilder<?> storeBuilder =
            Stores.sessionStoreBuilder(supplier, Serdes.Integer(), Serdes.Integer());
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier =
            () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(Record<Integer, Integer> record) {
                    final SessionStore<Integer, Integer> stateStore =
                        this.context().getStateStore(storeBuilder.name());
                    stateStore.put(
                        new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
                        record.value());
                }
            };
        if (cache) {
            storeBuilder.withCachingEnabled();
        } else {
            storeBuilder.withCachingDisabled();
        }
        if (log) {
            storeBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            storeBuilder.withLoggingDisabled();
        }
        builder.addStateStore(storeBuilder);
        builder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .process(processorSupplier, storeBuilder.name());
    }

    /**
     * Builds the Streams configuration for one test case.
     *
     * <p>The application id is derived from the test class name and the four parameters. Every
     * call uses the next value of the shared {@code port} counter for {@code application.server}
     * and a fresh temporary directory for {@code state.dir}.
     *
     * @param cache    caching flag; only used in the application id
     * @param log      logging flag; only used in the application id
     * @param supplier name of the store flavour; only used in the application id
     * @param kind     topology kind; only used in the application id
     * @return the populated configuration
     */
    private static Properties streamsConfiguration(boolean cache, boolean log, String supplier, String kind) {
        final String parameterSuffix = cache + "-" + log + "-" + supplier + "-" + kind;
        final String safeTestName = PositionRestartIntegrationTest.class.getName() + "-" + parameterSuffix;

        // Advance the shared counter first so this configuration gets its own port.
        port += 1;

        final Properties config = new Properties();
        config.put("topology.optimization", "all");
        config.put("application.id", "app-" + safeTestName);
        config.put("application.server", "localhost:" + port);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());
        config.put("num.standby.replicas", 1);
        config.put("max.poll.records", 100);
        config.put("heartbeat.interval.ms", 200);
        config.put("session.timeout.ms", 1000);
        config.put("commit.interval.ms", 100L);
        config.put("num.stream.threads", 1);
        config.put("__iq.consistency.offset.vector.enabled__", true);
        return config;
    }

    /**
     * The store flavours exercised by the test. Each constant carries a factory for its store
     * supplier, its structural layout (key-value, window or session) and whether the
     * Processor API topologies should treat it as a timestamped store.
     */
    public static enum StoresToTest {
        IN_MEMORY_KV(
            Layout.KEY_VALUE, true,
            () -> Stores.inMemoryKeyValueStore(PositionRestartIntegrationTest.STORE_NAME)),
        IN_MEMORY_LRU(
            Layout.KEY_VALUE, true,
            () -> Stores.lruMap(PositionRestartIntegrationTest.STORE_NAME, 100)),
        ROCKS_KV(
            Layout.KEY_VALUE, false,
            () -> Stores.persistentKeyValueStore(PositionRestartIntegrationTest.STORE_NAME)),
        TIME_ROCKS_KV(
            Layout.KEY_VALUE, true,
            () -> Stores.persistentTimestampedKeyValueStore(PositionRestartIntegrationTest.STORE_NAME)),
        IN_MEMORY_WINDOW(
            Layout.WINDOW, true,
            () -> Stores.inMemoryWindowStore(PositionRestartIntegrationTest.STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false)),
        ROCKS_WINDOW(
            Layout.WINDOW, false,
            () -> Stores.persistentWindowStore(PositionRestartIntegrationTest.STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false)),
        TIME_ROCKS_WINDOW(
            Layout.WINDOW, true,
            () -> Stores.persistentTimestampedWindowStore(PositionRestartIntegrationTest.STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false)),
        IN_MEMORY_SESSION(
            Layout.SESSION, true,
            () -> Stores.inMemorySessionStore(PositionRestartIntegrationTest.STORE_NAME, Duration.ofDays(1L))),
        ROCKS_SESSION(
            Layout.SESSION, true,
            () -> Stores.persistentSessionStore(PositionRestartIntegrationTest.STORE_NAME, Duration.ofDays(1L)));

        /** Structural family of a store; drives keyValue(), isWindowed() and isSession(). */
        private enum Layout { KEY_VALUE, WINDOW, SESSION }

        private final Layout layout;
        private final boolean timestamped;
        // Invoked on every supplier() call, so each call yields a newly created supplier.
        private final java.util.function.Supplier<StoreSupplier<?>> factory;

        StoresToTest(Layout layout, boolean timestamped, java.util.function.Supplier<StoreSupplier<?>> factory) {
            this.layout = layout;
            this.timestamped = timestamped;
            this.factory = factory;
        }

        /** @return a newly created store supplier for this flavour, named {@code STORE_NAME} */
        public StoreSupplier<?> supplier() {
            return this.factory.get();
        }

        /** @return whether the PAPI topologies should wrap this flavour in a timestamped store builder */
        public boolean timestamped() {
            return this.timestamped;
        }

        /** @return true for the key-value flavours */
        public boolean keyValue() {
            return this.layout == Layout.KEY_VALUE;
        }

        /** @return true for the window flavours */
        public boolean isWindowed() {
            return this.layout == Layout.WINDOW;
        }

        /** @return true for the session flavours */
        public boolean isSession() {
            return this.layout == Layout.SESSION;
        }
    }
}

