/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnStateGrpcClientCache {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnStateGrpcClientCache.class);
    private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnStateClient> cache;
    private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory;
    private final OutboundObserverFactory outboundObserverFactory;
    private final IdGenerator idGenerator;

    public BeamFnStateGrpcClientCache(IdGenerator idGenerator, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, OutboundObserverFactory outboundObserverFactory) {
        this.idGenerator = idGenerator;
        this.channelFactory = channelFactory;
        this.outboundObserverFactory = outboundObserverFactory;
        this.cache = new ConcurrentHashMap<Endpoints.ApiServiceDescriptor, BeamFnStateClient>();
    }

    public BeamFnStateClient forApiServiceDescriptor(Endpoints.ApiServiceDescriptor apiServiceDescriptor) throws IOException {
        return this.cache.computeIfAbsent(apiServiceDescriptor, this::createBeamFnStateClient);
    }

    private BeamFnStateClient createBeamFnStateClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
        return new GrpcStateClient(apiServiceDescriptor);
    }

    private class GrpcStateClient
    implements BeamFnStateClient {
        private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
        private final ConcurrentMap<String, CompletableFuture<BeamFnApi.StateResponse>> outstandingRequests;
        private final StreamObserver<BeamFnApi.StateRequest> outboundObserver;
        private final ManagedChannel channel;
        private volatile RuntimeException closed;

        private GrpcStateClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
            this.apiServiceDescriptor = apiServiceDescriptor;
            this.outstandingRequests = new ConcurrentHashMap<String, CompletableFuture<BeamFnApi.StateResponse>>();
            this.channel = (ManagedChannel)BeamFnStateGrpcClientCache.this.channelFactory.apply(apiServiceDescriptor);
            this.outboundObserver = BeamFnStateGrpcClientCache.this.outboundObserverFactory.outboundObserverFor(BeamFnStateGrpc.newStub(this.channel)::state, new InboundObserver());
        }

        @Override
        public CompletableFuture<BeamFnApi.StateResponse> handle(BeamFnApi.StateRequest.Builder requestBuilder) {
            requestBuilder.setId(BeamFnStateGrpcClientCache.this.idGenerator.getId());
            BeamFnApi.StateRequest request = requestBuilder.build();
            CompletableFuture<BeamFnApi.StateResponse> response = new CompletableFuture<BeamFnApi.StateResponse>();
            this.outstandingRequests.put(request.getId(), response);
            LOG.debug("Sending StateRequest {}", (Object)request);
            this.outboundObserver.onNext(request);
            return response;
        }

        private synchronized void closeAndCleanUp(RuntimeException cause) {
            if (this.closed != null) {
                return;
            }
            BeamFnStateGrpcClientCache.this.cache.remove(this.apiServiceDescriptor);
            this.closed = cause;
            ConcurrentHashMap<String, CompletableFuture<BeamFnApi.StateResponse>> outstandingRequestsCopy = new ConcurrentHashMap<String, CompletableFuture<BeamFnApi.StateResponse>>(this.outstandingRequests);
            if (outstandingRequestsCopy.isEmpty()) {
                this.outboundObserver.onCompleted();
                return;
            }
            this.outstandingRequests.clear();
            LOG.error("BeamFnState failed, clearing outstanding requests {}", (Object)outstandingRequestsCopy);
            for (CompletableFuture entry : outstandingRequestsCopy.values()) {
                entry.completeExceptionally(cause);
            }
        }

        private class InboundObserver
        implements StreamObserver<BeamFnApi.StateResponse> {
            private InboundObserver() {
            }

            @Override
            public void onNext(BeamFnApi.StateResponse value) {
                LOG.debug("Received StateResponse {}", (Object)value);
                CompletableFuture responseFuture = (CompletableFuture)GrpcStateClient.this.outstandingRequests.remove(value.getId());
                if (responseFuture == null) {
                    LOG.warn("Dropped unknown StateResponse {}", (Object)value);
                    return;
                }
                if (value.getError().isEmpty()) {
                    responseFuture.complete(value);
                } else {
                    responseFuture.completeExceptionally(new IllegalStateException(value.getError()));
                }
            }

            @Override
            public void onError(Throwable t) {
                GrpcStateClient.this.closeAndCleanUp(t instanceof RuntimeException ? (RuntimeException)t : new RuntimeException(t));
            }

            @Override
            public void onCompleted() {
                GrpcStateClient.this.closeAndCleanUp(new RuntimeException("Server hanged up."));
            }
        }
    }
}

