/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Tag(value="integration")
@Timeout(value=600L)
public class RangeQueryIntegrationTest {
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final Properties STREAMS_CONFIG = new Properties();
    private static final String APP_ID = "range-query-integration-test";
    private static final Long COMMIT_INTERVAL = 100L;
    private static String inputStream;
    private static final String TABLE_NAME = "mytable";
    private static final int DATA_SIZE = 5;
    private final LinkedList<KeyValue<String, String>> records = new LinkedList();
    private String low;
    private String high;
    private String middle;
    private String innerLow;
    private String innerHigh;
    private String innerLowBetween;
    private String innerHighBetween;

    public RangeQueryIntegrationTest() {
        int m = 2;
        for (int i = 0; i < 5; ++i) {
            int index;
            String key = "key-" + i * 2;
            String value = "val-" + i * 2;
            this.records.add((KeyValue<String, String>)new KeyValue((Object)key, (Object)value));
            this.high = key;
            if (this.low == null) {
                this.low = key;
            }
            if (i == 2) {
                this.middle = key;
            }
            if (i == 1) {
                this.innerLow = key;
                index = i * 2 - 1;
                this.innerLowBetween = "key-" + index;
            }
            if (i != 3) continue;
            this.innerHigh = key;
            index = i * 2 + 1;
            this.innerHighBetween = "key-" + index;
        }
        Assertions.assertNotNull((Object)this.low);
        Assertions.assertNotNull((Object)this.high);
        Assertions.assertNotNull((Object)this.middle);
        Assertions.assertNotNull((Object)this.innerLow);
        Assertions.assertNotNull((Object)this.innerHigh);
        Assertions.assertNotNull((Object)this.innerLowBetween);
        Assertions.assertNotNull((Object)this.innerHighBetween);
    }

