/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core.fn;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FnApiControlClient
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);
    private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
    private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
    private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequests;
    private volatile boolean isClosed;

    private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> requestReceiver) {
        this.requestReceiver = requestReceiver;
        this.outstandingRequests = new ConcurrentHashMap<String, SettableFuture<BeamFnApi.InstructionResponse>>();
    }

    public static FnApiControlClient forRequestObserver(StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
        return new FnApiControlClient(requestObserver);
    }

    public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle(BeamFnApi.InstructionRequest request) {
        LOG.debug("Sending InstructionRequest {}", (Object)request);
        SettableFuture<BeamFnApi.InstructionResponse> resultFuture = SettableFuture.create();
        this.outstandingRequests.put(request.getInstructionId(), resultFuture);
        this.requestReceiver.onNext((Object)request);
        return resultFuture;
    }

    StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() {
        return this.responseObserver;
    }

    @Override
    public void close() {
        this.closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection"));
    }

    private synchronized void closeAndTerminateOutstandingRequests(Throwable cause) {
        if (this.isClosed) {
            return;
        }
        ConcurrentHashMap<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy = new ConcurrentHashMap<String, SettableFuture<BeamFnApi.InstructionResponse>>(this.outstandingRequests);
        this.outstandingRequests.clear();
        this.isClosed = true;
        if (outstandingRequestsCopy.isEmpty()) {
            this.requestReceiver.onCompleted();
            return;
        }
        this.requestReceiver.onError((Throwable)new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
        LOG.error("{} closed, clearing outstanding requests {}", (Object)FnApiControlClient.class.getSimpleName(), outstandingRequestsCopy);
        for (SettableFuture outstandingRequest : outstandingRequestsCopy.values()) {
            outstandingRequest.setException(cause);
        }
    }

    private class ResponseStreamObserver
    implements StreamObserver<BeamFnApi.InstructionResponse> {
        private ResponseStreamObserver() {
        }

        public void onNext(BeamFnApi.InstructionResponse response) {
            LOG.debug("Received InstructionResponse {}", (Object)response);
            SettableFuture completableFuture = (SettableFuture)FnApiControlClient.this.outstandingRequests.remove(response.getInstructionId());
            if (completableFuture != null) {
                completableFuture.set(response);
            }
        }

        public void onCompleted() {
            FnApiControlClient.this.closeAndTerminateOutstandingRequests(new IllegalStateException("SDK harness closed connection"));
        }

        public void onError(Throwable cause) {
            LOG.error("{} received error {}", (Object)FnApiControlClient.class.getSimpleName(), (Object)cause);
            FnApiControlClient.this.closeAndTerminateOutstandingRequests(cause);
        }
    }
}

