/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.streams.diservice;

import io.apicurio.registry.utils.streams.diservice.AsyncBiFunctionService;
import io.apicurio.registry.utils.streams.diservice.AsyncBiFunctionServiceGrpcClient;
import io.apicurio.registry.utils.streams.diservice.DistributedService;
import io.apicurio.registry.utils.streams.diservice.LocalService;
import io.grpc.Channel;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;

public class DistributedAsyncBiFunctionService<K, REQ, RES>
extends DistributedService<K, AsyncBiFunctionService<K, REQ, RES>>
implements AsyncBiFunctionService<K, REQ, RES> {
    private final String serviceName;
    private final Serde<REQ> reqSerde;
    private final Serde<RES> resSerde;
    private final AsyncBiFunctionService<K, REQ, RES> localService;

    public DistributedAsyncBiFunctionService(KafkaStreams streams, HostInfo localApplicationServer, String storeName, LocalService<? extends AsyncBiFunctionService.WithSerdes<K, REQ, RES>> localService, Function<? super HostInfo, ? extends Channel> grpcChannelProvider) {
        super(streams, localApplicationServer, storeName, localService.getService().keySerde(), grpcChannelProvider, false);
        this.serviceName = localService.getServiceName();
        this.reqSerde = localService.getService().reqSerde();
        this.resSerde = localService.getService().resSerde();
        this.localService = localService.getService();
    }

    @Override
    public CompletionStage<RES> apply(K key, REQ req) {
        return (CompletionStage)((AsyncBiFunctionService)this.serviceForKey(key)).apply(key, req);
    }

    @Override
    public Stream<CompletionStage<RES>> applyForStore() {
        return this.allServicesForStoreStream().flatMap(AsyncBiFunctionService::applyForStore);
    }

    @Override
    public Stream<CompletionStage<RES>> apply() {
        return this.allServicesStream().flatMap(AsyncBiFunctionService::apply);
    }

    @Override
    protected AsyncBiFunctionService<K, REQ, RES> localService(String storeName, KafkaStreams streams) {
        return this.localService;
    }

    @Override
    protected AsyncBiFunctionService<K, REQ, RES> remoteServiceGrpcClient(String storeName, Channel channel, Serde<K> keySerde) {
        return new AsyncBiFunctionServiceGrpcClient<K, REQ, RES>(this.serviceName, channel, keySerde, this.reqSerde, this.resSerde);
    }
}

