/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.streams.distore;

import com.google.protobuf.ByteString;
import io.apicurio.registry.utils.ProtoUtil;
import io.apicurio.registry.utils.streams.distore.ExtReadOnlyKeyValueStore;
import io.apicurio.registry.utils.streams.distore.KeyValueSerde;
import io.apicurio.registry.utils.streams.distore.StreamObserverSpliterator;
import io.apicurio.registry.utils.streams.distore.StreamToKeyValueIteratorAdapter;
import io.apicurio.registry.utils.streams.distore.proto.FilterReq;
import io.apicurio.registry.utils.streams.distore.proto.Key;
import io.apicurio.registry.utils.streams.distore.proto.KeyFromKeyToReq;
import io.apicurio.registry.utils.streams.distore.proto.KeyReq;
import io.apicurio.registry.utils.streams.distore.proto.KeyValue;
import io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc;
import io.apicurio.registry.utils.streams.distore.proto.Size;
import io.apicurio.registry.utils.streams.distore.proto.Value;
import io.apicurio.registry.utils.streams.distore.proto.VoidReq;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.state.KeyValueIterator;

/**
 * gRPC client view of a named key/value store.
 *
 * <p>Every operation builds a protobuf request carrying the configured store name, sends it
 * through the asynchronous {@link KeyValueStoreGrpc.KeyValueStoreStub}, and consumes the
 * responses through a {@link StreamObserverSpliterator}. Keys and values travel as raw bytes
 * and are converted with the {@link KeyValueSerde} built from the serdes supplied at
 * construction time.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class ReadOnlyKeyValueStoreGrpcClient<K, V>
implements ExtReadOnlyKeyValueStore<K, V>,
AutoCloseable {
    private final String storeName;
    private final Channel channel;
    private final KeyValueStoreGrpc.KeyValueStoreStub stub;
    private final KeyValueSerde<K, V> keyValueSerde;

    /**
     * Creates a client bound to the given store name and channel.
     *
     * @param storeName name of the store; sent with every request and used as the prefix of
     *                  the serde topic name ({@code storeName + "-serde-topic"})
     * @param channel   gRPC channel used to create the stub
     * @param keySerde  serde used for keys
     * @param valSerde  serde used for values
     */
    public ReadOnlyKeyValueStoreGrpcClient(String storeName, Channel channel, Serde<K> keySerde, Serde<V> valSerde) {
        this.storeName = storeName;
        this.channel = channel;
        this.stub = KeyValueStoreGrpc.newStub(channel);
        this.keyValueSerde = new KeyValueSerde<K, V>(storeName + "-serde-topic", keySerde, valSerde);
    }

    /**
     * Requests all keys of the store.
     *
     * @return a stream of deserialized keys backed by the response observer
     */
    @Override
    public Stream<K> allKeys() {
        StreamObserverSpliterator<Key> observer = new StreamObserverSpliterator<Key>();
        this.stub.allKeys(VoidReq.newBuilder().setStoreName(this.storeName).build(), observer);
        return observer.stream().map(res -> {
            ByteString key = res.getKey();
            return this.keyValueSerde.deserializeKey(key.toByteArray());
        });
    }

    /**
     * Sends a filter request for this store.
     *
     * @param filter filter expression; passed through {@code ProtoUtil.nullAsEmpty} before
     *               being placed in the request
     * @param over   value placed in the request's {@code over} field as-is
     * @return a stream of deserialized key/value pairs backed by the response observer
     */
    @Override
    public Stream<org.apache.kafka.streams.KeyValue<K, V>> filter(String filter, String over) {
        StreamObserverSpliterator<KeyValue> observer = new StreamObserverSpliterator<KeyValue>();
        this.stub.filter(FilterReq.newBuilder().setFilter(ProtoUtil.nullAsEmpty((String)filter)).setOver(over).setStoreName(this.storeName).build(), observer);
        return this.keyValueStream(observer.stream());
    }

    /**
     * Calls {@code shutdown()} on the channel when it is a {@link ManagedChannel};
     * any other kind of channel is left untouched.
     */
    @Override
    public void close() {
        if (this.channel instanceof ManagedChannel) {
            ((ManagedChannel)this.channel).shutdown();
        }
    }

    /**
     * Looks up the value stored under the given key.
     *
     * @param key key to look up
     * @return the deserialized value of the first response, or {@code null} when there is no
     *         response or the value deserializes to {@code null}
     */
    public V get(K key) {
        ByteString keyBytes = ByteString.copyFrom((byte[])this.keyValueSerde.serializeKey(key));
        StreamObserverSpliterator<Value> observer = new StreamObserverSpliterator<Value>();
        this.stub.get(KeyReq.newBuilder().setKey(keyBytes).setStoreName(this.storeName).build(), observer);
        // Take the first response before deserializing: Stream.findFirst() throws
        // NullPointerException when the selected element is null, whereas Optional.map
        // turns a null deserialization result into an empty Optional.
        return observer.stream()
                .findFirst()
                .map(value -> this.keyValueSerde.deserializeVal(value.getValue().toByteArray()))
                .orElse(null);
    }

    /**
     * Requests the entries between two keys.
     *
     * @param from key placed in the request's {@code keyFrom} field
     * @param to   key placed in the request's {@code keyTo} field
     * @return an iterator over the deserialized key/value pairs of the response
     */
    public KeyValueIterator<K, V> range(K from, K to) {
        ByteString fromBytes = ByteString.copyFrom((byte[])this.keyValueSerde.serializeKey(from));
        ByteString toBytes = ByteString.copyFrom((byte[])this.keyValueSerde.serializeKey(to));
        StreamObserverSpliterator<KeyValue> observer = new StreamObserverSpliterator<KeyValue>();
        this.stub.range(KeyFromKeyToReq.newBuilder().setKeyFrom(fromBytes).setKeyTo(toBytes).setStoreName(this.storeName).build(), observer);
        return this.keyValueIterator(observer.stream());
    }

    /**
     * Requests all entries of the store.
     *
     * @return an iterator over the deserialized key/value pairs of the response
     */
    public KeyValueIterator<K, V> all() {
        StreamObserverSpliterator<KeyValue> observer = new StreamObserverSpliterator<KeyValue>();
        this.stub.all(VoidReq.newBuilder().setStoreName(this.storeName).build(), observer);
        return this.keyValueIterator(observer.stream());
    }

    /**
     * Requests the approximate number of entries in the store.
     *
     * @return the size reported by the first response
     * @throws java.util.NoSuchElementException if no size response is received
     */
    public long approximateNumEntries() {
        StreamObserverSpliterator<Size> observer = new StreamObserverSpliterator<Size>();
        this.stub.approximateNumEntries(VoidReq.newBuilder().setStoreName(this.storeName).build(), observer);
        // Same exception type as OptionalLong.getAsLong(), but with a message naming the store.
        return StreamSupport.stream(observer, false)
                .mapToLong(Size::getSize)
                .findFirst()
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "No size response received for store '" + this.storeName + "'"));
    }

    /** Wraps the deserialized form of the given protobuf stream in a {@link KeyValueIterator}. */
    private KeyValueIterator<K, V> keyValueIterator(Stream<KeyValue> stream) {
        return new StreamToKeyValueIteratorAdapter<K, V>(this.keyValueStream(stream));
    }

    /** Maps each protobuf key/value message to a Kafka Streams pair by deserializing both parts. */
    private Stream<org.apache.kafka.streams.KeyValue<K, V>> keyValueStream(Stream<KeyValue> stream) {
        return stream.map(kv -> new org.apache.kafka.streams.KeyValue<K, V>(
                this.keyValueSerde.deserializeKey(kv.getKey().toByteArray()),
                this.keyValueSerde.deserializeVal(kv.getValue().toByteArray())));
    }
}

