/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.expansion.service;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.expansion.service.ExpansionServiceOptions;
import org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Converter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExpansionService
extends ExpansionServiceGrpc.ExpansionServiceImplBase
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class);
    private @MonotonicNonNull Map<String, TransformProvider> registeredTransforms;
    private final PipelineOptions pipelineOptions;

    public ExpansionService() {
        this(new String[0]);
    }

    public ExpansionService(String[] args) {
        this(PipelineOptionsFactory.fromArgs((String[])args).create());
    }

    public ExpansionService(PipelineOptions opts) {
        this.pipelineOptions = opts;
    }

    private Map<String, TransformProvider> getRegisteredTransforms() {
        if (this.registeredTransforms == null) {
            this.registeredTransforms = this.loadRegisteredTransforms();
        }
        return this.registeredTransforms;
    }

    private Map<String, TransformProvider> loadRegisteredTransforms() {
        ImmutableMap.Builder registeredTransformsBuilder = ImmutableMap.builder();
        for (ExpansionServiceRegistrar registrar : ServiceLoader.load(ExpansionServiceRegistrar.class)) {
            registeredTransformsBuilder.putAll(registrar.knownTransforms());
        }
        ImmutableMap registeredTransforms = registeredTransformsBuilder.build();
        LOG.info("Registering external transforms: {}", (Object)registeredTransforms.keySet());
        return registeredTransforms;
    }

    @VisibleForTesting
    ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest request) {
        boolean isUseDeprecatedRead;
        LOG.info("Expanding '{}' with URN '{}'", (Object)request.getTransform().getUniqueName(), (Object)request.getTransform().getSpec().getUrn());
        LOG.debug("Full transform: {}", (Object)request.getTransform());
        Set existingTransformIds = request.getComponents().getTransformsMap().keySet();
        Pipeline pipeline = this.createPipeline();
        boolean bl = isUseDeprecatedRead = ExperimentalOptions.hasExperiment((PipelineOptions)this.pipelineOptions, (String)"use_deprecated_read") || ExperimentalOptions.hasExperiment((PipelineOptions)this.pipelineOptions, (String)"beam_fn_api_use_deprecated_read");
        if (!isUseDeprecatedRead) {
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)pipeline.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)pipeline.getOptions().as(ExperimentalOptions.class)), (String)"use_sdf_read");
        } else {
            LOG.warn("Using use_depreacted_read in portable runners is runner-dependent. The ExpansionService will respect that, but if your runner does not have support for native Read transform, your Pipeline will fail during Pipeline submission.");
        }
        RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents((RunnerApi.Components)request.getComponents()).withPipeline(pipeline);
        Map<String, PCollection<?>> inputs = request.getTransform().getInputsMap().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, input -> {
            try {
                return rehydratedComponents.getPCollection((String)input.getValue());
            }
            catch (IOException exn) {
                throw new RuntimeException(exn);
            }
        }));
        String urn = request.getTransform().getSpec().getUrn();
        JavaClassLookupTransformProvider transformProvider = null;
        if (BeamUrns.getUrn((ProtocolMessageEnum)ExternalTransforms.ExpansionMethods.Enum.JAVA_CLASS_LOOKUP).equals(urn)) {
            JavaClassLookupTransformProvider.AllowList allowList = ((ExpansionServiceOptions)this.pipelineOptions.as(ExpansionServiceOptions.class)).getJavaClassLookupAllowlist();
            assert (allowList != null);
            transformProvider = new JavaClassLookupTransformProvider(allowList);
        } else {
            transformProvider = this.getRegisteredTransforms().get(urn);
            if (transformProvider == null) {
                throw new UnsupportedOperationException("Unknown urn: " + request.getTransform().getSpec().getUrn());
            }
        }
        List<String> classpathResources = transformProvider.getDependencies(request.getTransform().getSpec(), pipeline.getOptions());
        ((PortablePipelineOptions)pipeline.getOptions().as(PortablePipelineOptions.class)).setFilesToStage(classpathResources);
        Map<String, PCollection<?>> outputs = transformProvider.apply(pipeline, request.getTransform().getUniqueName(), request.getTransform().getSpec(), inputs);
        SdkComponents sdkComponents = rehydratedComponents.getSdkComponents(Collections.emptyList()).withNewIdPrefix(request.getNamespace());
        sdkComponents.registerEnvironment(Environments.createOrGetDefaultEnvironment((PortablePipelineOptions)((PortablePipelineOptions)pipeline.getOptions().as(PortablePipelineOptions.class))));
        Map<String, String> outputMap = outputs.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, output -> {
            try {
                return sdkComponents.registerPCollection((PCollection)output.getValue());
            }
            catch (IOException exn) {
                throw new RuntimeException(exn);
            }
        }));
        if (isUseDeprecatedRead) {
            SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary((Pipeline)pipeline);
        }
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)pipeline, (SdkComponents)sdkComponents);
        String expandedTransformId = (String)Iterables.getOnlyElement((Iterable)pipelineProto.getRootTransformIdsList().stream().filter(id -> !existingTransformIds.contains(id)).collect(Collectors.toList()));
        RunnerApi.Components components = pipelineProto.getComponents();
        RunnerApi.PTransform expandedTransform = components.getTransformsOrThrow(expandedTransformId).toBuilder().setUniqueName(expandedTransformId).clearOutputs().putAllOutputs(outputMap).build();
        LOG.debug("Expanded to {}", (Object)expandedTransform);
        return ExpansionApi.ExpansionResponse.newBuilder().setComponents(components.toBuilder().removeTransforms(expandedTransformId)).setTransform(expandedTransform).addAllRequirements((Iterable)pipelineProto.getRequirementsList()).build();
    }

    protected Pipeline createPipeline() {
        PipelineOptions effectiveOpts = PipelineOptionsFactory.create();
        PortablePipelineOptions portableOptions = (PortablePipelineOptions)effectiveOpts.as(PortablePipelineOptions.class);
        PortablePipelineOptions specifiedOptions = (PortablePipelineOptions)this.pipelineOptions.as(PortablePipelineOptions.class);
        Optional.ofNullable(specifiedOptions.getDefaultEnvironmentType()).ifPresent(arg_0 -> ((PortablePipelineOptions)portableOptions).setDefaultEnvironmentType(arg_0));
        Optional.ofNullable(specifiedOptions.getDefaultEnvironmentConfig()).ifPresent(arg_0 -> ((PortablePipelineOptions)portableOptions).setDefaultEnvironmentConfig(arg_0));
        ((ExperimentalOptions)effectiveOpts.as(ExperimentalOptions.class)).setExperiments(((ExperimentalOptions)this.pipelineOptions.as(ExperimentalOptions.class)).getExperiments());
        effectiveOpts.setRunner(NotRunnableRunner.class);
        return Pipeline.create((PipelineOptions)effectiveOpts);
    }

    public void expand(ExpansionApi.ExpansionRequest request, StreamObserver<ExpansionApi.ExpansionResponse> responseObserver) {
        try {
            responseObserver.onNext((Object)this.expand(request));
            responseObserver.onCompleted();
        }
        catch (RuntimeException exn) {
            responseObserver.onNext((Object)ExpansionApi.ExpansionResponse.newBuilder().setError(Throwables.getStackTraceAsString((Throwable)exn)).build());
            responseObserver.onCompleted();
        }
    }

    @Override
    public void close() throws Exception {
    }

    public static void main(String[] args) throws Exception {
        int port = Integer.parseInt(args[0]);
        System.out.println("Starting expansion service at localhost:" + port);
        PipelineOptionsFactory.register(ExpansionServiceOptions.class);
        ExpansionService service = new ExpansionService(Arrays.copyOfRange(args, 1, args.length));
        for (Map.Entry<String, TransformProvider> entry : service.getRegisteredTransforms().entrySet()) {
            System.out.println("\t" + entry.getKey() + ": " + entry.getValue());
        }
        Server server = ServerBuilder.forPort((int)port).addService((BindableService)service).addService((BindableService)new ArtifactRetrievalService()).build();
        server.start();
        server.awaitTermination();
    }

    private static class NotRunnableRunner
    extends PipelineRunner<PipelineResult> {
        private NotRunnableRunner() {
        }

        public static NotRunnableRunner fromOptions(PipelineOptions opts) {
            return new NotRunnableRunner();
        }

        public PipelineResult run(Pipeline pipeline) {
            throw new UnsupportedOperationException();
        }
    }

    public static interface TransformProvider<InputT extends PInput, OutputT extends POutput> {
        default public InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
            if ((inputs = (Map)org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(inputs)).size() == 0) {
                return (InputT)p.begin();
            }
            if (inputs.size() == 1) {
                return (InputT)((PInput)Iterables.getOnlyElement(inputs.values()));
            }
            PCollectionTuple inputTuple = PCollectionTuple.empty((Pipeline)p);
            for (Map.Entry entry : inputs.entrySet()) {
                inputTuple = inputTuple.and(new TupleTag((String)entry.getKey()), (PCollection)entry.getValue());
            }
            return (InputT)inputTuple;
        }

        public PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec var1);

        default public Map<String, PCollection<?>> extractOutputs(OutputT output) {
            if (output instanceof PDone) {
                return Collections.emptyMap();
            }
            if (output instanceof PCollection) {
                return ImmutableMap.of((Object)"output", (Object)((PCollection)output));
            }
            if (output instanceof PCollectionTuple) {
                return ((PCollectionTuple)output).getAll().entrySet().stream().collect(Collectors.toMap(entry -> ((TupleTag)entry.getKey()).getId(), Map.Entry::getValue));
            }
            if (output instanceof PCollectionList) {
                PCollectionList listOutput = (PCollectionList)output;
                ImmutableMap.Builder indexToPCollection = ImmutableMap.builder();
                int i = 0;
                for (PCollection pc : listOutput.getAll()) {
                    indexToPCollection.put((Object)Integer.toString(i), (Object)pc);
                    ++i;
                }
                return indexToPCollection.build();
            }
            throw new UnsupportedOperationException("Unknown output type: " + output.getClass());
        }

        default public Map<String, PCollection<?>> apply(Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String, PCollection<?>> inputs) {
            return this.extractOutputs(Pipeline.applyTransform((String)name, this.createInput(p, inputs), this.getTransform(spec)));
        }

        default public List<String> getDependencies(RunnerApi.FunctionSpec spec, PipelineOptions options) {
            List filesToStage = ((PortablePipelineOptions)options.as(PortablePipelineOptions.class)).getFilesToStage();
            if (filesToStage == null || filesToStage.isEmpty()) {
                ClassLoader classLoader = Environments.class.getClassLoader();
                if (classLoader == null) {
                    throw new RuntimeException("Cannot detect classpath: classloader is null (is it the bootstrap classloader?)");
                }
                filesToStage = PipelineResources.detectClassPathResourcesToStage((ClassLoader)classLoader, (PipelineOptions)options);
                if (filesToStage.isEmpty()) {
                    throw new IllegalArgumentException("No classpath elements found.");
                }
            }
            LOG.debug("Staging to files from the classpath: {}", (Object)filesToStage.size());
            return filesToStage;
        }
    }

    @AutoService(value={ExpansionServiceRegistrar.class})
    public static class ExternalTransformRegistrarLoader
    implements ExpansionServiceRegistrar {
        private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();

        @Override
        public Map<String, TransformProvider> knownTransforms() {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (ExternalTransformRegistrar registrar : ServiceLoader.load(ExternalTransformRegistrar.class)) {
                for (Map.Entry entry : registrar.knownBuilderInstances().entrySet()) {
                    final String urn = (String)entry.getKey();
                    final ExternalTransformBuilder builderInstance = (ExternalTransformBuilder)entry.getValue();
                    TransformProvider transformProvider = new TransformProvider(){

                        public PTransform getTransform(RunnerApi.FunctionSpec spec) {
                            try {
                                Class configClass = ExternalTransformRegistrarLoader.getConfigClass(builderInstance);
                                return builderInstance.buildExternal(ExternalTransformRegistrarLoader.payloadToConfig(ExternalTransforms.ExternalConfigurationPayload.parseFrom((ByteString)spec.getPayload()), configClass));
                            }
                            catch (Exception e) {
                                throw new RuntimeException(String.format("Failed to build transform %s from spec %s", urn, spec), e);
                            }
                        }

                        @Override
                        public List<String> getDependencies(RunnerApi.FunctionSpec spec, PipelineOptions options) {
                            try {
                                Class configClass = ExternalTransformRegistrarLoader.getConfigClass(builderInstance);
                                Optional dependencies = builderInstance.getDependencies(ExternalTransformRegistrarLoader.payloadToConfig(ExternalTransforms.ExternalConfigurationPayload.parseFrom((ByteString)spec.getPayload()), configClass), options);
                                return dependencies.orElseGet(() -> TransformProvider.super.getDependencies(spec, options));
                            }
                            catch (Exception e) {
                                throw new RuntimeException(String.format("Failed to get dependencies of %s from spec %s", urn, spec), e);
                            }
                        }
                    };
                    builder.put((Object)urn, (Object)transformProvider);
                }
            }
            return builder.build();
        }

        private static <ConfigT> Class<ConfigT> getConfigClass(ExternalTransformBuilder<ConfigT, ?, ?> transformBuilder) {
            Class<?> configurationClass = null;
            for (Method method : transformBuilder.getClass().getMethods()) {
                if (!method.getName().equals("buildExternal")) continue;
                Preconditions.checkState((method.getParameterCount() == 1 ? 1 : 0) != 0, (String)"Build method for ExternalTransformBuilder %s must have exactly one parameter, but had %s parameters.", (Object)transformBuilder.getClass().getSimpleName(), (int)method.getParameterCount());
                configurationClass = method.getParameterTypes()[0];
                if (!Object.class.equals(configurationClass)) break;
            }
            if (configurationClass == null) {
                throw new AssertionError((Object)"Failed to find buildExternal method.");
            }
            return configurationClass;
        }

        static <ConfigT> Row decodeConfigObjectRow(SchemaApi.Schema schema, ByteString payload) {
            Row configRow;
            Schema payloadSchema = SchemaTranslation.schemaFromProto((SchemaApi.Schema)schema);
            if (payloadSchema.getFieldCount() == 0) {
                return Row.withSchema((Schema)Schema.of((Schema.Field[])new Schema.Field[0])).build();
            }
            Converter camelCaseConverter = CaseFormat.LOWER_UNDERSCORE.converterTo(CaseFormat.LOWER_CAMEL);
            payloadSchema = (Schema)payloadSchema.getFields().stream().map(field -> {
                Preconditions.checkNotNull((Object)field.getName());
                if (field.getName().contains("_")) {
                    @Nullable String newName = (String)camelCaseConverter.convert((Object)field.getName());
                    assert (newName != null) : "@AssumeAssertion(nullness): converter type is imprecise; it is nullness-preserving";
                    return field.withName(newName);
                }
                return field;
            }).collect(Schema.toSchema());
            try {
                configRow = (Row)RowCoder.of((Schema)payloadSchema).decode(payload.newInput());
            }
            catch (IOException e) {
                throw new RuntimeException("Error decoding payload", e);
            }
            return configRow;
        }

        @VisibleForTesting
        public static <ConfigT> ConfigT payloadToConfig(ExternalTransforms.ExternalConfigurationPayload payload, Class<ConfigT> configurationClass) {
            try {
                return ExternalTransformRegistrarLoader.payloadToConfigSchema(payload, configurationClass);
            }
            catch (NoSuchSchemaException schemaException) {
                LOG.warn("Configuration class '{}' has no schema registered. Attempting to construct with setter approach.", (Object)configurationClass.getName());
                try {
                    return ExternalTransformRegistrarLoader.payloadToConfigSetters(payload, configurationClass);
                }
                catch (ReflectiveOperationException e) {
                    throw new IllegalArgumentException(String.format("Failed to construct instance of configuration class '%s'", configurationClass.getName()), e);
                }
            }
        }

        private static <ConfigT> ConfigT payloadToConfigSchema(ExternalTransforms.ExternalConfigurationPayload payload, Class<ConfigT> configurationClass) throws NoSuchSchemaException {
            Schema configSchema = SCHEMA_REGISTRY.getSchema(configurationClass);
            SerializableFunction fromRowFunc = SCHEMA_REGISTRY.getFromRowFunction(configurationClass);
            Row payloadRow = ExternalTransformRegistrarLoader.decodeConfigObjectRow(payload.getSchema(), payload.getPayload());
            if (!payloadRow.getSchema().assignableTo(configSchema)) {
                throw new IllegalArgumentException(String.format("Schema in expansion request payload is not assignable to the schema for the configuration object.%n%nPayload Schema: %s%n%nConfiguration Schema: %s", payloadRow.getSchema(), configSchema));
            }
            return (ConfigT)fromRowFunc.apply((Object)payloadRow);
        }

        private static <ConfigT> ConfigT payloadToConfigSetters(ExternalTransforms.ExternalConfigurationPayload payload, Class<ConfigT> configurationClass) throws ReflectiveOperationException {
            Row configRow = ExternalTransformRegistrarLoader.decodeConfigObjectRow(payload.getSchema(), payload.getPayload());
            Constructor<ConfigT> constructor = configurationClass.getDeclaredConstructor(new Class[0]);
            constructor.setAccessible(true);
            ConfigT config = constructor.newInstance(new Object[0]);
            for (Schema.Field field : configRow.getSchema().getFields()) {
                Method method;
                String key = field.getName();
                @Nullable Object value = configRow.getValue(field.getName());
                String fieldName = key;
                Coder coder = SchemaCoder.coderForFieldType((Schema.FieldType)field.getType());
                Class type = coder.getEncodedTypeDescriptor().getRawType();
                String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
                try {
                    method = config.getClass().getMethod(setterName, type);
                }
                catch (NoSuchMethodException e) {
                    throw new IllegalArgumentException(String.format("The configuration class %s is missing a setter %s for %s with type %s", config.getClass(), setterName, fieldName, coder.getEncodedTypeDescriptor().getType().getTypeName()), e);
                }
                ExternalTransformRegistrarLoader.invokeSetter(config, value, method);
            }
            return config;
        }

        private static <ConfigT> void invokeSetter(ConfigT config, @Nullable Object value, Method method) throws IllegalAccessException, InvocationTargetException {
            method.invoke(config, value);
        }
    }

    public static interface ExpansionServiceRegistrar {
        public Map<String, TransformProvider> knownTransforms();
    }
}

