/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.streams.distore;

import com.google.protobuf.ByteString;
import io.apicurio.registry.utils.ProtoUtil;
import io.apicurio.registry.utils.streams.distore.ExtReadOnlyKeyValueStore;
import io.apicurio.registry.utils.streams.distore.ExtReadOnlyKeyValueStoreImpl;
import io.apicurio.registry.utils.streams.distore.FilterPredicate;
import io.apicurio.registry.utils.streams.distore.KeyValueSerde;
import io.apicurio.registry.utils.streams.distore.proto.FilterReq;
import io.apicurio.registry.utils.streams.distore.proto.Key;
import io.apicurio.registry.utils.streams.distore.proto.KeyFromKeyToReq;
import io.apicurio.registry.utils.streams.distore.proto.KeyReq;
import io.apicurio.registry.utils.streams.distore.proto.KeyValue;
import io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc;
import io.apicurio.registry.utils.streams.distore.proto.Size;
import io.apicurio.registry.utils.streams.distore.proto.Value;
import io.apicurio.registry.utils.streams.distore.proto.VoidReq;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class KeyValueStoreGrpcImplLocalDispatcher
extends KeyValueStoreGrpc.KeyValueStoreImplBase {
    private final KafkaStreams streams;
    private final KeyValueSerde.Registry keyValueSerdes;
    private final ConcurrentMap<String, ReadOnlyKeyValueStore<?, ?>> keyValueStores = new ConcurrentHashMap();
    private final FilterPredicate<?, ?> filterPredicate;

    public KeyValueStoreGrpcImplLocalDispatcher(KafkaStreams streams, KeyValueSerde.Registry keyValueSerdeRegistry, FilterPredicate<?, ?> filterPredicate) {
        this.streams = streams;
        this.keyValueSerdes = keyValueSerdeRegistry;
        this.filterPredicate = filterPredicate;
    }

    private <K, V> ExtReadOnlyKeyValueStore<K, V> keyValueStore(String storeName) {
        return (ExtReadOnlyKeyValueStore)this.keyValueStores.computeIfAbsent(storeName, sn -> {
            QueryableStoreType queryableStoreType = QueryableStoreTypes.keyValueStore();
            StoreQueryParameters sqp = StoreQueryParameters.fromNameAndType((String)storeName, (QueryableStoreType)queryableStoreType);
            return new ExtReadOnlyKeyValueStoreImpl((ReadOnlyKeyValueStore)this.streams.store(sqp), this.filterPredicate);
        });
    }

    @Override
    public void allKeys(VoidReq request, StreamObserver<Key> responseObserver) {
        boolean ok = false;
        try (Stream stream = this.keyValueStore(request.getStoreName()).allKeys();){
            this.drainToKey(request.getStoreName(), stream, responseObserver);
            ok = true;
        }
        catch (Throwable e) {
            responseObserver.onError(e);
        }
        if (ok) {
            responseObserver.onCompleted();
        }
    }

    @Override
    public void filter(FilterReq request, StreamObserver<KeyValue> responseObserver) {
        boolean ok = false;
        try (Stream stream = this.keyValueStore(request.getStoreName()).filter(ProtoUtil.emptyAsNull((String)request.getFilter()), request.getOver());){
            this.drainToKeyValue(request.getStoreName(), stream, responseObserver);
            ok = true;
        }
        catch (Throwable e) {
            responseObserver.onError(e);
        }
        if (ok) {
            responseObserver.onCompleted();
        }
    }

    @Override
    public void get(KeyReq request, StreamObserver<Value> responseObserver) {
        boolean ok = false;
        try {
            Object value = this.keyValueStore(request.getStoreName()).get(this.keyValueSerdes.deserializeKey(request.getStoreName(), request.getKey().toByteArray()));
            byte[] valueBytes = this.keyValueSerdes.serializeVal(request.getStoreName(), value);
            if (valueBytes != null) {
                responseObserver.onNext((Object)Value.newBuilder().setValue(ByteString.copyFrom((byte[])valueBytes)).build());
            }
            ok = true;
        }
        catch (Throwable e) {
            responseObserver.onError(e);
        }
        if (ok) {
            responseObserver.onCompleted();
        }
    }

    @Override
    public void range(KeyFromKeyToReq request, StreamObserver<KeyValue> responseObserver) {
        boolean ok = false;
        try (KeyValueIterator iter = this.keyValueStore(request.getStoreName()).range(this.keyValueSerdes.deserializeKey(request.getStoreName(), request.getKeyFrom().toByteArray()), this.keyValueSerdes.deserializeVal(request.getStoreName(), request.getKeyTo().toByteArray()));){
            this.drainToKeyValue(request.getStoreName(), iter, responseObserver);
            ok = true;
        }
        catch (Throwable e) {
            responseObserver.onError(e);
        }
        if (ok) {
            responseObserver.onCompleted();
        }
    }

    @Override
    public void all(VoidReq request, StreamObserver<KeyValue> responseObserver) {
        boolean ok = false;
        try (KeyValueIterator iter = this.keyValueStore(request.getStoreName()).all();){
            this.drainToKeyValue(request.getStoreName(), iter, responseObserver);
            ok = true;
        }
        catch (Throwable e) {
            responseObserver.onError(e);
        }
        if (ok) {
            responseObserver.onCompleted();
        }
    }

    @Override
    public void approximateNumEntries(VoidReq request, StreamObserver<Size> responseObserver) {
        boolean ok = false;
        try {
            long size = this.keyValueStore(request.getStoreName()).approximateNumEntries();
            responseObserver.onNext((Object)Size.newBuilder().setSize(size).build());
            ok = true;
        }
        catch (Throwable e) {
            responseObserver.onError(e);
        }
        if (ok) {
            responseObserver.onCompleted();
        }
    }

    private <K> void drainToKey(String storeName, Stream<K> stream, StreamObserver<Key> responseObserver) {
        stream.forEach(key -> {
            byte[] keyBytes = this.keyValueSerdes.serializeKey(storeName, key);
            if (keyBytes != null) {
                responseObserver.onNext((Object)Key.newBuilder().setKey(ByteString.copyFrom((byte[])keyBytes)).build());
            }
        });
    }

    private <K, V> void drainToKeyValue(String storeName, Stream<org.apache.kafka.streams.KeyValue<K, V>> stream, StreamObserver<KeyValue> responseObserver) {
        stream.forEach(kv -> this.drainToKeyValue(storeName, (org.apache.kafka.streams.KeyValue)kv, responseObserver));
    }

    private <K, V> void drainToKeyValue(String storeName, KeyValueIterator<K, V> iter, StreamObserver<KeyValue> responseObserver) {
        while (iter.hasNext()) {
            org.apache.kafka.streams.KeyValue wkv = (org.apache.kafka.streams.KeyValue)iter.next();
            this.drainToKeyValue(storeName, wkv, responseObserver);
        }
    }

    private <K, V> void drainToKeyValue(String storeName, org.apache.kafka.streams.KeyValue<K, V> wkv, StreamObserver<KeyValue> responseObserver) {
        byte[] keyBytes = this.keyValueSerdes.serializeKey(storeName, wkv.key);
        byte[] valueBytes = this.keyValueSerdes.serializeVal(storeName, wkv.value);
        if (keyBytes != null && valueBytes != null) {
            responseObserver.onNext((Object)KeyValue.newBuilder().setKey(ByteString.copyFrom((byte[])keyBytes)).setValue(ByteString.copyFrom((byte[])valueBytes)).build());
        }
    }
}

