/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.portability;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.beam.fn.harness.ExternalWorkerService;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.DefaultArtifactResolver;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.runners.portability.CloseableResource;
import org.apache.beam.runners.portability.JobServicePipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PortableRunner
extends PipelineRunner<PipelineResult> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
    private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;
    private final @UnknownKeyFor @NonNull @Initialized String endpoint;
    private final @UnknownKeyFor @NonNull @Initialized ManagedChannelFactory channelFactory;

    public static @UnknownKeyFor @NonNull @Initialized PortableRunner fromOptions(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        return PortableRunner.create(options, ManagedChannelFactory.createDefault());
    }

    @VisibleForTesting
    static @UnknownKeyFor @NonNull @Initialized PortableRunner create(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized ManagedChannelFactory channelFactory) {
        PortablePipelineOptions portableOptions = (PortablePipelineOptions)PipelineOptionsValidator.validate(PortablePipelineOptions.class, (PipelineOptions)options);
        String endpoint = portableOptions.getJobEndpoint();
        return new PortableRunner(options, endpoint, channelFactory);
    }

    private PortableRunner(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized String endpoint, @UnknownKeyFor @NonNull @Initialized ManagedChannelFactory channelFactory) {
        this.options = options;
        this.endpoint = endpoint;
        this.channelFactory = channelFactory;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public @UnknownKeyFor @NonNull @Initialized PipelineResult run(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) {
        Runnable cleanup;
        if ("LOOPBACK".equals(((PortablePipelineOptions)this.options.as(PortablePipelineOptions.class)).getDefaultEnvironmentType())) {
            GrpcFnServer workerService;
            try {
                workerService = new ExternalWorkerService(this.options).start();
            }
            catch (Exception exn) {
                throw new RuntimeException("Failed to start GrpcFnServer for ExternalWorkerService", exn);
            }
            LOG.info("Starting worker service at {}", (Object)workerService.getApiServiceDescriptor().getUrl());
            ((PortablePipelineOptions)this.options.as(PortablePipelineOptions.class)).setDefaultEnvironmentConfig(workerService.getApiServiceDescriptor().getUrl());
            cleanup = () -> {
                try {
                    LOG.warn("closing worker service {}", (Object)workerService);
                    workerService.close();
                }
                catch (Exception exn) {
                    throw new RuntimeException(exn);
                }
            };
        } else {
            cleanup = null;
        }
        ImmutableList.Builder filesToStageBuilder = ImmutableList.builder();
        List stagingFiles = ((PortablePipelineOptions)this.options.as(PortablePipelineOptions.class)).getFilesToStage();
        if (stagingFiles == null) {
            List classpathResources = PipelineResources.detectClassPathResourcesToStage((ClassLoader)Environments.class.getClassLoader(), (PipelineOptions)this.options);
            if (classpathResources.isEmpty()) {
                throw new IllegalArgumentException("No classpath elements found.");
            }
            LOG.debug("PortablePipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: {}", (Object)classpathResources.size());
            filesToStageBuilder.addAll((Iterable)classpathResources);
        } else {
            filesToStageBuilder.addAll((Iterable)stagingFiles);
        }
        List experiments = ((ExperimentalOptions)this.options.as(ExperimentalOptions.class)).getExperiments();
        if (experiments != null) {
            Optional<String> jarPackages = experiments.stream().filter(flag -> flag.startsWith("jar_packages=")).findFirst();
            jarPackages.ifPresent(s -> filesToStageBuilder.addAll(Arrays.asList(s.replaceFirst("jar_packages=", "").split(","))));
        }
        ((PortablePipelineOptions)this.options.as(PortablePipelineOptions.class)).setFilesToStage((List)filesToStageBuilder.build());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)pipeline, (SdkComponents)SdkComponents.create((PipelineOptions)this.options));
        pipelineProto = DefaultArtifactResolver.INSTANCE.resolveArtifacts(pipelineProto);
        JobApi.PrepareJobRequest prepareJobRequest = JobApi.PrepareJobRequest.newBuilder().setJobName(this.options.getJobName()).setPipeline(pipelineProto).setPipelineOptions(PipelineOptionsTranslation.toProto((PipelineOptions)this.options)).build();
        LOG.info("Using job server endpoint: {}", (Object)this.endpoint);
        ManagedChannel jobServiceChannel = this.channelFactory.forDescriptor(Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.endpoint).build());
        JobServiceGrpc.JobServiceBlockingStub jobService = JobServiceGrpc.newBlockingStub((Channel)jobServiceChannel);
        try (CloseableResource<JobServiceGrpc.JobServiceBlockingStub> wrappedJobService = CloseableResource.of(jobService, unused -> jobServiceChannel.shutdown());){
            int jobServerTimeout = ((PortablePipelineOptions)this.options.as(PortablePipelineOptions.class)).getJobServerTimeout();
            JobApi.PrepareJobResponse prepareJobResponse = ((JobServiceGrpc.JobServiceBlockingStub)((JobServiceGrpc.JobServiceBlockingStub)jobService.withDeadlineAfter((long)jobServerTimeout, TimeUnit.SECONDS)).withWaitForReady()).prepare(prepareJobRequest);
            LOG.info("PrepareJobResponse: {}", (Object)prepareJobResponse);
            Endpoints.ApiServiceDescriptor artifactStagingEndpoint = prepareJobResponse.getArtifactStagingEndpoint();
            String stagingSessionToken = prepareJobResponse.getStagingSessionToken();
            try (CloseableResource<ManagedChannel> artifactChannel = CloseableResource.of(this.channelFactory.forDescriptor(artifactStagingEndpoint), ManagedChannel::shutdown);){
                ArtifactStagingService.offer((ArtifactRetrievalService)new ArtifactRetrievalService(), (ArtifactStagingServiceGrpc.ArtifactStagingServiceStub)ArtifactStagingServiceGrpc.newStub((Channel)((Channel)artifactChannel.get())), (String)stagingSessionToken);
            }
            catch (CloseableResource.CloseException e) {
                LOG.warn("Error closing artifact staging channel", (Throwable)e);
            }
            catch (Exception e) {
                throw new RuntimeException("Error staging files.", e);
            }
            JobApi.RunJobRequest runJobRequest = JobApi.RunJobRequest.newBuilder().setPreparationId(prepareJobResponse.getPreparationId()).build();
            JobApi.RunJobResponse runJobResponse = jobService.run(runJobRequest);
            LOG.info("RunJobResponse: {}", (Object)runJobResponse);
            ByteString jobId = runJobResponse.getJobIdBytes();
            JobServicePipelineResult jobServicePipelineResult = new JobServicePipelineResult(jobId, jobServerTimeout, wrappedJobService.transfer(), cleanup);
            return jobServicePipelineResult;
        }
        catch (CloseableResource.CloseException e) {
            throw new RuntimeException(e);
        }
    }

    @SideEffectFree
    public @UnknownKeyFor @NonNull @Initialized String toString() {
        return "PortableRunner#" + ((Object)((Object)this)).hashCode();
    }
}

