/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={IntegrationTest.class})
public class RangeQueryIntegrationTest {
    // Embedded Kafka cluster shared by all parameterized runs; started in startCluster(), stopped in closeCluster().
    // NOTE(review): the constructor argument 1 is presumably the broker count — confirm against EmbeddedKafkaCluster.
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // Streams configuration, populated once in startCluster() and passed to every KafkaStreams instance.
    private static final Properties STREAMS_CONFIG = new Properties();
    // Value stored under "application.id" in STREAMS_CONFIG.
    private static final String APP_ID = "range-query-integration-test";
    // Value stored under "commit.interval.ms" in STREAMS_CONFIG.
    private static final Long COMMIT_INTERVAL = 100L;
    // Name of the input topic; (re)assigned in setupTopics() before each test.
    private static String inputStream;
    // Name of the materialized state store that the test queries.
    private static final String TABLE_NAME = "mytable";
    // Size of the generated data set (the constructor generates five records).
    private static final int DATA_SIZE = 5;
    // Test parameters, assigned from the constructor arguments.
    private StoreType storeType;
    private boolean enableLogging;
    private boolean enableCaching;
    // true: query with range()/all(); false: query with reverseRange()/reverseAll().
    private boolean forward;
    // Not assigned anywhere in the visible code; testStoreConfig() uses a local variable of the same name.
    private KafkaStreams kafkaStreams;
    // Input records in ascending key order ("key-0", "key-2", ...), built in the constructor.
    private LinkedList<KeyValue<String, String>> records;
    // Key of the first generated record ("key-0").
    private String low;
    // Key of the last generated record ("key-8").
    private String high;
    // Key of the record at loop index 2 ("key-4").
    private String middle;
    // Key of the record at loop index 1 ("key-2").
    private String innerLow;
    // Key of the record at loop index 3 ("key-6").
    private String innerHigh;
    // "key-1": a key that is not in the data set, lying between the first two generated keys.
    private String innerLowBetween;
    // "key-7": a key that is not in the data set, lying between the last two generated keys.
    private String innerHighBetween;
    // JUnit rule exposing the current test method name; not referenced elsewhere in the visible code.
    @Rule
    public TestName testName = new TestName();

    /**
     * Creates one parameterized test instance and generates the input data set.
     *
     * <p>{@code DATA_SIZE} records with even-numbered keys ({@code "key-0"}, {@code "key-2"}, ...) and
     * matching values ({@code "val-0"}, ...) are generated in ascending key order. While generating,
     * the boundary keys used by the range queries are captured:
     * <ul>
     *   <li>{@code low} / {@code high}: first and last generated key</li>
     *   <li>{@code middle}: key at index {@code DATA_SIZE / 2}</li>
     *   <li>{@code innerLow} / {@code innerHigh}: keys at index {@code 1} and {@code DATA_SIZE - 2}</li>
     *   <li>{@code innerLowBetween} / {@code innerHighBetween}: odd-numbered keys that are not part of
     *       the data set, just below {@code innerLow} and just above {@code innerHigh}</li>
     * </ul>
     *
     * @param storeType     which state store implementation to materialize the table with
     * @param enableLogging value of the {@code enableLogging} test parameter
     * @param enableCaching value of the {@code enableCaching} test parameter
     * @param forward       {@code true} to test forward iteration, {@code false} to test reverse iteration
     */
    public RangeQueryIntegrationTest(StoreType storeType, boolean enableLogging, boolean enableCaching, boolean forward) {
        this.storeType = storeType;
        this.enableLogging = enableLogging;
        this.enableCaching = enableCaching;
        this.forward = forward;
        this.records = new LinkedList<>();

        // Derive the interesting indices from DATA_SIZE so the data set can be resized in one place.
        final int middleIndex = DATA_SIZE / 2;
        final int innerLowIndex = 1;
        final int innerHighIndex = DATA_SIZE - 2;

        for (int i = 0; i < DATA_SIZE; ++i) {
            // Only even suffixes are used so that odd suffixes can serve as "between" keys.
            final String key = "key-" + i * 2;
            final String value = "val-" + i * 2;
            this.records.add(new KeyValue<>(key, value));

            // The last assignment wins, so high ends up as the final key.
            this.high = key;
            if (this.low == null) {
                this.low = key;
            }
            if (i == middleIndex) {
                this.middle = key;
            }
            if (i == innerLowIndex) {
                this.innerLow = key;
                this.innerLowBetween = "key-" + (i * 2 - 1);
            }
            if (i == innerHighIndex) {
                this.innerHigh = key;
                this.innerHighBetween = "key-" + (i * 2 + 1);
            }
        }

        // Guard against a DATA_SIZE that is too small to produce every boundary key.
        Assert.assertNotNull(this.low);
        Assert.assertNotNull(this.high);
        Assert.assertNotNull(this.middle);
        Assert.assertNotNull(this.innerLow);
        Assert.assertNotNull(this.innerHigh);
        Assert.assertNotNull(this.innerLowBetween);
        Assert.assertNotNull(this.innerHighBetween);
    }

