/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.protobuf.Struct;
import org.apache.beam.repackaged.beam_runners_direct_java.io.grpc.Status;
import org.apache.beam.repackaged.beam_runners_direct_java.io.grpc.StatusException;
import org.apache.beam.repackaged.beam_runners_direct_java.io.grpc.StatusRuntimeException;
import org.apache.beam.repackaged.beam_runners_direct_java.io.grpc.stub.StreamObserver;
import org.apache.beam.repackaged.beam_runners_direct_java.model.jobmanagement.v1.JobApi;
import org.apache.beam.repackaged.beam_runners_direct_java.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.jobsubmission.JobPreparation;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.SynchronizedStreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A job service that keeps every prepared and invoked job in process memory.
 *
 * <p>Preparations and invocations are stored in two {@link ConcurrentMap}s keyed by their
 * string ids. Nothing in this class ever removes an entry, so both maps grow for the lifetime
 * of the service instance.
 *
 * <p>Error reporting convention used by every RPC in this class:
 * <ul>
 *   <li>gRPC status exceptions (for example {@code NOT_FOUND} for an unknown id) are forwarded
 *       to the caller unchanged;</li>
 *   <li>any other exception is logged and reported as {@code INTERNAL}, with a description that
 *       names the id the call was made for.</li>
 * </ul>
 */
