/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineValidator;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission.JobPreparation;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.function.ThrowingConsumer;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.SynchronizedStreamObserver;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@code JobService} gRPC implementation that tracks job preparations, job invocations and
 * staging session tokens in in-memory concurrent maps.
 *
 * <p>A job is first registered through {@link #prepare}, which hands back a preparation id and a
 * staging session token. {@link #run} then validates the prepared pipeline, hands it to the
 * configured {@link JobInvoker} and registers the resulting {@link JobInvocation} under its id so
 * that the state, cancel and streaming calls can find it.
 */
public class InMemoryJobService
extends JobServiceGrpc.JobServiceImplBase
implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobService.class);
    /** Prepared jobs keyed by preparation id. Entries are never removed by this class. */
    private final ConcurrentMap<String, JobPreparation> preparations;
    /** Started jobs keyed by invocation id. */
    private final ConcurrentMap<String, JobInvocation> invocations;
    /** Staging session tokens keyed by preparation id; removed once the job reaches a terminal state. */
    private final ConcurrentMap<String, String> stagingSessionTokens;
    private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
    /** Maps a preparation id to the staging session token returned to the client. */
    private final Function<String, String> stagingServiceTokenProvider;
    /** Invoked with the staging session token when a job terminates; may be {@code null}. */
    private final ThrowingConsumer<String> cleanupJobFn;
    private final JobInvoker invoker;

    /**
     * Creates a new service instance.
     *
     * @param stagingServiceDescriptor endpoint returned to clients as the artifact staging endpoint
     * @param stagingServiceTokenProvider produces a staging session token from a preparation id
     * @param cleanupJobFn called with the staging session token once a job has terminated; when
     *     {@code null} no cleanup is attempted
     * @param invoker used by {@link #run} to turn a prepared pipeline into a {@link JobInvocation}
     * @return the new service
     */
    public static InMemoryJobService create(Endpoints.ApiServiceDescriptor stagingServiceDescriptor, Function<String, String> stagingServiceTokenProvider, ThrowingConsumer<String> cleanupJobFn, JobInvoker invoker) {
        return new InMemoryJobService(stagingServiceDescriptor, stagingServiceTokenProvider, cleanupJobFn, invoker);
    }

    private InMemoryJobService(Endpoints.ApiServiceDescriptor stagingServiceDescriptor, Function<String, String> stagingServiceTokenProvider, ThrowingConsumer<String> cleanupJobFn, JobInvoker invoker) {
        this.stagingServiceDescriptor = stagingServiceDescriptor;
        this.stagingServiceTokenProvider = stagingServiceTokenProvider;
        this.cleanupJobFn = cleanupJobFn;
        this.invoker = invoker;
        this.preparations = new ConcurrentHashMap<>();
        this.invocations = new ConcurrentHashMap<>();
        this.stagingSessionTokens = new ConcurrentHashMap<>();
    }

    /**
     * Registers a job preparation under a freshly generated id of the form
     * {@code <jobName>_<randomUUID>} and replies with that id, the staging endpoint and a staging
     * session token.
     *
     * <p>If the generated id is already registered the call fails with {@code ALREADY_EXISTS}.
     * Any other exception is logged and reported as {@code INTERNAL}.
     *
     * @param request the prepare request carrying job name, pipeline and pipeline options
     * @param responseObserver receives exactly one response followed by completion, or an error
     */
    public void prepare(JobApi.PrepareJobRequest request, StreamObserver<JobApi.PrepareJobResponse> responseObserver) {
        try {
            LOG.trace("{} {}", JobApi.PrepareJobRequest.class.getSimpleName(), request);
            String preparationId = String.format("%s_%s", request.getJobName(), UUID.randomUUID().toString());
            Struct pipelineOptions = request.getPipelineOptions();
            if (pipelineOptions == null) {
                throw new NullPointerException("Encountered null pipeline options.");
            }
            LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);
            JobPreparation preparation = JobPreparation.builder().setId(preparationId).setPipeline(request.getPipeline()).setOptions(pipelineOptions).build();
            JobPreparation previous = this.preparations.putIfAbsent(preparationId, preparation);
            if (previous != null) {
                String errMessage = String.format("A job with the preparation ID \"%s\" already exists.", preparationId);
                // A clash on an existing id is ALREADY_EXISTS, not NOT_FOUND.
                StatusException exception = Status.ALREADY_EXISTS.withDescription(errMessage).asException();
                responseObserver.onError(exception);
                return;
            }
            String stagingSessionToken = this.stagingServiceTokenProvider.apply(preparationId);
            this.stagingSessionTokens.putIfAbsent(preparationId, stagingSessionToken);
            JobApi.PrepareJobResponse response = JobApi.PrepareJobResponse.newBuilder().setPreparationId(preparationId).setArtifactStagingEndpoint(this.stagingServiceDescriptor).setStagingSessionToken(stagingSessionToken).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (Exception e) {
            LOG.error("Could not prepare job with name {}", request.getJobName(), e);
            responseObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Validates and starts a previously prepared job and replies with its invocation id.
     *
     * <p>Fails with {@code NOT_FOUND} when the preparation id is unknown and with
     * {@code INVALID_ARGUMENT} when pipeline validation throws. A {@link StatusRuntimeException}
     * raised while invoking is forwarded unchanged; any other exception becomes {@code INTERNAL}.
     *
     * @param request the run request carrying the preparation id and retrieval token
     * @param responseObserver receives exactly one response followed by completion, or an error
     */
    public void run(JobApi.RunJobRequest request, StreamObserver<JobApi.RunJobResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.RunJobRequest.class.getSimpleName(), request);
        String preparationId = request.getPreparationId();
        try {
            JobPreparation preparation = this.preparations.get(preparationId);
            if (preparation == null) {
                String errMessage = String.format("Unknown Preparation Id \"%s\".", preparationId);
                StatusException exception = Status.NOT_FOUND.withDescription(errMessage).asException();
                responseObserver.onError(exception);
                return;
            }
            try {
                PipelineValidator.validate(preparation.pipeline());
            }
            catch (Exception e) {
                LOG.warn("Encountered Unexpected Exception during validation", e);
                responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withCause(e)));
                return;
            }
            JobInvocation invocation = this.invoker.invoke(preparation.pipeline(), preparation.options(), request.getRetrievalToken());
            String invocationId = invocation.getId();
            // Register staging cleanup before starting so a terminal state cannot be missed.
            invocation.addStateListener(state -> this.cleanUpStagingIfTerminated(preparationId, state));
            invocation.start();
            this.invocations.put(invocationId, invocation);
            JobApi.RunJobResponse response = JobApi.RunJobResponse.newBuilder().setJobId(invocationId).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (StatusRuntimeException e) {
            LOG.warn("Encountered Status Exception", e);
            responseObserver.onError(e);
        }
        catch (Exception e) {
            String errMessage = String.format("Encountered Unexpected Exception for Preparation %s", preparationId);
            LOG.error(errMessage, e);
            responseObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Replies with the current state of the job identified by the request's job id.
     *
     * @param request the request carrying the job (invocation) id
     * @param responseObserver receives exactly one response followed by completion, or an error
     *     ({@code NOT_FOUND} for an unknown id, {@code INTERNAL} for unexpected failures)
     */
    public void getState(JobApi.GetJobStateRequest request, StreamObserver<JobApi.GetJobStateResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.GetJobStateRequest.class.getSimpleName(), request);
        String invocationId = request.getJobId();
        try {
            JobInvocation invocation = this.getInvocation(invocationId);
            JobApi.JobState.Enum state = invocation.getState();
            JobApi.GetJobStateResponse response = JobApi.GetJobStateResponse.newBuilder().setState(state).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (Exception e) {
            InMemoryJobService.reportInvocationFailure(responseObserver, invocationId, e);
        }
    }

    /**
     * Cancels the job identified by the request's job id and replies with the state read
     * immediately after the cancel call.
     *
     * @param request the request carrying the job (invocation) id
     * @param responseObserver receives exactly one response followed by completion, or an error
     *     ({@code NOT_FOUND} for an unknown id, {@code INTERNAL} for unexpected failures)
     */
    public void cancel(JobApi.CancelJobRequest request, StreamObserver<JobApi.CancelJobResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.CancelJobRequest.class.getSimpleName(), request);
        String invocationId = request.getJobId();
        try {
            JobInvocation invocation = this.getInvocation(invocationId);
            invocation.cancel();
            JobApi.JobState.Enum state = invocation.getState();
            JobApi.CancelJobResponse response = JobApi.CancelJobResponse.newBuilder().setState(state).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (Exception e) {
            InMemoryJobService.reportInvocationFailure(responseObserver, invocationId, e);
        }
    }

    /**
     * Streams every state the job's state listener reports and completes the stream once a
     * terminal state has been delivered.
     *
     * @param request the request carrying the job (invocation) id
     * @param responseObserver receives one response per reported state; completed after a terminal
     *     state, or failed with {@code NOT_FOUND} / {@code INTERNAL} if the listener cannot be set up
     */
    public void getStateStream(JobApi.GetJobStateRequest request, StreamObserver<JobApi.GetJobStateResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.GetJobStateRequest.class.getSimpleName(), request);
        String invocationId = request.getJobId();
        try {
            JobInvocation invocation = this.getInvocation(invocationId);
            Consumer<JobApi.JobState.Enum> stateListener = state -> {
                responseObserver.onNext(JobApi.GetJobStateResponse.newBuilder().setState(state).build());
                if (JobInvocation.isTerminated(state)) {
                    responseObserver.onCompleted();
                }
            };
            invocation.addStateListener(stateListener);
        }
        catch (Exception e) {
            InMemoryJobService.reportInvocationFailure(responseObserver, invocationId, e);
        }
    }

    /**
     * Streams both state changes and job messages for the given job and completes the stream once
     * a terminal state has been delivered.
     *
     * <p>Two independent listeners write to the same stream, so every call on the outgoing
     * observer, including completion, goes through a {@link SynchronizedStreamObserver} wrapper.
     *
     * @param request the request carrying the job (invocation) id
     * @param responseObserver receives state and message responses; completed after a terminal
     *     state, or failed with {@code NOT_FOUND} / {@code INTERNAL} if the listeners cannot be set up
     */
    public void getMessageStream(JobApi.JobMessagesRequest request, StreamObserver<JobApi.JobMessagesResponse> responseObserver) {
        String invocationId = request.getJobId();
        try {
            JobInvocation invocation = this.getInvocation(invocationId);
            StreamObserver<JobApi.JobMessagesResponse> syncResponseObserver = SynchronizedStreamObserver.wrapping(responseObserver);
            Consumer<JobApi.JobState.Enum> stateListener = state -> {
                syncResponseObserver.onNext(JobApi.JobMessagesResponse.newBuilder().setStateResponse(JobApi.GetJobStateResponse.newBuilder().setState(state).build()).build());
                if (JobInvocation.isTerminated(state)) {
                    // Complete through the wrapper as well; calling the raw observer here would
                    // bypass the serialization the message listener's onNext calls rely on.
                    syncResponseObserver.onCompleted();
                }
            };
            Consumer<JobApi.JobMessage> messageListener = message -> syncResponseObserver.onNext(JobApi.JobMessagesResponse.newBuilder().setMessageResponse(message).build());
            invocation.addStateListener(stateListener);
            invocation.addMessageListener(messageListener);
        }
        catch (Exception e) {
            InMemoryJobService.reportInvocationFailure(responseObserver, invocationId, e);
        }
    }

    /**
     * Replies with descriptors for all pipeline options registered with
     * {@link PipelineOptionsFactory}.
     *
     * @param request the describe request (only logged)
     * @param responseObserver receives exactly one response followed by completion, or an
     *     {@code INTERNAL} error
     */
    public void describePipelineOptions(JobApi.DescribePipelineOptionsRequest request, StreamObserver<JobApi.DescribePipelineOptionsResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.DescribePipelineOptionsRequest.class.getSimpleName(), request);
        try {
            JobApi.DescribePipelineOptionsResponse response = JobApi.DescribePipelineOptionsResponse.newBuilder().addAllOptions(PipelineOptionsFactory.describe(PipelineOptionsFactory.getRegisteredOptions())).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (Exception e) {
            LOG.error("Error describing pipeline options", e);
            responseObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /** No-op: this service holds no resources that need releasing. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Looks up a started job.
     *
     * @param invocationId the id returned by {@link #run}
     * @return the registered invocation, never {@code null}
     * @throws StatusException with {@code NOT_FOUND} when no invocation is registered under the id
     */
    private JobInvocation getInvocation(String invocationId) throws StatusException {
        JobInvocation invocation = this.invocations.get(invocationId);
        if (invocation == null) {
            String errMessage = String.format("Unknown Invocation Id \"%s\".", invocationId);
            throw Status.NOT_FOUND.withDescription(errMessage).asException();
        }
        return invocation;
    }

    /**
     * State listener body used by {@link #run}: once {@code state} is terminal, drops the staging
     * session token stored for the preparation and passes it to the cleanup function, if any.
     * Cleanup failures are logged and otherwise ignored.
     */
    private void cleanUpStagingIfTerminated(String preparationId, JobApi.JobState.Enum state) {
        if (!JobInvocation.isTerminated(state)) {
            return;
        }
        // Single atomic remove instead of a separate get + remove.
        String stagingSessionToken = this.stagingSessionTokens.remove(preparationId);
        if (this.cleanupJobFn == null) {
            return;
        }
        try {
            this.cleanupJobFn.accept(stagingSessionToken);
        }
        catch (Exception e) {
            // The exception is the trailing argument without its own placeholder so that the
            // logger records it as a throwable rather than formatting it into the message.
            LOG.error("Failed to remove job staging directory for token {}", stagingSessionToken, e);
        }
    }

    /**
     * Reports a failure of an invocation-scoped call to the client. Status exceptions (such as the
     * {@code NOT_FOUND} raised by {@link #getInvocation}) are forwarded unchanged so the client
     * sees the real status code; anything else is logged as an error and wrapped as
     * {@code INTERNAL}.
     */
    private static void reportInvocationFailure(StreamObserver<?> responseObserver, String invocationId, Exception e) {
        if (e instanceof StatusException || e instanceof StatusRuntimeException) {
            LOG.warn("Encountered Status Exception", e);
            responseObserver.onError(e);
            return;
        }
        String errMessage = String.format("Encountered Unexpected Exception for Invocation %s", invocationId);
        LOG.error(errMessage, e);
        responseObserver.onError(Status.INTERNAL.withCause(e).asException());
    }
}