    public static Stream<Arguments> data() {
        List<StoreType> types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed);
        List<Boolean> logging = Arrays.asList(true, false);
        List<Boolean> caching = Arrays.asList(true, false);
        List<Boolean> forward = Arrays.asList(true, false);
        return RangeQueryIntegrationTest.buildParameters(types, logging, caching, forward).stream().map(Arguments::of);
    }

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
        STREAMS_CONFIG.put("application.id", APP_ID);
        STREAMS_CONFIG.put("state.dir", TestUtils.tempDirectory().getPath());
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void setupTopics() throws Exception {
        inputStream = "input-topic";
        CLUSTER.createTopic(inputStream);
    }

    @AfterEach
    public void cleanup() throws InterruptedException {
        CLUSTER.deleteAllTopicsAndWait(120000L);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testStoreConfig(StoreType storeType, boolean enableLogging, boolean enableCaching, boolean forward) throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = this.getStoreConfig(storeType, enableLogging, enableCaching);
        builder.table(inputStream, stateStoreConfig);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG);){
            Iterator<Object> dataIterator;
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
            this.writeInputData();
            ReadOnlyKeyValueStore stateStore = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore(1000000L, TABLE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
            TestUtils.waitForCondition(() -> stateStore.get((Object)this.high) != null, (String)"The store never finished populating");
            try (KeyValueIterator scanIterator = forward ? stateStore.range(null, null) : stateStore.reverseRange(null, null);){
                dataIterator = forward ? this.records.iterator() : this.records.descendingIterator();
                TestUtils.checkEquals((Iterator)scanIterator, dataIterator);
            }
            var11_13 = null;
            try (KeyValueIterator allIterator = forward ? stateStore.all() : stateStore.reverseAll();){
                dataIterator = forward ? this.records.iterator() : this.records.descendingIterator();
                TestUtils.checkEquals((Iterator)allIterator, dataIterator);
            }
            catch (Throwable throwable) {
                var11_13 = throwable;
                throw throwable;
            }
            this.testRange("range", (ReadOnlyKeyValueStore<String, String>)stateStore, this.innerLow, this.innerHigh, forward);
            this.testRange("until", (ReadOnlyKeyValueStore<String, String>)stateStore, null, this.middle, forward);
            this.testRange("from", (ReadOnlyKeyValueStore<String, String>)stateStore, this.middle, null, forward);
            this.testRange("untilBetween", (ReadOnlyKeyValueStore<String, String>)stateStore, null, this.innerHighBetween, forward);
            this.testRange("fromBetween", (ReadOnlyKeyValueStore<String, String>)stateStore, this.innerLowBetween, null, forward);
        }
    }

    private void writeInputData() {
        IntegrationTestUtils.produceKeyValuesSynchronously(inputStream, this.records, TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), (Time)RangeQueryIntegrationTest.CLUSTER.time);
    }

    private List<KeyValue<String, String>> filterList(KeyValueIterator<String, String> iterator, String from, String to) {
        Predicate<KeyValue> predicate = elem -> {
            if (from != null && ((String)elem.key).compareTo(from) < 0) {
                return false;
            }
            if (to != null && ((String)elem.key).compareTo(to) > 0) {
                return false;
            }
            return elem != null;
        };
        return Utils.toList(iterator, predicate);
    }

    private void testRange(String name, ReadOnlyKeyValueStore<String, String> store, String from, String to, boolean forward) {
        try (KeyValueIterator resultIterator = forward ? store.range((Object)from, (Object)to) : store.reverseRange((Object)from, (Object)to);
             KeyValueIterator expectedIterator = forward ? store.all() : store.reverseAll();){
            List result = Utils.toList((Iterator)resultIterator);
            List<KeyValue<String, String>> expected = this.filterList((KeyValueIterator<String, String>)expectedIterator, from, to);
            MatcherAssert.assertThat((String)name, (Object)result, (Matcher)CoreMatchers.is(expected));
        }
    }

    private Materialized<String, String, KeyValueStore<Bytes, byte[]>> getStoreConfig(StoreType type, boolean cachingEnabled, boolean loggingEnabled) {
        Supplier<KeyValueBytesStoreSupplier> createStore = () -> {
            if (type == StoreType.InMemory) {
                return Stores.inMemoryKeyValueStore((String)TABLE_NAME);
            }
            if (type == StoreType.RocksDB) {
                return Stores.persistentKeyValueStore((String)TABLE_NAME);
            }
            if (type == StoreType.Timed) {
                return Stores.persistentTimestampedKeyValueStore((String)TABLE_NAME);
            }
            return Stores.inMemoryKeyValueStore((String)TABLE_NAME);
        };
        KeyValueBytesStoreSupplier stateStoreSupplier = createStore.get();
        Materialized stateStoreConfig = Materialized.as((KeyValueBytesStoreSupplier)stateStoreSupplier).withKeySerde(Serdes.String()).withValueSerde(Serdes.String());
        if (cachingEnabled) {
            stateStoreConfig.withCachingEnabled();
        } else {
            stateStoreConfig.withCachingDisabled();
        }
        if (loggingEnabled) {
            stateStoreConfig.withLoggingEnabled(new HashMap());
        } else {
            stateStoreConfig.withLoggingDisabled();
        }
        return stateStoreConfig;
    }

    private static Collection<Object[]> buildParameters(List<?> ... argOptions) {
        List<Object[]> result = new LinkedList<Object[]>();
        result.add(new Object[0]);
        for (List<?> argOption : argOptions) {
            result = RangeQueryIntegrationTest.times(result, argOption);
        }
        return result;
    }

    private static List<Object[]> times(List<Object[]> left, List<?> right) {
        LinkedList<Object[]> result = new LinkedList<Object[]>();
        for (Object[] args : left) {
            for (Object rightElem : right) {
                Object[] resArgs = new Object[args.length + 1];
                System.arraycopy(args, 0, resArgs, 0, args.length);
                resArgs[args.length] = rightElem;
                result.add(resArgs);
            }
        }
        return result;
    }

    private static enum StoreType {
        InMemory,
        RocksDB,
        Timed;

    }
}

