/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.core.construction.expansion;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.core.construction.BeamUrns;
import org.apache.beam.repackaged.direct_java.runners.core.construction.CoderTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.Environments;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PipelineTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.direct_java.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ProtocolStringList;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Converter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExpansionService
extends ExpansionServiceGrpc.ExpansionServiceImplBase
implements AutoCloseable {
    /** Logger used to trace expansion requests and their results. */
    private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class);
    /**
     * Transform providers keyed by URN; populated once, when the service is constructed, by
     * {@link #loadRegisteredTransforms()}.
     */
    private Map<String, TransformProvider> registeredTransforms = this.loadRegisteredTransforms();

    /**
     * Gathers the transforms contributed by every {@link ExpansionServiceRegistrar} that
     * {@link ServiceLoader} can discover.
     *
     * @return an immutable map holding the union of all registrars' known transforms
     */
    private Map<String, TransformProvider> loadRegisteredTransforms() {
        ImmutableMap.Builder<String, TransformProvider> providersByUrn = ImmutableMap.builder();
        ServiceLoader.load(ExpansionServiceRegistrar.class)
            .forEach(registrar -> providersByUrn.putAll(registrar.knownTransforms()));
        return providersByUrn.build();
    }

    /**
     * Expands the transform carried by {@code request} and returns the resulting components.
     *
     * <p>The steps, in order: rehydrate the request's components into a fresh {@link Pipeline},
     * resolve the transform's input PCollections, look up the provider registered for the
     * transform's URN and apply it, register the produced PCollections under the request's
     * namespace, translate the pipeline back to proto, and pull out the single root transform
     * that was not already present in the request.
     *
     * @param request the expansion request (components, transform and namespace)
     * @return a response holding the expanded transform and the components it needs, with the
     *     expanded transform itself removed from those components
     * @throws UnsupportedOperationException if no provider is registered for the transform's URN
     * @throws RuntimeException if resolving an input or registering an output fails with an
     *     {@link IOException}
     */
    @VisibleForTesting
    ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest request) {
        RunnerApi.PTransform requested = request.getTransform();
        String urn = requested.getSpec().getUrn();
        LOG.info("Expanding '{}' with URN '{}'", requested.getUniqueName(), urn);
        LOG.debug("Full transform: {}", requested);

        // Remember what was already there so the newly added root can be identified later.
        Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();

        Pipeline pipeline = Pipeline.create();
        ExperimentalOptions.addExperiment(
            pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
        RehydratedComponents rehydratedComponents =
            RehydratedComponents.forComponents(request.getComponents()).withPipeline(pipeline);

        Map<String, PCollection<?>> inputs =
            requested.getInputsMap().entrySet().stream()
                .collect(
                    Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> {
                            try {
                                return rehydratedComponents.getPCollection(entry.getValue());
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }));

        // The URN is validated only after the inputs have been resolved.
        TransformProvider provider = this.registeredTransforms.get(urn);
        if (provider == null) {
            throw new UnsupportedOperationException("Unknown urn: " + urn);
        }
        @SuppressWarnings("unchecked")
        Map<String, PCollection<?>> outputs =
            provider.apply(pipeline, requested.getUniqueName(), requested.getSpec(), inputs);

        SdkComponents sdkComponents =
            rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace());
        sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
        Map<String, String> outputMap =
            outputs.entrySet().stream()
                .collect(
                    Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> {
                            try {
                                return sdkComponents.registerPCollection(entry.getValue());
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }));

        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        // Exactly one root transform is expected to be new relative to the request.
        List<String> newRootIds =
            pipelineProto.getRootTransformIdsList().stream()
                .filter(id -> !existingTransformIds.contains(id))
                .collect(Collectors.toList());
        String expandedTransformId = Iterables.getOnlyElement(newRootIds);

        RunnerApi.Components components = pipelineProto.getComponents();
        RunnerApi.PTransform expandedTransform =
            components.getTransformsOrThrow(expandedTransformId).toBuilder()
                .setUniqueName(expandedTransformId)
                .clearOutputs()
                .putAllOutputs(outputMap)
                .build();
        LOG.debug("Expanded to {}", expandedTransform);

        return ExpansionApi.ExpansionResponse.newBuilder()
            .setComponents(components.toBuilder().removeTransforms(expandedTransformId))
            .setTransform(expandedTransform)
            .build();
    }

    /**
     * gRPC entry point: expands {@code request} and streams the single response to
     * {@code responseObserver}.
     *
     * <p>If expansion fails with a {@link RuntimeException}, the failure is reported through
     * {@code responseObserver.onError} and then rethrown to the caller of this method.
     *
     * @param request the expansion request
     * @param responseObserver the observer that receives the response or the error
     */
    @Override
    public void expand(
            ExpansionApi.ExpansionRequest request,
            StreamObserver<ExpansionApi.ExpansionResponse> responseObserver) {
        try {
            ExpansionApi.ExpansionResponse response = expand(request);
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (RuntimeException failure) {
            responseObserver.onError(failure);
            throw failure;
        }
    }

    /**
     * Does nothing; the body is intentionally empty. Present to satisfy {@link AutoCloseable}.
     */
    @Override
    public void close() throws Exception {
    }

    /**
     * Command-line entry point: starts a gRPC server hosting a new {@link ExpansionService} on
     * the given port and blocks until the server terminates.
     *
     * @param args command-line arguments; {@code args[0]} must be the port to listen on
     * @throws IllegalArgumentException if no port argument is supplied
     * @throws NumberFormatException if the port argument is not a valid integer
     * @throws Exception if the server fails to start or waiting for termination is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Fail with a usage hint instead of an opaque ArrayIndexOutOfBoundsException.
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException("Usage: ExpansionService <port>");
        }
        int port = Integer.parseInt(args[0]);
        System.out.println("Starting expansion service at localhost:" + port);
        Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build();
        server.start();
        server.awaitTermination();
    }

    /**
     * Supplies a {@link PTransform} for a {@link RunnerApi.FunctionSpec} and knows how to wire it
     * into a pipeline: building its input from named PCollections, applying it, and flattening
     * its output back into named PCollections.
     *
     * <p>{@link #getTransform} is the only abstract method, so a provider can be written as a
     * lambda.
     *
     * @param <InputT> input type of the provided transform
     * @param <OutputT> output type of the provided transform
     */
    public interface TransformProvider<InputT extends PInput, OutputT extends POutput> {

        /**
         * Builds the transform's input from the named input PCollections.
         *
         * @param p the pipeline the transform will be applied to
         * @param inputs input PCollections keyed by name
         * @return {@code p.begin()} when there are no inputs, the sole PCollection when there is
         *     exactly one, otherwise a {@link PCollectionTuple} tagged with the input names
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
            switch (inputs.size()) {
                case 0:
                    return (InputT) p.begin();
                case 1:
                    return (InputT) Iterables.getOnlyElement(inputs.values());
                default:
                    PCollectionTuple tuple = PCollectionTuple.empty(p);
                    for (Map.Entry<String, PCollection<?>> input : inputs.entrySet()) {
                        tuple = tuple.and(new TupleTag(input.getKey()), input.getValue());
                    }
                    return (InputT) tuple;
            }
        }

        /**
         * Creates the transform described by {@code spec}.
         *
         * @param spec the function spec of the transform being expanded
         * @return the transform to apply
         */
        PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec);

        /**
         * Flattens a transform's output into named PCollections.
         *
         * @param output the value returned by applying the transform
         * @return an empty map for {@link PDone}; a single entry named {@code "output"} for a
         *     {@link PCollection}; entries keyed by tag id for a {@link PCollectionTuple};
         *     entries keyed by decimal index for a {@link PCollectionList}
         * @throws UnsupportedOperationException if {@code output} is none of those types
         */
        default Map<String, PCollection<?>> extractOutputs(OutputT output) {
            if (output instanceof PDone) {
                return Collections.emptyMap();
            } else if (output instanceof PCollection) {
                return ImmutableMap.of("output", (PCollection<?>) output);
            } else if (output instanceof PCollectionTuple) {
                Map<TupleTag<?>, PCollection<?>> byTag = ((PCollectionTuple) output).getAll();
                return byTag.entrySet().stream()
                    .collect(Collectors.toMap(e -> e.getKey().getId(), Map.Entry::getValue));
            } else if (output instanceof PCollectionList) {
                PCollectionList<?> list = (PCollectionList<?>) output;
                return IntStream.range(0, list.size())
                    .boxed()
                    .collect(Collectors.toMap(Object::toString, list::get));
            }
            throw new UnsupportedOperationException("Unknown output type: " + output.getClass());
        }

        /**
         * Applies the transform for {@code spec} to {@code inputs} within {@code p}.
         *
         * @param p the pipeline to apply the transform in
         * @param name the name to apply the transform under
         * @param spec the function spec of the transform being expanded
         * @param inputs input PCollections keyed by name
         * @return the transform's outputs keyed by name, as produced by {@link #extractOutputs}
         */
        default Map<String, PCollection<?>> apply(
                Pipeline p,
                String name,
                RunnerApi.FunctionSpec spec,
                Map<String, PCollection<?>> inputs) {
            // The input is built before the transform is created.
            InputT input = createInput(p, inputs);
            PTransform<InputT, OutputT> transform = getTransform(spec);
            return extractOutputs(Pipeline.applyTransform(name, input, transform));
        }
    }

    public static class ExternalTransformRegistrarLoader
    implements ExpansionServiceRegistrar {
        /**
         * Creates one {@link TransformProvider} for each URN advertised by every
         * {@link ExternalTransformRegistrar} that {@link ServiceLoader} can discover.
         *
         * <p>Each provider parses the spec's payload as an
         * {@link ExternalTransforms.ExternalConfigurationPayload} and hands it, together with the
         * registered builder class, to {@code translate}. Any exception raised while doing so is
         * wrapped in a {@link RuntimeException} that names the URN and the spec.
         *
         * @return an immutable map from URN to provider
         */
        @Override
        public Map<String, TransformProvider> knownTransforms() {
            ImmutableMap.Builder<String, TransformProvider> providers = ImmutableMap.builder();
            for (ExternalTransformRegistrar registrar :
                    ServiceLoader.load(ExternalTransformRegistrar.class)) {
                registrar.knownBuilders().forEach((urn, builderClass) ->
                    providers.put(urn, spec -> {
                        try {
                            ExternalTransforms.ExternalConfigurationPayload payload =
                                ExternalTransforms.ExternalConfigurationPayload.parseFrom(
                                    spec.getPayload());
                            return translate(payload, builderClass);
                        } catch (Exception e) {
                            throw new RuntimeException(
                                String.format(
                                    "Failed to build transform %s from spec %s", urn, spec),
                                e);
                        }
                    }));
            }
            return providers.build();
        }

        /**
         * Turns a configuration payload into a transform using the given builder class:
         * instantiate the builder's configuration object, populate it from the payload, then ask
         * the builder to build the transform.
         *
         * @param payload the external configuration to apply
         * @param builderClass the {@link ExternalTransformBuilder} implementation to use
         * @return the transform produced by the builder
         * @throws IllegalStateException if {@code builderClass} is not an
         *     {@link ExternalTransformBuilder}
         * @throws Exception if any reflective step or payload decoding fails
         */
        private static PTransform translate(
                ExternalTransforms.ExternalConfigurationPayload payload,
                Class<? extends ExternalTransformBuilder> builderClass) throws Exception {
            Preconditions.checkState(
                ExternalTransformBuilder.class.isAssignableFrom(builderClass),
                "Provided identifier %s is not an ExternalTransformBuilder.",
                builderClass.getName());

            final Object configuration = initConfiguration(builderClass);
            populateConfiguration(configuration, payload);
            return buildTransform(builderClass, configuration);
        }

        /**
         * Instantiates the configuration object expected by the builder's {@code buildExternal}
         * method.
         *
         * <p>Scans the builder's public methods for one named {@code buildExternal}, skipping any
         * whose parameter type is exactly {@link Object}, and creates the parameter type through
         * its no-argument constructor.
         *
         * @param builderClass the builder whose configuration type should be instantiated
         * @return a new, unpopulated configuration object
         * @throws IllegalStateException if a {@code buildExternal} method does not take exactly
         *     one parameter
         * @throws RuntimeException if no suitable {@code buildExternal} method exists
         * @throws Exception if the configuration class cannot be instantiated reflectively
         */
        private static Object initConfiguration(
                Class<? extends ExternalTransformBuilder> builderClass) throws Exception {
            for (Method candidate : builderClass.getMethods()) {
                if ("buildExternal".equals(candidate.getName())) {
                    int arity = candidate.getParameterCount();
                    Preconditions.checkState(
                        arity == 1,
                        "Build method for ExternalTransformBuilder %s must have exactly one parameter, but had %s parameters.",
                        builderClass.getSimpleName(),
                        arity);
                    Class<?> configurationType = candidate.getParameterTypes()[0];
                    if (!Object.class.equals(configurationType)) {
                        return configurationType.getDeclaredConstructor().newInstance();
                    }
                }
            }
            throw new RuntimeException("Couldn't find build method on ExternalTransformBuilder.");
        }

        /**
         * Copies every entry of the payload's configuration map onto {@code config} by calling
         * the matching setter.
         *
         * <p>For each entry the key is converted from lower_underscore to lowerCamel, the coder
         * is resolved from the entry's coder URNs, and the setter {@code set<FieldName>} taking
         * the coder's raw encoded type is looked up on {@code config}'s class. Only once the
         * setter has been found is the entry's payload decoded (in the nested context) and passed
         * to it.
         *
         * @param config the configuration object to populate
         * @param payload the external configuration holding the encoded values
         * @throws IllegalArgumentException if an entry carries no coder URN
         * @throws RuntimeException if {@code config}'s class lacks the required setter
         * @throws Exception if coder resolution, decoding or the reflective call fails
         */
        @VisibleForTesting
        static void populateConfiguration(
                Object config, ExternalTransforms.ExternalConfigurationPayload payload)
                throws Exception {
            Converter<String, String> toCamelCase =
                CaseFormat.LOWER_UNDERSCORE.converterTo(CaseFormat.LOWER_CAMEL);
            for (Map.Entry<String, ExternalTransforms.ConfigValue> field :
                    payload.getConfigurationMap().entrySet()) {
                ExternalTransforms.ConfigValue value = field.getValue();
                String fieldName = toCamelCase.convert(field.getKey());

                List<String> coderUrns = value.getCoderUrnList();
                Preconditions.checkArgument(!coderUrns.isEmpty(), "No Coder URN provided.");
                Coder coder = resolveCoder(coderUrns);
                Class<?> valueType = coder.getEncodedTypeDescriptor().getRawType();

                String setterName =
                    "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
                Method setter;
                try {
                    setter = config.getClass().getMethod(setterName, valueType);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException(
                        String.format(
                            "The configuration class %s is missing a setter %s for %s with type %s",
                            config.getClass(),
                            setterName,
                            fieldName,
                            coder.getEncodedTypeDescriptor().getType().getTypeName()),
                        e);
                }

                Object decoded = coder.decode(value.getPayload().newInput(), Coder.Context.NESTED);
                setter.invoke(config, decoded);
            }
        }

        /**
         * Builds a {@link Coder} from a flattened list of coder URNs.
         *
         * <p>The URNs are consumed front to back by {@code buildProto}, which also records every
         * component coder in a fresh components builder; the resulting proto is then rehydrated
         * into a coder instance.
         *
         * @param coderUrns the coder URNs, outermost coder first
         * @return the coder described by the URNs
         * @throws IllegalArgumentException if {@code coderUrns} is empty
         * @throws Exception if the coder proto cannot be translated
         */
        private static Coder resolveCoder(List<String> coderUrns) throws Exception {
            Preconditions.checkArgument(!coderUrns.isEmpty(), "No Coder URN provided.");

            Deque<String> remainingUrns = new ArrayDeque<>(coderUrns);
            RunnerApi.Components.Builder components = RunnerApi.Components.newBuilder();
            RunnerApi.Coder rootCoder = buildProto(remainingUrns, components);

            return CoderTranslation.fromProto(
                rootCoder, RehydratedComponents.forComponents(components.build()));
        }

        /**
         * Recursively builds a coder proto from the front of {@code coderUrns}.
         *
         * <p>The first URN becomes the coder's spec. An ITERABLE coder then consumes one
         * component coder from the remaining URNs and a KV coder consumes two; any other URN is
         * treated as having no components. Each component coder is stored in
         * {@code componentsBuilder} under a freshly generated random UUID, and that id is
         * appended to the coder's component ids.
         *
         * @param coderUrns the URNs still to be consumed; mutated by this call
         * @param componentsBuilder receives the component coders created along the way
         * @return the coder proto for the URN at the front of the deque
         * @throws IllegalArgumentException if {@code coderUrns} is empty
         */
        private static RunnerApi.Coder buildProto(
                Deque<String> coderUrns, RunnerApi.Components.Builder componentsBuilder) {
            Preconditions.checkArgument(
                !coderUrns.isEmpty(), "No URNs left to construct coder from");

            final String urn = coderUrns.pop();
            RunnerApi.Coder.Builder coder =
                RunnerApi.Coder.newBuilder()
                    .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(urn).build());

            final int componentCount;
            if (urn.equals(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.ITERABLE))) {
                componentCount = 1;
            } else if (urn.equals(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.KV))) {
                componentCount = 2;
            } else {
                componentCount = 0;
            }

            // Build every component first, then register them, in declaration order.
            RunnerApi.Coder[] componentCoders = new RunnerApi.Coder[componentCount];
            for (int i = 0; i < componentCount; i++) {
                componentCoders[i] = buildProto(coderUrns, componentsBuilder);
            }
            for (RunnerApi.Coder componentCoder : componentCoders) {
                String componentId = UUID.randomUUID().toString();
                componentsBuilder.putCoders(componentId, componentCoder);
                coder.addComponentCoderIds(componentId);
            }
            return coder.build();
        }

        /**
         * Instantiates the builder through its no-argument constructor and invokes its
         * {@code buildExternal} method with the populated configuration.
         *
         * <p>Both the constructor and the method are made accessible before use. The method is
         * looked up by the runtime class of {@code configObject}.
         *
         * @param builderClass the builder implementation to instantiate
         * @param configObject the populated configuration to pass to {@code buildExternal}
         * @return the transform returned by {@code buildExternal}
         * @throws Exception if the builder cannot be instantiated or the method cannot be found
         *     or invoked
         */
        private static PTransform buildTransform(
                Class<? extends ExternalTransformBuilder> builderClass, Object configObject)
                throws Exception {
            Constructor<? extends ExternalTransformBuilder> noArgConstructor =
                builderClass.getDeclaredConstructor();
            noArgConstructor.setAccessible(true);
            ExternalTransformBuilder transformBuilder = noArgConstructor.newInstance();

            Method buildExternal = builderClass.getMethod("buildExternal", configObject.getClass());
            buildExternal.setAccessible(true);
            return (PTransform) buildExternal.invoke(transformBuilder, configObject);
        }
    }

    /**
     * Extension point through which transforms are contributed to the expansion service.
     * Implementations are discovered with {@link ServiceLoader} when the service is constructed.
     */
    public static interface ExpansionServiceRegistrar {
        /**
         * Returns the transforms this registrar contributes.
         *
         * @return transform providers keyed by the URN they expand
         */
        public Map<String, TransformProvider> knownTransforms();
    }
}

