/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state;

import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;

/**
 * Test harness for {@link KeyValueStore} implementations.
 *
 * <p>The driver builds an {@link InternalMockProcessorContext} backed by a record collector that,
 * instead of producing to a topic, deserializes every record it is asked to send and remembers it
 * as either a "flushed entry" (non-null value) or a "flushed removal" (null value). Tests can then
 * query those collections through {@link #flushedEntryStored(Object)},
 * {@link #flushedEntryRemoved(Object)}, {@link #numFlushedEntryStored()} and
 * {@link #numFlushedEntryRemoved()}.
 *
 * <p>The driver also keeps a list of serialized key/value pairs (the "restore log") that tests can
 * populate with {@link #addEntryToRestoreLog(Object, Object)} and later verify against a store
 * with {@link #checkForRestoredEntries(KeyValueStore)}.
 *
 * @param <K> the key type handled by the store under test
 * @param <V> the value type handled by the store under test
 */
public class KeyValueStoreTestDriver<K, V> {
    /** Configuration handed to {@link StreamsConfig} whenever the context is asked for app configs. */
    private final Properties props;
    /** Latest non-null value captured per key from records sent through the record collector. */
    private final Map<K, V> flushedEntries = new HashMap<>();
    /** Keys whose most recently captured record carried a null value. */
    private final Set<K> flushedRemovals = new HashSet<>();
    /** Serialized entries registered through {@link #addEntryToRestoreLog(Object, Object)}. */
    private final List<KeyValue<byte[], byte[]>> restorableEntries = new LinkedList<>();
    private final InternalMockProcessorContext context;
    private final StateSerdes<K, V> stateSerdes;

