/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.streams.distore;

import io.apicurio.registry.utils.streams.distore.DistributedReadOnlyStateStore;
import io.apicurio.registry.utils.streams.distore.ExtReadOnlyKeyValueStore;
import io.apicurio.registry.utils.streams.distore.ExtReadOnlyKeyValueStoreImpl;
import io.apicurio.registry.utils.streams.distore.FilterPredicate;
import io.apicurio.registry.utils.streams.distore.ReadOnlyKeyValueStoreGrpcClient;
import io.apicurio.registry.utils.streams.distore.StreamToKeyValueIteratorAdapter;
import io.grpc.Channel;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class DistributedReadOnlyKeyValueStore<K, V>
extends DistributedReadOnlyStateStore<K, V, ExtReadOnlyKeyValueStore<K, V>>
implements ExtReadOnlyKeyValueStore<K, V> {
    private final FilterPredicate<K, V> filterPredicate;

    public DistributedReadOnlyKeyValueStore(KafkaStreams streams, HostInfo localApplicationServer, String storeName, Serde<K> keySerde, Serde<V> valSerde, Function<? super HostInfo, ? extends Channel> grpcChannelProvider, boolean parallel, FilterPredicate<K, V> filterPredicate) {
        super(streams, localApplicationServer, storeName, keySerde, valSerde, grpcChannelProvider, parallel);
        this.filterPredicate = filterPredicate;
    }

    @Override
    protected ExtReadOnlyKeyValueStore<K, V> localService(String storeName, KafkaStreams streams) {
        QueryableStoreType queryableStoreType = QueryableStoreTypes.keyValueStore();
        StoreQueryParameters sqp = StoreQueryParameters.fromNameAndType((String)storeName, (QueryableStoreType)queryableStoreType);
        ReadOnlyKeyValueStore delegate = (ReadOnlyKeyValueStore)streams.store(sqp);
        return new ExtReadOnlyKeyValueStoreImpl<K, V>(delegate, this.filterPredicate);
    }

    @Override
    protected ExtReadOnlyKeyValueStore<K, V> remoteServiceGrpcClient(String storeName, Channel channel, Serde<K> keySerde, Serde<V> valSerde) {
        return new ReadOnlyKeyValueStoreGrpcClient<K, V>(storeName, channel, keySerde, valSerde);
    }

    @Override
    public Stream<K> allKeys() {
        return this.allServicesForStoreStream().flatMap(ExtReadOnlyKeyValueStore::allKeys);
    }

    @Override
    public Stream<KeyValue<K, V>> filter(String filter, String over) {
        return this.allServicesForStoreStream().flatMap(store -> store.filter(filter, over));
    }

    public V get(K key) {
        return (V)((ExtReadOnlyKeyValueStore)this.serviceForKey(key)).get(key);
    }

    public KeyValueIterator<K, V> range(K from, K to) {
        return new StreamToKeyValueIteratorAdapter(this.allServicesForStoreStream().flatMap(store -> StreamToKeyValueIteratorAdapter.toStream(store.range(from, to))));
    }

    public KeyValueIterator<K, V> all() {
        return new StreamToKeyValueIteratorAdapter(this.allServicesForStoreStream().flatMap(store -> StreamToKeyValueIteratorAdapter.toStream(store.all())));
    }

    public long approximateNumEntries() {
        return this.allServicesForStoreStream().mapToLong(ReadOnlyKeyValueStore::approximateNumEntries).sum();
    }
}

