/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.data;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ManagedChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataGrpcClient
implements BeamFnDataClient {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
    private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache;
    private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory;
    private final OutboundObserverFactory outboundObserverFactory;
    private final PipelineOptions options;

    public BeamFnDataGrpcClient(PipelineOptions options, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, OutboundObserverFactory outboundObserverFactory) {
        this.options = options;
        this.channelFactory = channelFactory;
        this.outboundObserverFactory = outboundObserverFactory;
        this.cache = new ConcurrentHashMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer>();
    }

    @Override
    public InboundDataClient receive(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint inputLocation, FnDataReceiver<ByteString> consumer) {
        LOG.debug("Registering consumer for {}", (Object)inputLocation);
        BeamFnDataGrpcMultiplexer client = this.getClientFor(apiServiceDescriptor);
        BeamFnDataInboundObserver inboundObserver = BeamFnDataInboundObserver.forConsumer(inputLocation, consumer);
        client.registerConsumer(inputLocation, inboundObserver);
        return inboundObserver;
    }

    @Override
    public <T> CloseableFnDataReceiver<T> send(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint outputLocation, Coder<T> coder) {
        BeamFnDataGrpcMultiplexer client = this.getClientFor(apiServiceDescriptor);
        LOG.debug("Creating output consumer for {}", (Object)outputLocation);
        return BeamFnDataBufferingOutboundObserver.forLocation(this.options, outputLocation, coder, client.getOutboundObserver());
    }

    private BeamFnDataGrpcMultiplexer getClientFor(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
        return this.cache.computeIfAbsent(apiServiceDescriptor, descriptor -> new BeamFnDataGrpcMultiplexer((Endpoints.ApiServiceDescriptor)descriptor, this.outboundObserverFactory, BeamFnDataGrpc.newStub(this.channelFactory.apply(apiServiceDescriptor))::data));
    }
}