    /**
     * Creates a driver whose serdes are obtained from
     * {@link StateSerdes#withBuiltinTypes(String, Class, Class)} for the given classes.
     *
     * @param keyClass   the class of the keys
     * @param valueClass the class of the values
     * @param <K>        the key type
     * @param <V>        the value type
     * @return a new test driver
     */
    public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
        final StateSerdes<K, V> serdes = StateSerdes.withBuiltinTypes("unexpected", keyClass, valueClass);
        return new KeyValueStoreTestDriver<K, V>(serdes);
    }

    /**
     * Creates a driver that uses the supplied serializers and deserializers for keys and values.
     *
     * @param keySerializer     serializer for keys
     * @param keyDeserializer   deserializer for keys
     * @param valueSerializer   serializer for values
     * @param valueDeserializer deserializer for values
     * @param <K>               the key type
     * @param <V>               the value type
     * @return a new test driver
     */
    public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer, Deserializer<K> keyDeserializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
        final StateSerdes<K, V> serdes = new StateSerdes<>(
            "unexpected",
            Serdes.serdeFrom(keySerializer, keyDeserializer),
            Serdes.serdeFrom(valueSerializer, valueDeserializer));
        return new KeyValueStoreTestDriver<K, V>(serdes);
    }

    /**
     * Wires up the capturing record collector, the streams properties and the mock processor
     * context.
     *
     * @param serdes the serdes used to serialize restore-log entries and to deserialize records
     *               captured by the record collector
     */
    private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
        final ByteArraySerializer rawSerializer = new ByteArraySerializer();
        final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);

        final RecordCollectorImpl recordCollector = new RecordCollectorImpl(
            "KeyValueStoreTestDriver",
            new LogContext("KeyValueStoreTestDriver "),
            new DefaultProductionExceptionHandler(),
            new Metrics().sensor("skipped-records")) {

            /**
             * Captures the record instead of forwarding it: the key and value are serialized with
             * the caller's serializers and read back with the driver's serdes so that they can be
             * stored as {@code K}/{@code V}. {@code partition} and {@code timestamp} are ignored.
             */
            @Override
            public <K1, V1> void send(String topic, K1 key, V1 value, Headers headers, Integer partition, Long timestamp, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, headers, key));
                final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, headers, value));
                KeyValueStoreTestDriver.this.recordFlushed(keyTest, valueTest);
            }

            /** Partitioner-based sends are not supported by this driver. */
            @Override
            public <K1, V1> void send(String topic, K1 key, V1 value, Headers headers, Long timestamp, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, StreamPartitioner<? super K1, ? super V1> partitioner) {
                throw new UnsupportedOperationException();
            }
        };
        recordCollector.init(producer);

        final File stateDir = TestUtils.tempDirectory();
        // Return value deliberately ignored.
        // NOTE(review): presumably the directory already exists at this point - confirm against TestUtils.
        stateDir.mkdirs();
        this.stateSerdes = serdes;

        this.props = new Properties();
        this.props.put("application.id", "application-id");
        this.props.put("bootstrap.servers", "localhost:9092");
        this.props.put("default.timestamp.extractor", MockTimestampExtractor.class);
        this.props.put("default.key.serde", serdes.keySerde().getClass());
        this.props.put("default.value.serde", serdes.valueSerde().getClass());
        this.props.put("rocksdb.config.setter", RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
        this.props.put("metrics.recording.level", "DEBUG");

        this.context = new InternalMockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
            // 1024 * 1024 == 0x100000; one cache instance is created per context and reused.
            final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());

            @Override
            public ThreadCache getCache() {
                return cache;
            }

            /** Builds a fresh {@link StreamsConfig} from the driver's properties on every call. */
            @Override
            public Map<String, Object> appConfigs() {
                return new StreamsConfig(KeyValueStoreTestDriver.this.props).originals();
            }

            /** Builds a fresh {@link StreamsConfig} from the driver's properties on every call. */
            @Override
            public Map<String, Object> appConfigsWithPrefix(String prefix) {
                return new StreamsConfig(KeyValueStoreTestDriver.this.props).originalsWithPrefix(prefix);
            }
        };
    }

    /**
     * Records a captured key/value pair. A {@code null} value marks the key as removed and drops
     * any previously captured value; a non-null value is stored and clears any removal mark.
     */
    private void recordFlushed(K key, V value) {
        if (value == null) {
            flushedRemovals.add(key);
            flushedEntries.remove(key);
        } else {
            flushedEntries.put(key, value);
            flushedRemovals.remove(key);
        }
    }

    /**
     * Returns the serialized entries registered via {@link #addEntryToRestoreLog(Object, Object)}.
     *
     * @return the driver's own restore-log list (not a copy), in insertion order
     */
    public Iterable<KeyValue<byte[], byte[]>> restoredEntries() {
        return restorableEntries;
    }

    /**
     * Serializes the given pair with the driver's serdes and appends it to the restore log.
     *
     * @param key   the key of the entry
     * @param value the value of the entry
     */
    public void addEntryToRestoreLog(K key, V value) {
        restorableEntries.add(new KeyValue<>(stateSerdes.rawKey(key), stateSerdes.rawValue(value)));
    }

    /**
     * Returns the mock processor context created by this driver.
     *
     * @return the processor context
     */
    public ProcessorContext context() {
        return context;
    }

    /**
     * Counts the restore-log entries that the given store does not hold with the expected value.
     * Each entry is deserialized with the driver's serdes and compared, via
     * {@link Objects#equals(Object, Object)}, with what the store returns for its key.
     * {@code null} entries in the restore log are skipped.
     *
     * @param store the store to inspect
     * @return the number of restore-log entries whose stored value differs from the expected one
     */
    public int checkForRestoredEntries(KeyValueStore<K, V> store) {
        int missing = 0;
        for (final KeyValue<byte[], byte[]> kv : restorableEntries) {
            if (kv == null) {
                continue;
            }
            final V actual = store.get(stateSerdes.keyFrom(kv.key));
            final V expected = stateSerdes.valueFrom(kv.value);
            if (!Objects.equals(actual, expected)) {
                ++missing;
            }
        }
        return missing;
    }

    /**
     * Counts the entries in the given store by iterating over {@link KeyValueStore#all()}.
     * The iterator is closed before returning.
     *
     * @param store the store to measure
     * @return the number of entries returned by the store's iterator
     */
    public int sizeOf(KeyValueStore<K, V> store) {
        int size = 0;
        try (KeyValueIterator<K, V> iterator = store.all()) {
            while (iterator.hasNext()) {
                iterator.next();
                ++size;
            }
        }
        return size;
    }

    /**
     * Returns the value most recently captured for the given key.
     *
     * @param key the key
     * @return the captured value, or {@code null} if none is currently recorded for the key
     */
    public V flushedEntryStored(K key) {
        return flushedEntries.get(key);
    }

    /**
     * Tells whether the most recently captured record for the given key was a removal.
     *
     * @param key the key
     * @return {@code true} if the key is currently marked as removed
     */
    public boolean flushedEntryRemoved(K key) {
        return flushedRemovals.contains(key);
    }

    /**
     * @return the number of keys that currently have a captured non-null value
     */
    public int numFlushedEntryStored() {
        return flushedEntries.size();
    }

    /**
     * @return the number of keys currently marked as removed
     */
    public int numFlushedEntryRemoved() {
        return flushedRemovals.size();
    }

    /**
     * Empties the restore log and forgets all captured entries and removals.
     */
    public void clear() {
        restorableEntries.clear();
        flushedEntries.clear();
        flushedRemovals.clear();
    }
}