public class InMemoryJobService
extends JobServiceGrpc.JobServiceImplBase
implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobService.class);
    /** Prepared-but-not-necessarily-run jobs, keyed by preparation id. */
    private final ConcurrentMap<String, JobPreparation> preparations;
    /** Jobs that have been handed to the invoker, keyed by invocation (job) id. */
    private final ConcurrentMap<String, JobInvocation> invocations;
    /** Endpoint returned to clients in every {@code PrepareJobResponse}. */
    private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
    /** Creates a {@link JobInvocation} from a prepared pipeline. */
    private final JobInvoker invoker;

    /**
     * Creates a new service instance.
     *
     * @param stagingServiceDescriptor artifact staging endpoint advertised to clients by {@code prepare}
     * @param invoker factory used by {@code run} to create job invocations
     * @return a service with no preparations and no invocations
     */
    public static InMemoryJobService create(Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
        return new InMemoryJobService(stagingServiceDescriptor, invoker);
    }

    private InMemoryJobService(Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
        this.stagingServiceDescriptor = stagingServiceDescriptor;
        this.invoker = invoker;
        this.preparations = new ConcurrentHashMap<>();
        this.invocations = new ConcurrentHashMap<>();
    }

    /**
     * Registers a pipeline and its options under a freshly generated preparation id.
     *
     * <p>The id is {@code <jobName>_<random UUID>}. On success the response carries the id, the
     * staging endpoint this service was created with, and the fixed staging session token
     * {@code "token"}.
     *
     * @param request the job name, pipeline and pipeline options to register
     * @param responseObserver receives one response followed by completion, or a single error:
     *     {@code ALREADY_EXISTS} if the generated id is already registered, {@code INTERNAL} for
     *     any other failure
     */
    public void prepare(JobApi.PrepareJobRequest request, StreamObserver<JobApi.PrepareJobResponse> responseObserver) {
        try {
            LOG.trace("{} {}", JobApi.PrepareJobRequest.class.getSimpleName(), request);
            String preparationId = String.format("%s_%s", request.getJobName(), UUID.randomUUID().toString());
            Struct pipelineOptions = request.getPipelineOptions();
            if (pipelineOptions == null) {
                throw new NullPointerException("Encountered null pipeline options.");
            }
            LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);
            JobPreparation preparation = JobPreparation.builder()
                .setId(preparationId)
                .setPipeline(request.getPipeline())
                .setOptions(pipelineOptions)
                .build();
            // putIfAbsent keeps the existing entry on collision, so an earlier preparation is never overwritten.
            JobPreparation previous = this.preparations.putIfAbsent(preparationId, preparation);
            if (previous != null) {
                String errMessage = String.format("A job with the preparation ID \"%s\" already exists.", preparationId);
                // An id collision is a conflict with existing state, not a missing entity.
                responseObserver.onError(Status.ALREADY_EXISTS.withDescription(errMessage).asException());
                return;
            }
            JobApi.PrepareJobResponse response = JobApi.PrepareJobResponse.newBuilder()
                .setPreparationId(preparationId)
                .setArtifactStagingEndpoint(this.stagingServiceDescriptor)
                .setStagingSessionToken("token")
                .build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (Exception e) {
            String errMessage = String.format("Could not prepare job with name %s", request.getJobName());
            LOG.error(errMessage, e);
            // The description is what the client sees; the cause is attached for server-side handling.
            responseObserver.onError(Status.INTERNAL.withDescription(errMessage).withCause(e).asException());
        }
    }

    /**
     * Invokes and starts the job previously registered under the request's preparation id.
     *
     * <p>The invocation is started first and only then stored, so it becomes visible to the
     * other RPCs after {@code start()} has returned.
     *
     * @param request carries the preparation id and the staging token
     * @param responseObserver receives the job (invocation) id followed by completion, or a single
     *     error: {@code NOT_FOUND} for an unknown preparation id, a forwarded
     *     {@link StatusRuntimeException}, or {@code INTERNAL} for any other failure
     */
    public void run(JobApi.RunJobRequest request, StreamObserver<JobApi.RunJobResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.RunJobRequest.class.getSimpleName(), request);
        String preparationId = request.getPreparationId();
        try {
            JobPreparation preparation = this.preparations.get(preparationId);
            if (preparation == null) {
                String errMessage = String.format("Unknown Preparation Id \"%s\".", preparationId);
                responseObserver.onError(Status.NOT_FOUND.withDescription(errMessage).asException());
                return;
            }
            JobInvocation invocation = this.invoker.invoke(preparation.pipeline(), preparation.options(), request.getStagingToken());
            String invocationId = invocation.getId();
            invocation.start();
            this.invocations.put(invocationId, invocation);
            JobApi.RunJobResponse response = JobApi.RunJobResponse.newBuilder().setJobId(invocationId).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (StatusRuntimeException e) {
            LOG.warn("Encountered Status Exception", e);
            responseObserver.onError(e);
        }
        catch (Exception e) {
            String errMessage = String.format("Encountered Unexpected Exception for Preparation %s", preparationId);
            LOG.error(errMessage, e);
            responseObserver.onError(Status.INTERNAL.withDescription(errMessage).withCause(e).asException());
        }
    }

    /**
     * Reports the current state of a job.
     *
     * @param request carries the job (invocation) id
     * @param responseObserver receives the state followed by completion, or a single error:
     *     {@code NOT_FOUND} for an unknown job id, {@code INTERNAL} for any other failure
     */
    public void getState(JobApi.GetJobStateRequest request, StreamObserver<JobApi.GetJobStateResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.GetJobStateRequest.class.getSimpleName(), request);
        String invocationId = request.getJobId();
        try {
            JobInvocation invocation = this.getInvocation(invocationId);
            JobApi.JobState.Enum state = invocation.getState();
            JobApi.GetJobStateResponse response = JobApi.GetJobStateResponse.newBuilder().setState(state).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (StatusException | StatusRuntimeException e) {
            // Forward the status (e.g. NOT_FOUND) as-is instead of masking it as INTERNAL.
            LOG.warn("Encountered Status Exception for Invocation {}", invocationId, e);
            responseObserver.onError(e);
        }
        catch (Exception e) {
            InMemoryJobService.reportUnexpectedInvocationError(invocationId, e, responseObserver);
        }
    }

    /**
     * Requests cancellation of a job and reports the state read immediately afterwards.
     *
     * @param request carries the job (invocation) id
     * @param responseObserver receives the post-cancel state followed by completion, or a single
     *     error: {@code NOT_FOUND} for an unknown job id, {@code INTERNAL} for any other failure
     */
    public void cancel(JobApi.CancelJobRequest request, StreamObserver<JobApi.CancelJobResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.CancelJobRequest.class.getSimpleName(), request);
        String invocationId = request.getJobId();
        try {
            JobInvocation invocation = this.getInvocation(invocationId);
            invocation.cancel();
            JobApi.JobState.Enum state = invocation.getState();
            JobApi.CancelJobResponse response = JobApi.CancelJobResponse.newBuilder().setState(state).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        catch (StatusException | StatusRuntimeException e) {
            // Forward the status (e.g. NOT_FOUND) as-is instead of masking it as INTERNAL.
            LOG.warn("Encountered Status Exception for Invocation {}", invocationId, e);
            responseObserver.onError(e);
        }
        catch (Exception e) {
            InMemoryJobService.reportUnexpectedInvocationError(invocationId, e, responseObserver);
        }
    }

    /**
     * Subscribes the caller to state changes of a job.
     *
     * <p>A listener is registered on the invocation that forwards every state it is handed to
     * {@code responseObserver}. This method itself never calls {@code onCompleted}.
     *
     * @param request carries the job (invocation) id
     * @param responseObserver receives one response per state handed to the listener, or a single
     *     error: {@code NOT_FOUND} for an unknown job id, {@code INTERNAL} for any other failure
     */
    public void getStateStream(JobApi.GetJobStateRequest request, StreamObserver<JobApi.GetJobStateResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.GetJobStateRequest.class.getSimpleName(), request);
        String invocationId = request.getJobId();
        try {
            JobInvocation invocation = this.getInvocation(invocationId);
            Consumer<JobApi.JobState.Enum> stateListener = state ->
                responseObserver.onNext(JobApi.GetJobStateResponse.newBuilder().setState(state).build());
            invocation.addStateListener(stateListener);
        }
        catch (StatusException | StatusRuntimeException e) {
            // Forward the status (e.g. NOT_FOUND) as-is instead of masking it as INTERNAL.
            LOG.warn("Encountered Status Exception for Invocation {}", invocationId, e);
            responseObserver.onError(e);
        }
        catch (Exception e) {
            InMemoryJobService.reportUnexpectedInvocationError(invocationId, e, responseObserver);
        }
    }

    /**
     * Subscribes the caller to both state changes and messages of a job.
     *
     * <p>Two listeners are registered on the invocation. Both write to the same stream, so the
     * observer is wrapped with {@link SynchronizedStreamObserver} before being shared between
     * them. This method itself never calls {@code onCompleted}.
     *
     * @param request carries the job (invocation) id
     * @param responseObserver receives one response per state or message handed to the listeners,
     *     or a single error: {@code NOT_FOUND} for an unknown job id, {@code INTERNAL} for any
     *     other failure
     */
    public void getMessageStream(JobApi.JobMessagesRequest request, StreamObserver<JobApi.JobMessagesResponse> responseObserver) {
        LOG.trace("{} {}", JobApi.JobMessagesRequest.class.getSimpleName(), request);
        String invocationId = request.getJobId();
        try {
            JobInvocation invocation = this.getInvocation(invocationId);
            StreamObserver<JobApi.JobMessagesResponse> syncResponseObserver = SynchronizedStreamObserver.wrapping(responseObserver);
            Consumer<JobApi.JobState.Enum> stateListener = state ->
                syncResponseObserver.onNext(JobApi.JobMessagesResponse.newBuilder()
                    .setStateResponse(JobApi.GetJobStateResponse.newBuilder().setState(state).build())
                    .build());
            Consumer<JobApi.JobMessage> messageListener = message ->
                syncResponseObserver.onNext(JobApi.JobMessagesResponse.newBuilder().setMessageResponse(message).build());
            invocation.addStateListener(stateListener);
            invocation.addMessageListener(messageListener);
        }
        catch (StatusException | StatusRuntimeException e) {
            // Forward the status (e.g. NOT_FOUND) as-is instead of masking it as INTERNAL.
            LOG.warn("Encountered Status Exception for Invocation {}", invocationId, e);
            responseObserver.onError(e);
        }
        catch (Exception e) {
            InMemoryJobService.reportUnexpectedInvocationError(invocationId, e, responseObserver);
        }
    }

    /** Does nothing; this class holds no resources that it closes here. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Looks up a stored invocation.
     *
     * @param invocationId the job id returned by {@code run}
     * @return the stored invocation, never {@code null}
     * @throws StatusException with status {@code NOT_FOUND} if no invocation is stored under the id
     */
    private JobInvocation getInvocation(String invocationId) throws StatusException {
        JobInvocation invocation = this.invocations.get(invocationId);
        if (invocation == null) {
            String errMessage = String.format("Unknown Invocation Id \"%s\".", invocationId);
            throw Status.NOT_FOUND.withDescription(errMessage).asException();
        }
        return invocation;
    }

    /** Logs an unexpected failure of an invocation-scoped RPC and reports it to the caller as INTERNAL. */
    private static void reportUnexpectedInvocationError(String invocationId, Exception e, StreamObserver<?> responseObserver) {
        String errMessage = String.format("Encountered Unexpected Exception for Invocation %s", invocationId);
        LOG.error(errMessage, e);
        responseObserver.onError(Status.INTERNAL.withDescription(errMessage).withCause(e).asException());
    }
}

