/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.streams.diservice;

import com.google.protobuf.ByteString;
import io.apicurio.registry.utils.streams.diservice.AsyncBiFunctionService;
import io.apicurio.registry.utils.streams.diservice.LocalService;
import io.apicurio.registry.utils.streams.diservice.proto.AsyncBiFunctionServiceGrpc;
import io.apicurio.registry.utils.streams.diservice.proto.BiFunctionReq;
import io.apicurio.registry.utils.streams.diservice.proto.BiFunctionRes;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.serialization.Serializer;

public class AsyncBiFunctionServiceGrpcLocalDispatcher
extends AsyncBiFunctionServiceGrpc.AsyncBiFunctionServiceImplBase {
    private final LocalService.Registry<? extends AsyncBiFunctionService.WithSerdes<?, ?, ?>> localServiceRegistry;

    public AsyncBiFunctionServiceGrpcLocalDispatcher(Collection<LocalService<? extends AsyncBiFunctionService.WithSerdes<?, ?, ?>>> localAsyncBiFunctionServices) {
        this.localServiceRegistry = new LocalService.Registry(localAsyncBiFunctionServices);
    }

    @Override
    public void apply(BiFunctionReq request, StreamObserver<BiFunctionRes> responseObserver) {
        AsyncBiFunctionService.WithSerdes<?, ?, ?> localService;
        String serviceName = request.getServiceName();
        try {
            AsyncBiFunctionService.WithSerdes<?, ?, ?> _abfs;
            localService = _abfs = this.localServiceRegistry.get(serviceName);
        }
        catch (Exception e) {
            responseObserver.onError((Throwable)e);
            return;
        }
        Object key = localService.keySerde().deserializer().deserialize(serviceName, request.getKey().isEmpty() ? null : request.getKey().toByteArray());
        Object req = localService.reqSerde().deserializer().deserialize(serviceName, request.getReq().isEmpty() ? null : request.getReq().toByteArray());
        Serializer resSerializer = localService.resSerde().serializer();
        try {
            ((CompletionStage)localService.apply(key, req)).whenComplete((res, serviceExc) -> {
                if (serviceExc != null) {
                    responseObserver.onError(serviceExc);
                } else {
                    BiFunctionRes resProto = null;
                    try {
                        byte[] resBytes = resSerializer.serialize(serviceName, res);
                        resProto = BiFunctionRes.newBuilder().setRes(resBytes == null ? ByteString.EMPTY : ByteString.copyFrom((byte[])resBytes)).build();
                    }
                    catch (Throwable serializeExc) {
                        responseObserver.onError(serializeExc);
                    }
                    if (resProto != null) {
                        responseObserver.onNext(resProto);
                        responseObserver.onCompleted();
                    }
                }
            });
        }
        catch (Throwable applyExc) {
            responseObserver.onError(applyExc);
        }
    }
}