    /**
     * Supplies the constructor arguments for every parameterized run: the full cross product of
     * the three store types with both values of each of the three boolean switches.
     *
     * @return one {@code Object[]} of (storeType, enableLogging, enableCaching, forward) per run
     */
    @Parameterized.Parameters(name="storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}")
    public static Collection<Object[]> data() {
        final List<StoreType> storeTypes = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed);
        // The same on/off choices apply to logging, caching and iteration direction.
        final List<Boolean> onOff = Arrays.asList(Boolean.TRUE, Boolean.FALSE);
        return buildParameters(storeTypes, onOff, onOff, onOff);
    }

    /**
     * Starts the embedded cluster once for the whole class and fills in the shared streams
     * configuration (bootstrap servers, default String serdes, commit interval, application id
     * and a fresh temporary state directory).
     *
     * @throws IOException declared for the cluster start-up
     */
    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();

        final String bootstrapServers = CLUSTER.bootstrapServers();
        final Class<?> stringSerdeClass = Serdes.String().getClass();
        final String stateDirectory = TestUtils.tempDirectory().getPath();

        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("bootstrap.servers", bootstrapServers);
        STREAMS_CONFIG.put("default.key.serde", stringSerdeClass);
        STREAMS_CONFIG.put("default.value.serde", stringSerdeClass);
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
        STREAMS_CONFIG.put("application.id", APP_ID);
        STREAMS_CONFIG.put("state.dir", stateDirectory);
    }

    /**
     * Stops the shared embedded cluster after all parameterized runs have finished.
     */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Creates the input topic before each test. The same topic name is used for every run;
     * {@link #cleanup()} deletes it again afterwards.
     *
     * @throws Exception if the topic cannot be created
     */
    @Before
    public void setupTopics() throws Exception {
        inputStream = "input-topic";
        CLUSTER.createTopic(inputStream);
    }

    /**
     * Deletes all topics on the embedded cluster after each test and waits for the deletion.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @After
    public void cleanup() throws InterruptedException {
        // NOTE(review): 120000 is presumably a timeout in milliseconds (two minutes) — confirm against EmbeddedKafkaCluster.
        CLUSTER.deleteAllTopicsAndWait(120000L);
    }

    /**
     * Materializes the input topic as a table backed by the parameterized store, writes the
     * generated records, and verifies that full scans and bounded/open-ended range queries return
     * the expected records in the expected order (ascending when {@code forward}, descending
     * otherwise).
     *
     * @throws Exception if starting the application, producing data or waiting for the store fails
     */
    @Test
    public void testStoreConfig() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        // getStoreConfig expects (type, name, cachingEnabled, loggingEnabled): caching goes before logging.
        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig =
            this.getStoreConfig(this.storeType, TABLE_NAME, this.enableCaching, this.enableLogging);
        // The returned KTable is not needed; the call registers the table (and its store) in the topology.
        builder.table(inputStream, stateStoreConfig);

        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) {
            final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams);
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60L));

            this.writeInputData();

            final ReadOnlyKeyValueStore<String, String> stateStore =
                IntegrationTestUtils.getStore(1000000L, TABLE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());

            // The highest key is the last generated record, so its presence is used as the "store is populated" signal.
            TestUtils.waitForCondition(() -> stateStore.get(this.high) != null, "The store never finished populating");

            // An unbounded range query must return every record.
            try (KeyValueIterator<String, String> scanIterator =
                     this.forward ? stateStore.range(null, null) : stateStore.reverseRange(null, null)) {
                final Iterator<KeyValue<String, String>> dataIterator =
                    this.forward ? this.records.iterator() : this.records.descendingIterator();
                TestUtils.checkEquals(scanIterator, dataIterator);
            }

            // all()/reverseAll() must return every record as well.
            try (KeyValueIterator<String, String> allIterator =
                     this.forward ? stateStore.all() : stateStore.reverseAll()) {
                final Iterator<KeyValue<String, String>> dataIterator =
                    this.forward ? this.records.iterator() : this.records.descendingIterator();
                TestUtils.checkEquals(allIterator, dataIterator);
            }

            // Bounded and half-open ranges, with bounds both on existing keys and between them.
            this.testRange("range", stateStore, this.innerLow, this.innerHigh, this.forward);
            this.testRange("until", stateStore, null, this.middle, this.forward);
            this.testRange("from", stateStore, this.middle, null, this.forward);
            this.testRange("untilBetween", stateStore, null, this.innerHighBetween, this.forward);
            this.testRange("fromBetween", stateStore, this.innerLowBetween, null, this.forward);
        }
    }

    /**
     * Produces all generated records to the input topic synchronously, using String serializers
     * for keys and values and the embedded cluster's clock.
     */
    private void writeInputData() {
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            StringSerializer.class,
            StringSerializer.class
        );
        final Time clusterTime = CLUSTER.time;
        IntegrationTestUtils.produceKeyValuesSynchronously(inputStream, this.records, producerConfig, clusterTime);
    }

    /**
     * Collects the entries of {@code iterator} whose key lies within the inclusive bounds
     * {@code [from, to]}, preserving iteration order. A {@code null} bound means "unbounded" on
     * that side. {@code null} entries are dropped.
     *
     * @param iterator source of entries; consumed by this call
     * @param from     inclusive lower bound, or {@code null} for no lower bound
     * @param to       inclusive upper bound, or {@code null} for no upper bound
     * @return the entries within the bounds, in iteration order
     */
    private List<KeyValue<String, String>> filterList(KeyValueIterator<String, String> iterator, final String from, final String to) {
        final Predicate<KeyValue<String, String>> withinBounds = elem -> {
            // Reject null entries up front: the bound checks below dereference elem.key.
            if (elem == null) {
                return false;
            }
            if (from != null && elem.key.compareTo(from) < 0) {
                return false;
            }
            if (to != null && elem.key.compareTo(to) > 0) {
                return false;
            }
            return true;
        };
        return Utils.toList(iterator, withinBounds);
    }

    /**
     * Asserts that a (reverse) range query over {@code [from, to]} returns exactly the entries
     * obtained by filtering a full (reverse) scan of the same store to those bounds.
     *
     * @param name    label of the range case; used as the assertion reason so a failure identifies the case
     * @param store   store under test
     * @param from    inclusive lower bound, or {@code null} for no lower bound
     * @param to      inclusive upper bound, or {@code null} for no upper bound
     * @param forward {@code true} to use {@code range}/{@code all}, {@code false} to use
     *                {@code reverseRange}/{@code reverseAll}
     */
    private void testRange(String name, ReadOnlyKeyValueStore<String, String> store, String from, String to, boolean forward) {
        try (KeyValueIterator<String, String> resultIterator = forward ? store.range(from, to) : store.reverseRange(from, to);
             KeyValueIterator<String, String> expectedIterator = forward ? store.all() : store.reverseAll()) {
            final List<KeyValue<String, String>> result = Utils.toList(resultIterator);
            final List<KeyValue<String, String>> expected = this.filterList(expectedIterator, from, to);
            MatcherAssert.assertThat(name, result, CoreMatchers.is(expected));
        }
    }

    /**
     * Builds the {@link Materialized} configuration for the table's state store.
     *
     * @param type           store implementation: {@code RocksDB} selects a persistent key-value store,
     *                       {@code Timed} a persistent timestamped key-value store; {@code InMemory}
     *                       and any other value select an in-memory key-value store
     * @param name           name of the state store
     * @param cachingEnabled whether caching is enabled on the store
     * @param loggingEnabled whether logging is enabled on the store (with an empty config map)
     * @return the store configuration with String key and value serdes
     */
    private Materialized<String, String, KeyValueStore<Bytes, byte[]>> getStoreConfig(StoreType type, String name, boolean cachingEnabled, boolean loggingEnabled) {
        final KeyValueBytesStoreSupplier stateStoreSupplier;
        if (type == StoreType.RocksDB) {
            stateStoreSupplier = Stores.persistentKeyValueStore(name);
        } else if (type == StoreType.Timed) {
            stateStoreSupplier = Stores.persistentTimestampedKeyValueStore(name);
        } else {
            // InMemory, and the fallback for any other (or null) type.
            stateStoreSupplier = Stores.inMemoryKeyValueStore(name);
        }

        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = Materialized
            .<String, String>as(stateStoreSupplier)
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String());

        // NOTE(review): as in the original code, the return values below are ignored; this relies on
        // the with* methods configuring the receiver in place — confirm against Materialized.
        if (cachingEnabled) {
            stateStoreConfig.withCachingEnabled();
        } else {
            stateStoreConfig.withCachingDisabled();
        }
        if (loggingEnabled) {
            stateStoreConfig.withLoggingEnabled(new HashMap<>());
        } else {
            stateStoreConfig.withLoggingDisabled();
        }
        return stateStoreConfig;
    }

    /**
     * Computes the cross product of the given option lists. Each returned array holds one choice
     * per option list, in the order the lists were passed. With no option lists, the result is a
     * single empty array.
     *
     * @param argOptions the possible values for each argument position
     * @return every combination of one value per position
     */
    private static Collection<Object[]> buildParameters(List<?> ... argOptions) {
        // Seed with one empty combination, then extend it by one position per option list.
        List<Object[]> combinations = new LinkedList<Object[]>();
        combinations.add(new Object[0]);
        for (int position = 0; position < argOptions.length; position++) {
            combinations = times(combinations, argOptions[position]);
        }
        return combinations;
    }

    /**
     * Extends every argument array in {@code left} by each element of {@code right}, producing
     * {@code left.size() * right.size()} new arrays. The input arrays are not modified.
     *
     * @param left  existing partial argument arrays
     * @param right values to append as the next argument position
     * @return the extended argument arrays, grouped by {@code left} element and ordered by {@code right}
     */
    private static List<Object[]> times(List<Object[]> left, List<?> right) {
        final LinkedList<Object[]> product = new LinkedList<Object[]>();
        for (final Object[] prefix : left) {
            final int nextSlot = prefix.length;
            for (final Object choice : right) {
                // Copy into a fresh Object[] one slot longer, then fill the new last slot.
                final Object[] extended = Arrays.copyOf(prefix, nextSlot + 1, Object[].class);
                extended[nextSlot] = choice;
                product.add(extended);
            }
        }
        return product;
    }

    private static enum StoreType {
        InMemory,
        RocksDB,
        Timed;

    }
}

