/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import java.io.File;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.protobuf.Struct;
import org.apache.beam.repackaged.beam_runners_direct_java.model.fnexecution.v1.ProvisionApi;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ModelCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SyntheticComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.InProcessEnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.runners.direct.portable.EvaluationContext;
import org.apache.beam.runners.direct.portable.EvaluationContextStepStateAndTimersProvider;
import org.apache.beam.runners.direct.portable.ExecutorServiceParallelExecutor;
import org.apache.beam.runners.direct.portable.ImmutableListBundleFactory;
import org.apache.beam.runners.direct.portable.PortableGraph;
import org.apache.beam.runners.direct.portable.RootProviderRegistry;
import org.apache.beam.runners.direct.portable.TransformEvaluatorRegistry;
import org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactRetrievalService;
import org.apache.beam.runners.direct.portable.artifact.UnsupportedArtifactRetrievalService;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class ReferenceRunner {
    private final RunnerApi.Pipeline pipeline;
    private final Struct options;
    @Nullable
    private final File artifactsDir;
    private final EnvironmentType environmentType;

    private ReferenceRunner(RunnerApi.Pipeline p, Struct options, @Nullable File artifactsDir, EnvironmentType environmentType) {
        this.pipeline = this.executable(p);
        this.options = options;
        this.artifactsDir = artifactsDir;
        this.environmentType = environmentType;
    }

    public static ReferenceRunner forPipeline(RunnerApi.Pipeline p, Struct options, File artifactsDir) {
        return new ReferenceRunner(p, options, artifactsDir, EnvironmentType.DOCKER);
    }

    static ReferenceRunner forInProcessPipeline(RunnerApi.Pipeline p, Struct options) {
        return new ReferenceRunner(p, options, null, EnvironmentType.IN_PROCESS);
    }

    private RunnerApi.Pipeline executable(RunnerApi.Pipeline original) {
        RunnerApi.Pipeline withGbks = ProtoOverrides.updateTransform(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, original, new PortableGroupByKeyReplacer());
        return GreedyPipelineFuser.fuse(withGbks).toPipeline();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute() throws Exception {
        PortableGraph graph = PortableGraph.forPipeline(this.pipeline);
        ImmutableListBundleFactory bundleFactory = ImmutableListBundleFactory.create();
        EvaluationContext ctxt = EvaluationContext.create(Instant::new, bundleFactory, graph, Collections.emptySet());
        RootProviderRegistry rootRegistry = RootProviderRegistry.impulseRegistry(bundleFactory);
        int targetParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 3);
        ServerFactory serverFactory = this.createServerFactory();
        MapControlClientPool controlClientPool = MapControlClientPool.create();
        ExecutorService dataExecutor = Executors.newCachedThreadPool();
        ProvisionApi.ProvisionInfo provisionInfo = ProvisionApi.ProvisionInfo.newBuilder().setJobId("id").setJobName("reference").setPipelineOptions(this.options).setWorkerId("foo").setResourceLimits(ProvisionApi.Resources.getDefaultInstance()).build();
        try (GrpcFnServer<GrpcLoggingService> logging = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
             GrpcFnServer<ArtifactRetrievalService> artifact = this.artifactsDir == null ? GrpcFnServer.allocatePortAndCreateFor(UnsupportedArtifactRetrievalService.create(), serverFactory) : GrpcFnServer.allocatePortAndCreateFor(LocalFileSystemArtifactRetrievalService.forRootDirectory(this.artifactsDir), serverFactory);
             GrpcFnServer<StaticGrpcProvisionService> provisioning = GrpcFnServer.allocatePortAndCreateFor(StaticGrpcProvisionService.create(provisionInfo), serverFactory);
             GrpcFnServer<FnApiControlClientPoolService> control = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool(controlClientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), serverFactory);
             GrpcFnServer<GrpcDataService> data = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(dataExecutor), serverFactory);
             GrpcFnServer<GrpcStateService> state = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);){
            EnvironmentFactory environmentFactory = this.createEnvironmentFactory(control, logging, artifact, provisioning, controlClientPool.getSource());
            JobBundleFactory jobBundleFactory = SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory, data, state);
            TransformEvaluatorRegistry transformRegistry = TransformEvaluatorRegistry.portableRegistry(graph, this.pipeline.getComponents(), bundleFactory, jobBundleFactory, EvaluationContextStepStateAndTimersProvider.forContext(ctxt));
            ExecutorServiceParallelExecutor executor = ExecutorServiceParallelExecutor.create(targetParallelism, rootRegistry, transformRegistry, graph, ctxt);
            executor.start();
            executor.waitUntilFinish(Duration.ZERO);
        }
        finally {
            dataExecutor.shutdown();
        }
    }

    private ServerFactory createServerFactory() {
        switch (this.environmentType) {
            case DOCKER: {
                return ServerFactory.createDefault();
            }
            case IN_PROCESS: {
                return InProcessServerFactory.create();
            }
        }
        throw new IllegalArgumentException(String.format("Unknown %s %s", new Object[]{EnvironmentType.class.getSimpleName(), this.environmentType}));
    }

    private EnvironmentFactory createEnvironmentFactory(GrpcFnServer<FnApiControlClientPoolService> control, GrpcFnServer<GrpcLoggingService> logging, GrpcFnServer<ArtifactRetrievalService> artifact, GrpcFnServer<StaticGrpcProvisionService> provisioning, ControlClientPool.Source controlClientSource) {
        switch (this.environmentType) {
            case DOCKER: {
                return DockerEnvironmentFactory.forServices(control, logging, artifact, provisioning, controlClientSource, IdGenerators.incrementingLongs());
            }
            case IN_PROCESS: {
                return InProcessEnvironmentFactory.create(PipelineOptionsFactory.create(), logging, control, controlClientSource);
            }
        }
        throw new IllegalArgumentException(String.format("Unknown %s %s", new Object[]{EnvironmentType.class.getSimpleName(), this.environmentType}));
    }

    private static enum EnvironmentType {
        DOCKER,
        IN_PROCESS;

    }

    @VisibleForTesting
    static class PortableGroupByKeyReplacer
    implements ProtoOverrides.TransformReplacement {
        PortableGroupByKeyReplacer() {
        }

        @Override
        public RunnerApi.MessageWithComponents getReplacement(String gbkId, RunnerApi.ComponentsOrBuilder components) {
            RunnerApi.PTransform gbk = components.getTransformsOrThrow(gbkId);
            Preconditions.checkArgument(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN.equals(gbk.getSpec().getUrn()), "URN must be %s, got %s", (Object)PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, (Object)gbk.getSpec().getUrn());
            String inputId = Iterables.getOnlyElement(gbk.getInputsMap().values());
            RunnerApi.PCollection input = components.getPcollectionsOrThrow(inputId);
            RunnerApi.Coder inputCoder = components.getCodersOrThrow(input.getCoderId());
            ModelCoders.KvCoderComponents kvComponents = ModelCoders.getKvCoderComponents(inputCoder);
            String windowCoderId = components.getWindowingStrategiesOrThrow(input.getWindowingStrategyId()).getWindowCoderId();
            RunnerApi.Coder intermediateCoder = RunnerApi.Coder.newBuilder().setSpec(RunnerApi.SdkFunctionSpec.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:direct:keyedworkitem:v1"))).addAllComponentCoderIds(ImmutableList.of(kvComponents.keyCoderId(), kvComponents.valueCoderId(), windowCoderId)).build();
            String intermediateCoderId = SyntheticComponents.uniqueId(String.format("keyed_work_item(%s:%s)", kvComponents.keyCoderId(), kvComponents.valueCoderId()), components::containsCoders);
            String partitionedId = SyntheticComponents.uniqueId(String.format("%s.%s", inputId, "partitioned"), components::containsPcollections);
            RunnerApi.PCollection partitioned = input.toBuilder().setUniqueName(partitionedId).setCoderId(intermediateCoderId).build();
            String gbkoId = SyntheticComponents.uniqueId(String.format("%s/GBKO", gbkId), components::containsTransforms);
            RunnerApi.PTransform gbko = RunnerApi.PTransform.newBuilder().putAllInputs(gbk.getInputsMap()).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:beam:directrunner:transforms:gbko:v1")).putOutputs("output", partitionedId).build();
            String gabwId = SyntheticComponents.uniqueId(String.format("%s/GABW", gbkId), components::containsTransforms);
            RunnerApi.PTransform gabw = RunnerApi.PTransform.newBuilder().putInputs("input", partitionedId).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:beam:directrunner:transforms:gabw:v1")).putAllOutputs(gbk.getOutputsMap()).build();
            RunnerApi.Components newComponents = RunnerApi.Components.newBuilder().putCoders(intermediateCoderId, intermediateCoder).putPcollections(partitionedId, partitioned).putTransforms(gbkoId, gbko).putTransforms(gabwId, gabw).build();
            return RunnerApi.MessageWithComponents.newBuilder().setPtransform(gbk.toBuilder().addSubtransforms(gbkoId).addSubtransforms(gabwId).build()).setComponents(newComponents).build();
        }
    }
}

