/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.Environments;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PipelineTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * gRPC service that expands a single transform described by an
 * {@link ExpansionApi.ExpansionRequest} into its constituent pipeline components.
 *
 * <p>The set of expandable transforms is discovered once, at construction time, by loading every
 * {@link ExpansionServiceRegistrar} visible to {@link ServiceLoader} and merging the URN-to-provider
 * maps they expose.
 */
public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplBase {
    private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class);

    /** Transform providers keyed by the URN they handle; populated once and never reassigned. */
    private final Map<String, TransformProvider> registeredTransforms = this.loadRegisteredTransforms();

    /**
     * Collects the transforms advertised by every {@link ExpansionServiceRegistrar} found through
     * {@link ServiceLoader}.
     *
     * @return an immutable map from URN to the provider registered for it
     */
    private Map<String, TransformProvider> loadRegisteredTransforms() {
        ImmutableMap.Builder<String, TransformProvider> known = ImmutableMap.builder();
        for (ExpansionServiceRegistrar registrar : ServiceLoader.load(ExpansionServiceRegistrar.class)) {
            known.putAll(registrar.knownTransforms());
        }
        return known.build();
    }

    /**
     * Expands the transform carried by {@code request}.
     *
     * <p>The request's components are rehydrated into a fresh {@link Pipeline}, the provider
     * registered for the transform's URN is applied to the rehydrated inputs, and the resulting
     * pipeline is translated back to its proto form. The newly created root transform is returned
     * separately from the remaining components.
     *
     * @param request the transform to expand together with the components it refers to
     * @return the expanded transform and the components it depends on
     * @throws UnsupportedOperationException if no provider is registered for the transform's URN
     * @throws RuntimeException if an input PCollection cannot be rehydrated (wraps the
     *     {@link IOException})
     */
    @VisibleForTesting
    ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest request) {
        RunnerApi.PTransform requested = request.getTransform();
        String urn = requested.getSpec().getUrn();
        LOG.info("Expanding '{}' with URN '{}'", requested.getUniqueName(), urn);
        LOG.debug("Full transform: {}", requested);

        // Remember which transforms the caller already knew about so that the newly created root
        // can be told apart from them after translation.
        Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();

        Pipeline pipeline = Pipeline.create();
        RehydratedComponents rehydratedComponents =
            RehydratedComponents.forComponents(request.getComponents()).withPipeline(pipeline);

        Map<String, PCollection<?>> inputs = requested.getInputsMap().entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, input -> {
                try {
                    return rehydratedComponents.getPCollection(input.getValue());
                } catch (IOException exn) {
                    // Lambdas cannot throw checked exceptions; keep the cause attached.
                    throw new RuntimeException(exn);
                }
            }));

        // Single lookup: the map is immutable and holds no null values, so a null result means
        // the URN is not registered.
        TransformProvider provider = this.registeredTransforms.get(urn);
        if (provider == null) {
            throw new UnsupportedOperationException("Unknown urn: " + urn);
        }
        // Raw call: the registrar SPI exposes providers without type arguments.
        provider.apply(pipeline, requested.getUniqueName(), requested.getSpec(), inputs);

        SdkComponents sdkComponents =
            rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace());
        sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);

        // getOnlyElement requires that exactly one root transform was not present in the request.
        String expandedTransformId = Iterables.getOnlyElement(
            pipelineProto.getRootTransformIdsList().stream()
                .filter(id -> !existingTransformIds.contains(id))
                .collect(Collectors.toList()));

        RunnerApi.Components components = pipelineProto.getComponents();
        RunnerApi.PTransform expanded = components.getTransformsOrThrow(expandedTransformId);
        LOG.debug("Expanded to {}", expanded);
        return ExpansionApi.ExpansionResponse.newBuilder()
            .setComponents(components.toBuilder().removeTransforms(expandedTransformId))
            .setTransform(expanded)
            .build();
    }

    /**
     * gRPC entry point: expands {@code request} and delivers the result to
     * {@code responseObserver}.
     *
     * <p>A {@link RuntimeException} raised while expanding or responding is reported through
     * {@link StreamObserver#onError(Throwable)} and then rethrown to the caller.
     *
     * @param request the expansion request
     * @param responseObserver receives the single response followed by completion, or the error
     */
    @Override
    public void expand(ExpansionApi.ExpansionRequest request, StreamObserver<ExpansionApi.ExpansionResponse> responseObserver) {
        try {
            responseObserver.onNext(this.expand(request));
            responseObserver.onCompleted();
        } catch (RuntimeException exn) {
            responseObserver.onError(exn);
            throw exn;
        }
    }

    /**
     * Starts an expansion service on the given port and blocks until the server terminates.
     *
     * @param args {@code args[0]} is the port to listen on
     * @throws IllegalArgumentException if no port argument is supplied or it is not an integer
     * @throws Exception if the server fails to start or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: ExpansionService <port>");
        }
        int port = Integer.parseInt(args[0]);
        System.out.println("Starting expansion service at localhost:" + port);
        Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build();
        server.start();
        server.awaitTermination();
    }

    /**
     * Knows how to build and apply one kind of transform from its {@link RunnerApi.FunctionSpec}.
     *
     * <p>Implementations only have to supply {@link #getTransform}; the default methods take care
     * of shaping the inputs, applying the transform and naming the outputs.
     *
     * @param <InputT> the input type of the produced transform
     * @param <OutputT> the output type of the produced transform
     */
    public static interface TransformProvider<InputT extends PValue, OutputT extends PValue> {
        /**
         * Builds the transform input from the named input PCollections.
         *
         * <p>No inputs yields {@code p.begin()}, a single input yields that PCollection, and two or
         * more inputs yield a {@link PCollectionTuple} tagged by the map keys.
         *
         * @param p the pipeline the transform is being applied in
         * @param inputs input PCollections keyed by their local name
         * @return the value to hand to the transform
         */
        // NOTE(review): InputT erases to PValue, so each cast below becomes a runtime check
        // against PValue. Confirm that the types of p.begin() and PCollectionTuple actually
        // satisfy it; if they do not, the zero-input and multi-input paths cannot succeed.
        @SuppressWarnings({"unchecked", "rawtypes"})
        default public InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
            if (inputs.isEmpty()) {
                return (InputT) p.begin();
            }
            if (inputs.size() == 1) {
                return (InputT) Iterables.getOnlyElement(inputs.values());
            }
            PCollectionTuple inputTuple = PCollectionTuple.empty(p);
            for (Map.Entry<String, PCollection<?>> entry : inputs.entrySet()) {
                // Raw TupleTag: the element type of each PCollection is unknown here.
                inputTuple = inputTuple.and(new TupleTag(entry.getKey()), entry.getValue());
            }
            return (InputT) inputTuple;
        }

        /**
         * Creates the transform described by {@code var1}.
         *
         * @param var1 the function spec taken from the expansion request
         * @return the transform to apply
         */
        public PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec var1);

        /**
         * Flattens a transform output into named PCollections.
         *
         * <ul>
         *   <li>{@link PDone}: no outputs.
         *   <li>{@link PCollection}: a single entry named {@code "output"}.
         *   <li>{@link PCollectionTuple}: one entry per tag, keyed by the tag's {@code toString()}.
         *   <li>{@link PCollectionList}: one entry per element, keyed {@code "output_" + index}.
         * </ul>
         *
         * @param output the value returned by applying the transform
         * @return the output PCollections keyed by name
         * @throws UnsupportedOperationException if {@code output} is none of the types above
         */
        default public Map<String, PCollection<?>> extractOutputs(OutputT output) {
            if (output instanceof PDone) {
                return Collections.emptyMap();
            }
            if (output instanceof PCollection) {
                return ImmutableMap.of("output", (PCollection<?>) output);
            }
            if (output instanceof PCollectionTuple) {
                return ((PCollectionTuple) output).getAll().entrySet().stream()
                    .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue));
            }
            if (output instanceof PCollectionList) {
                PCollectionList<?> listOutput = (PCollectionList<?>) output;
                return IntStream.range(0, listOutput.size()).boxed()
                    .collect(Collectors.toMap(index -> "output_" + index, listOutput::get));
            }
            throw new UnsupportedOperationException("Unknown output type: " + output.getClass());
        }

        /**
         * Applies the transform built from {@code spec} to {@code inputs} under the given name.
         *
         * @param p the pipeline to apply the transform in
         * @param name the name to apply the transform under
         * @param spec the function spec describing the transform
         * @param inputs input PCollections keyed by their local name
         * @return the transform's outputs as produced by {@link #extractOutputs}
         */
        default public Map<String, PCollection<?>> apply(Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String, PCollection<?>> inputs) {
            OutputT output = Pipeline.applyTransform(name, this.createInput(p, inputs), this.getTransform(spec));
            return this.extractOutputs(output);
        }
    }

    /**
     * Service-provider interface through which transforms are contributed to an
     * {@link ExpansionService}; implementations are located with {@link ServiceLoader}.
     */
    public static interface ExpansionServiceRegistrar {
        /**
         * Lists the transforms this registrar contributes.
         *
         * @return transform providers keyed by the URN they handle
         */
        public Map<String, TransformProvider> knownTransforms();
    }
}

