/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class WindowStoreFetchTest {
    private static final String STORE_NAME = "store";
    private static final int DATA_SIZE = 5;
    private static final long WINDOW_SIZE = 500L;
    private static final long RETENTION_MS = 10000L;
    private StoreType storeType;
    private boolean enableLogging;
    private boolean enableCaching;
    private boolean forward;
    private LinkedList<KeyValue<Windowed<String>, Long>> expectedRecords;
    private LinkedList<KeyValue<String, String>> records;
    private Properties streamsConfig;
    private String low;
    private String high;
    private String middle;
    private String innerLow;
    private String innerHigh;
    private String innerLowBetween;
    private String innerHighBetween;
    private TimeWindowedKStream<String, String> windowedStream;

    public void setup(StoreType storeType, boolean enableLogging, boolean enableCaching, boolean forward) {
        this.storeType = storeType;
        this.enableLogging = enableLogging;
        this.enableCaching = enableCaching;
        this.forward = forward;
        this.records = new LinkedList();
        this.expectedRecords = new LinkedList();
        int m = 2;
        for (int i = 0; i < 5; ++i) {
            int index;
            String key = "key-" + i * 2;
            String value = "val-" + i * 2;
            KeyValue r = new KeyValue((Object)key, (Object)value);
            this.records.add((KeyValue<String, String>)r);
            this.records.add((KeyValue<String, String>)r);
            long windowStartTime = i < 2 ? 0L : 500L;
            this.expectedRecords.add((KeyValue<Windowed<String>, Long>)new KeyValue((Object)new Windowed((Object)key, (Window)new TimeWindow(windowStartTime, windowStartTime + 500L)), (Object)2L));
            this.high = key;
            if (this.low == null) {
                this.low = key;
            }
            if (i == 2) {
                this.middle = key;
            }
            if (i == 1) {
                this.innerLow = key;
                index = i * 2 - 1;
                this.innerLowBetween = "key-" + index;
            }
            if (i != 3) continue;
            this.innerHigh = key;
            index = i * 2 + 1;
            this.innerHighBetween = "key-" + index;
        }
        Assertions.assertNotNull((Object)this.low);
        Assertions.assertNotNull((Object)this.high);
        Assertions.assertNotNull((Object)this.middle);
        Assertions.assertNotNull((Object)this.innerLow);
        Assertions.assertNotNull((Object)this.innerHigh);
        Assertions.assertNotNull((Object)this.innerLowBetween);
        Assertions.assertNotNull((Object)this.innerHighBetween);
    }

    public static Stream<Arguments> data() {
        List<StoreType> types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed);
        List<Boolean> logging = Arrays.asList(true, false);
        List<Boolean> caching = Arrays.asList(true, false);
        List<Boolean> forward = Arrays.asList(true, false);
        return WindowStoreFetchTest.buildParameters(types, logging, caching, forward);
    }

    @BeforeEach
    public void setup() {
        this.streamsConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath())}));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testStoreConfig(StoreType storeType, boolean enableLogging, boolean enableCaching, boolean forward) {
        this.setup(storeType, enableLogging, enableCaching, forward);
        Materialized<String, Long, WindowStore<Bytes, byte[]>> stateStoreConfig = this.getStoreConfig(this.storeType, this.enableLogging, this.enableCaching);
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream("input", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        stream.groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(500L))).count(stateStoreConfig).toStream().to("output");
        Topology topology = builder.build();
        try (TopologyTestDriver driver = new TopologyTestDriver(topology);){
            Iterator<Object> dataIterator;
            TestInputTopic input = driver.createInputTopic("input", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            WindowStore stateStore = driver.getWindowStore(STORE_NAME);
            int medium = 4;
            for (int i = 0; i < this.records.size(); ++i) {
                KeyValue<String, String> kv = this.records.get(i);
                long windowStartTime = i < 4 ? 0L : 500L;
                input.pipeInput(kv.key, kv.value, windowStartTime + (long)i);
            }
            try (KeyValueIterator scanIterator = this.forward ? stateStore.fetchAll(0L, Long.MAX_VALUE) : stateStore.backwardFetchAll(0L, Long.MAX_VALUE);){
                dataIterator = this.forward ? this.expectedRecords.iterator() : this.expectedRecords.descendingIterator();
                TestUtils.checkEquals((Iterator)scanIterator, dataIterator);
            }
            scanIterator = this.forward ? stateStore.fetch(null, null, 0L, Long.MAX_VALUE) : stateStore.backwardFetch(null, null, 0L, Long.MAX_VALUE);
            var15_18 = null;
            try {
                dataIterator = this.forward ? this.expectedRecords.iterator() : this.expectedRecords.descendingIterator();
                TestUtils.checkEquals((Iterator)scanIterator, dataIterator);
            }
            catch (Throwable throwable) {
                var15_18 = throwable;
                throw throwable;
            }
            finally {
                if (scanIterator != null) {
                    if (var15_18 != null) {
                        try {
                            scanIterator.close();
                        }
                        catch (Throwable throwable) {
                            var15_18.addSuppressed(throwable);
                        }
                    } else {
                        scanIterator.close();
                    }
                }
            }
            this.testRange((WindowStore<String, Long>)stateStore, this.innerLow, this.innerHigh, this.forward);
            this.testRange((WindowStore<String, Long>)stateStore, null, this.middle, this.forward);
            this.testRange((WindowStore<String, Long>)stateStore, this.middle, null, this.forward);
            this.testRange((WindowStore<String, Long>)stateStore, null, this.innerHighBetween, this.forward);
            this.testRange((WindowStore<String, Long>)stateStore, this.innerLowBetween, null, this.forward);
        }
    }

    private List<KeyValue<Windowed<String>, Long>> filterList(KeyValueIterator<Windowed<String>, Long> iterator, String from, String to) {
        Predicate<KeyValue> pred = elem -> {
            if (from != null && ((String)((Windowed)elem.key).key()).compareTo(from) < 0) {
                return false;
            }
            if (to != null && ((String)((Windowed)elem.key).key()).compareTo(to) > 0) {
                return false;
            }
            return elem != null;
        };
        return Utils.toList(iterator, pred);
    }

    private void testRange(WindowStore<String, Long> store, String from, String to, boolean forward) {
        try (KeyValueIterator resultIterator = forward ? store.fetch((Object)from, (Object)to, 0L, Long.MAX_VALUE) : store.backwardFetch((Object)from, (Object)to, 0L, Long.MAX_VALUE);
             KeyValueIterator expectedIterator = forward ? store.fetchAll(0L, Long.MAX_VALUE) : store.backwardFetchAll(0L, Long.MAX_VALUE);){
            List result = Utils.toList((Iterator)resultIterator);
            List<KeyValue<Windowed<String>, Long>> expected = this.filterList((KeyValueIterator<Windowed<String>, Long>)expectedIterator, from, to);
            MatcherAssert.assertThat((Object)result, (Matcher)CoreMatchers.is(expected));
        }
    }

    private static Stream<Arguments> buildParameters(List<StoreType> types, List<Boolean> logging, List<Boolean> caching, List<Boolean> forward) {
        Stream.Builder<Arguments> builder = Stream.builder();
        for (StoreType type : types) {
            for (boolean log : logging) {
                for (boolean cache : caching) {
                    for (boolean f : forward) {
                        builder.add(Arguments.of((Object[])new Object[]{type, log, cache, f}));
                    }
                }
            }
        }
        return builder.build();
    }

    private Materialized<String, Long, WindowStore<Bytes, byte[]>> getStoreConfig(StoreType type, boolean cachingEnabled, boolean loggingEnabled) {
        Supplier<WindowBytesStoreSupplier> createStore = () -> {
            if (type == StoreType.InMemory) {
                return Stores.inMemoryWindowStore((String)STORE_NAME, (Duration)Duration.ofMillis(10000L), (Duration)Duration.ofMillis(500L), (boolean)false);
            }
            if (type == StoreType.RocksDB) {
                return Stores.persistentWindowStore((String)STORE_NAME, (Duration)Duration.ofMillis(10000L), (Duration)Duration.ofMillis(500L), (boolean)false);
            }
            if (type == StoreType.Timed) {
                return Stores.persistentTimestampedWindowStore((String)STORE_NAME, (Duration)Duration.ofMillis(10000L), (Duration)Duration.ofMillis(500L), (boolean)false);
            }
            return Stores.inMemoryWindowStore((String)STORE_NAME, (Duration)Duration.ofMillis(10000L), (Duration)Duration.ofMillis(500L), (boolean)false);
        };
        WindowBytesStoreSupplier stateStoreSupplier = createStore.get();
        Materialized stateStoreConfig = Materialized.as((WindowBytesStoreSupplier)stateStoreSupplier).withKeySerde(Serdes.String()).withValueSerde(Serdes.Long());
        if (cachingEnabled) {
            stateStoreConfig.withCachingEnabled();
        } else {
            stateStoreConfig.withCachingDisabled();
        }
        if (loggingEnabled) {
            stateStoreConfig.withLoggingEnabled(new HashMap());
        } else {
            stateStoreConfig.withLoggingDisabled();
        }
        return stateStoreConfig;
    }

    private static enum StoreType {
        InMemory,
        RocksDB,
        Timed;

    }
}

