/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import java.util.EnumMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client for the Beam Fn control stream.
 *
 * <p>Each inbound {@link BeamFnApi.InstructionRequest} is handed to the supplied {@link Executor},
 * dispatched to the handler registered for its request case, and the resulting
 * {@link BeamFnApi.InstructionResponse} is written to the outbound observer.
 *
 * <p>Failure handling, as implemented below:
 * <ul>
 *   <li>An {@link Exception} thrown by a handler is logged and turned into an error response for
 *       that instruction; the stream keeps running.</li>
 *   <li>An {@link Error} thrown while processing is logged, fails {@link #waitForTermination()},
 *       is reported on the outbound observer as {@code Status.INTERNAL}, and is rethrown to the
 *       executor.</li>
 * </ul>
 */
public class BeamFnControlClient {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnControlClient.class);

    /** Sentinel used to complete {@link #onFinish} when the inbound stream completes normally. */
    private static final Object COMPLETED = new Object();

    private final StreamObserver<BeamFnApi.InstructionResponse> outboundObserver;
    private final EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers;

    /** Completed (normally or exceptionally) once the control stream has terminated. */
    private final CompletableFuture<Object> onFinish;

    /**
     * Creates a client that builds its control stub from a channel for the given descriptor.
     *
     * @param apiServiceDescriptor descriptor passed to {@code channelFactory.forDescriptor}
     * @param channelFactory factory producing the channel the control stub is built on
     * @param outboundObserverFactory factory producing the outbound response observer
     * @param executor executor on which each instruction request is processed
     * @param handlers handler per request case; request cases without an entry get an
     *     "Unknown InstructionRequest type" error response
     */
    public BeamFnControlClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor, ManagedChannelFactory channelFactory, OutboundObserverFactory outboundObserverFactory, Executor executor, EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers) {
        this(BeamFnControlGrpc.newStub(channelFactory.forDescriptor(apiServiceDescriptor)), outboundObserverFactory, executor, handlers);
    }

    /**
     * Creates a client on top of an existing control stub.
     *
     * @param controlStub stub whose {@code control} method opens the bidirectional stream
     * @param outboundObserverFactory factory producing the outbound response observer
     * @param executor executor on which each instruction request is processed
     * @param handlers handler per request case; request cases without an entry get an
     *     "Unknown InstructionRequest type" error response
     */
    public BeamFnControlClient(BeamFnControlGrpc.BeamFnControlStub controlStub, OutboundObserverFactory outboundObserverFactory, Executor executor, EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers) {
        // Assign every field the inbound callbacks read BEFORE handing out the InboundObserver.
        // InboundObserver is an inner class, so passing it to the factory publishes `this`; the
        // factory may open the stream right away, and callbacks arriving on another thread would
        // otherwise see a null onFinish / handlers.
        this.onFinish = new CompletableFuture<>();
        this.handlers = handlers;
        this.outboundObserver = outboundObserverFactory.outboundObserverFor(controlStub::control, new InboundObserver(executor));
    }

    /**
     * Blocks until the control stream has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the inbound stream reported an error, or an {@link Error} was
     *     thrown while processing an instruction
     */
    public void waitForTermination() throws InterruptedException, ExecutionException {
        this.onFinish.get();
    }

    /**
     * Dispatches a request to the handler registered for its request case and builds the response.
     *
     * <p>The response always carries the instruction id of the request. If the handler throws an
     * {@link Exception}, it is logged and an error response containing the stack trace is returned
     * instead.
     *
     * @param value the request to process
     * @return the response to send back for {@code value}
     * @throws Error any {@link Error} raised by the handler, rethrown after being logged
     */
    public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(BeamFnApi.InstructionRequest value) {
        try {
            ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder> handler = this.handlers.getOrDefault(value.getRequestCase(), this::missingHandler);
            return handler.apply(value).setInstructionId(value.getInstructionId()).build();
        }
        catch (Exception e) {
            LOG.error("Exception while trying to handle {} {}", BeamFnApi.InstructionRequest.class.getSimpleName(), value.getInstructionId(), e);
            return BeamFnApi.InstructionResponse.newBuilder().setInstructionId(value.getInstructionId()).setError(Throwables.getStackTraceAsString(e)).build();
        }
        catch (Error e) {
            LOG.error("Error thrown when handling {} {}", BeamFnApi.InstructionRequest.class.getSimpleName(), value.getInstructionId(), e);
            throw e;
        }
    }

    /**
     * Writes a response to the outbound observer.
     *
     * @param value the response to send
     */
    public void sendInstructionResponse(BeamFnApi.InstructionResponse value) {
        LOG.debug("Sending InstructionResponse {}", value);
        this.outboundObserver.onNext(value);
    }

    /**
     * Fails {@link #onFinish} with the given error, then reports it on the outbound observer as
     * {@code Status.INTERNAL} with the error's class name and message as the description.
     */
    private void sendErrorResponse(Error e) {
        this.onFinish.completeExceptionally(e);
        this.outboundObserver.onError(Status.INTERNAL.withDescription(String.format("%s: %s", e.getClass().getName(), e.getMessage())).asException());
    }

    /** Fallback handler for request cases with no registered handler; yields an error response. */
    private BeamFnApi.InstructionResponse.Builder missingHandler(BeamFnApi.InstructionRequest request) {
        return BeamFnApi.InstructionResponse.newBuilder().setError(String.format("Unknown InstructionRequest type %s", request.getRequestCase()));
    }

    /**
     * Receives instruction requests from the control stream and schedules their processing on the
     * executor; stream termination is forwarded to {@code onFinish}.
     */
    private class InboundObserver
    implements StreamObserver<BeamFnApi.InstructionRequest> {
        private final Executor executor;

        InboundObserver(Executor executorService) {
            this.executor = executorService;
        }

        @Override
        public void onNext(BeamFnApi.InstructionRequest request) {
            try {
                // Tag the calling thread's log output with the instruction id for the debug line.
                BeamFnLoggingMDC.setInstructionId(request.getInstructionId());
                LOG.debug("Received InstructionRequest {}", request);
                this.executor.execute(() -> {
                    try {
                        // Set the id again inside the task so logging on the thread that runs it
                        // is tagged as well.
                        BeamFnLoggingMDC.setInstructionId(request.getInstructionId());
                        BeamFnApi.InstructionResponse response = BeamFnControlClient.this.delegateOnInstructionRequestType(request);
                        BeamFnControlClient.this.sendInstructionResponse(response);
                    }
                    catch (Error e) {
                        // Report the failure on the stream, then let the executor see the Error.
                        BeamFnControlClient.this.sendErrorResponse(e);
                        throw e;
                    }
                    finally {
                        BeamFnLoggingMDC.setInstructionId(null);
                    }
                });
            }
            finally {
                BeamFnLoggingMDC.setInstructionId(null);
            }
        }

        @Override
        public void onError(Throwable t) {
            BeamFnControlClient.this.onFinish.completeExceptionally(t);
        }

        @Override
        public void onCompleted() {
            BeamFnControlClient.this.onFinish.complete(COMPLETED);
        }
    }
}

