/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.feathr.offline.transformation;

import com.linkedin.feathr.common.AnchorExtractor;
import com.linkedin.feathr.common.FeatureTypeConfig;
import com.linkedin.feathr.common.FeatureTypes;
import com.linkedin.feathr.common.FeatureValue;
import com.linkedin.feathr.common.SparkRowExtractor;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrException;
import com.linkedin.feathr.common.tensor.TensorData;
import com.linkedin.feathr.common.types.FeatureType;
import com.linkedin.feathr.offline.FeatureDataFrame;
import com.linkedin.feathr.offline.anchored.anchorExtractor.SimpleConfigurableAnchorExtractor;
import com.linkedin.feathr.offline.job.FeatureTransformation$;
import com.linkedin.feathr.offline.job.FeatureTypeInferenceContext;
import com.linkedin.feathr.offline.job.TransformedResult;
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext;
import com.linkedin.feathr.offline.transformation.FeatureColumnFormat$;
import com.linkedin.feathr.offline.transformation.FeatureTypeAccumulator;
import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils$;
import java.io.Serializable;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.Enumeration;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenSeq;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

public final class DataFrameBasedRowEvaluator$ {
    public static DataFrameBasedRowEvaluator$ MODULE$;

    static {
        new DataFrameBasedRowEvaluator$();
    }

    public TransformedResult transform(AnchorExtractor<?> transformer, Dataset<Row> inputDf, Seq<Tuple2<String, String>> requestedFeatureNameAndPrefix, Map<String, FeatureTypeConfig> featureTypeConfigs, Option<FeathrExpressionExecutionContext> mvelContext) {
        if (!(transformer instanceof SparkRowExtractor)) {
            throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, new StringBuilder(31).append(transformer).append(" must extend SparkRowExtractor.").toString());
        }
        AnchorExtractor<?> extractor = transformer;
        Seq<String> requestedFeatureRefString = (Seq<String>)requestedFeatureNameAndPrefix.map((Function1 & Serializable & scala.Serializable)x$1 -> (String)x$1._1(), Seq$.MODULE$.canBuildFrom());
        String featureNamePrefix = (String)((Tuple2)requestedFeatureNameAndPrefix.head())._2();
        Enumeration.Value featureFormat = FeatureColumnFormat$.MODULE$.FDS_TENSOR();
        Seq<String> selectedFeatureNames = requestedFeatureRefString.nonEmpty() ? requestedFeatureRefString : transformer.getProvidedFeatureNames();
        FeatureDataFrame featureDataFrame = this.transformToFDSTensor(extractor, inputDf, selectedFeatureNames, featureTypeConfigs, mvelContext);
        if (featureDataFrame == null) {
            throw new MatchError((Object)featureDataFrame);
        }
        Dataset<Row> transformedDF = featureDataFrame.df();
        Map<String, FeatureTypeConfig> transformedFeatureTypes = featureDataFrame.inferredFeatureType();
        Tuple2 tuple2 = new Tuple2(transformedDF, transformedFeatureTypes);
        Tuple2 tuple22 = tuple2;
        Dataset transformedDF2 = (Dataset)tuple22._1();
        Map transformedFeatureTypes2 = (Map)tuple22._2();
        return new TransformedResult((Seq<Tuple2<String, String>>)((Seq)selectedFeatureNames.map((Function1 & Serializable & scala.Serializable)x$3 -> new Tuple2(x$3, (Object)featureNamePrefix), Seq$.MODULE$.canBuildFrom())), (Dataset<Row>)transformedDF2, (Map<String, Enumeration.Value>)((TraversableOnce)selectedFeatureNames.map((Function1 & Serializable & scala.Serializable)c -> new Tuple2(c, (Object)featureFormat), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()), (Map<String, FeatureTypeConfig>)transformedFeatureTypes2);
    }

    private FeatureDataFrame transformToFDSTensor(SparkRowExtractor rowExtractor, Dataset<Row> inputDF, Seq<String> featureRefStrs, Map<String, FeatureTypeConfig> featureTypeConfigs, Option<FeathrExpressionExecutionContext> mvelContext) {
        Map<String, FeatureTypeAccumulator> featureTypeAccumulators;
        Map featureTypes;
        StructType inputSchema = inputDF.schema();
        SparkSession spark = SparkSession$.MODULE$.builder().getOrCreate();
        FeatureTypeInferenceContext featureTypeInferenceContext = FeatureTransformation$.MODULE$.getTypeInferenceContext(spark, (Map<String, FeatureTypes>)(featureTypes = featureTypeConfigs.mapValues((Function1 & Serializable & scala.Serializable)x$4 -> x$4.getFeatureType())), featureRefStrs);
        if (featureTypeInferenceContext == null) {
            throw new MatchError((Object)featureTypeInferenceContext);
        }
        Map<String, FeatureTypeAccumulator> map = featureTypeAccumulators = featureTypeInferenceContext.featureTypeAccumulators();
        Map<String, FeatureTypeAccumulator> featureTypeAccumulators2 = map;
        RDD transformedRdd = inputDF.rdd().map((Function1 & Serializable & scala.Serializable)row -> {
            GenericRowWithSchema rowWithSchema;
            GenericRowWithSchema genericRowWithSchema = rowWithSchema = row instanceof GenericRowWithSchema ? (GenericRowWithSchema)row : new GenericRowWithSchema((Object[])row.toSeq().toArray(ClassTag$.MODULE$.Any()), inputSchema);
            if (rowExtractor instanceof SimpleConfigurableAnchorExtractor) {
                ((SimpleConfigurableAnchorExtractor)rowExtractor).mvelContext_$eq(mvelContext);
            }
            Map<String, FeatureValue> result = rowExtractor.getFeaturesFromRow(rowWithSchema);
            Seq featureValues = (Seq)featureRefStrs.map((Function1 & Serializable & scala.Serializable)featureRef -> {
                Object object;
                if (result.contains(featureRef)) {
                    FeatureValue featureValue = (FeatureValue)result.apply(featureRef);
                    if (((FeatureTypeAccumulator)((Object)((Object)((Object)featureTypeAccumulators2.apply(featureRef))))).isZero() && featureValue != null) {
                        FeatureType.BasicType rowFeatureType = featureValue.getFeatureType().getBasicType();
                        ((FeatureTypeAccumulator)((Object)((Object)((Object)featureTypeAccumulators2.apply(featureRef))))).add(FeatureTypes.valueOf(rowFeatureType.toString()));
                    }
                    TensorData tensorData = featureValue.getAsTensorData();
                    object = FeaturizedDatasetUtils$.MODULE$.tensorToFDSDataFrameRow(tensorData, FeaturizedDatasetUtils$.MODULE$.tensorToFDSDataFrameRow$default$2());
                } else {
                    object = null;
                }
                return object;
            }, Seq$.MODULE$.canBuildFrom());
            return Row$.MODULE$.merge((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{row, Row$.MODULE$.fromSeq(featureValues)}));
        }, ClassTag$.MODULE$.apply(Row.class));
        Map<String, FeatureTypes> inferredFeatureTypes = FeatureTransformation$.MODULE$.inferFeatureTypes(featureTypeAccumulators2, (RDD<Row>)transformedRdd, featureRefStrs);
        Map inferredFeatureTypeConfigs = (Map)inferredFeatureTypes.map((Function1 & Serializable & scala.Serializable)featureTypeEntry -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(featureTypeEntry._1()), (Object)new FeatureTypeConfig((FeatureTypes)((Object)((Object)featureTypeEntry._2())))), Map$.MODULE$.canBuildFrom());
        Tuple2 tuple2 = featureTypeConfigs.partition((Function1 & Serializable & scala.Serializable)x -> BoxesRunTime.boxToBoolean((boolean)DataFrameBasedRowEvaluator$.$anonfun$transformToFDSTensor$5(x)));
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        Map unspecifiedTypeConfigs = (Map)tuple2._1();
        Map providedTypeConfigs = (Map)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)unspecifiedTypeConfigs, (Object)providedTypeConfigs);
        Tuple2 tuple23 = tuple22;
        Map unspecifiedTypeConfigs2 = (Map)tuple23._1();
        Map providedTypeConfigs2 = (Map)tuple23._2();
        Map mergedTypeConfigs = unspecifiedTypeConfigs2.$plus$plus((GenTraversableOnce)inferredFeatureTypeConfigs).$plus$plus((GenTraversableOnce)providedTypeConfigs2);
        Dataset<Row> featureDF = this.createFDSFeatureDF(inputDF, featureRefStrs, spark, (RDD<Row>)transformedRdd, (Map<String, FeatureTypeConfig>)mergedTypeConfigs);
        return new FeatureDataFrame(featureDF, (Map<String, FeatureTypeConfig>)mergedTypeConfigs);
    }

    private Dataset<Row> createFDSFeatureDF(Dataset<Row> inputDF, Seq<String> featureRefStrs, SparkSession ss, RDD<Row> transformedRdd, Map<String, FeatureTypeConfig> featureTypesConfigs) {
        String featureColumnNamePrefix = "_feathr_mvel_feature_prefix_";
        Seq<StructField> featureTensorTypeInfo = FeatureTransformation$.MODULE$.getFDSSchemaFields(featureRefStrs, featureTypesConfigs, featureColumnNamePrefix);
        StructType outputSchema = StructType$.MODULE$.apply((Seq)inputDF.schema().union((GenSeq)StructType$.MODULE$.apply(featureTensorTypeInfo), Seq$.MODULE$.canBuildFrom()));
        Dataset transformedDF = ss.createDataFrame(transformedRdd, outputSchema);
        Dataset withoutDupContextFieldDF = transformedDF.drop(featureRefStrs);
        Dataset featureDF = (Dataset)((TraversableOnce)featureRefStrs.zip(featureRefStrs, Seq$.MODULE$.canBuildFrom())).foldLeft((Object)withoutDupContextFieldDF, (Function2 & Serializable & scala.Serializable)(baseDF, namePair) -> baseDF.withColumnRenamed(new StringBuilder(0).append(featureColumnNamePrefix).append(namePair._1()).toString(), (String)namePair._2()));
        return featureDF;
    }

    public static final /* synthetic */ boolean $anonfun$transformToFDSTensor$5(Tuple2 x) {
        FeatureTypes featureTypes = ((FeatureTypeConfig)x._2()).getFeatureType();
        FeatureTypes featureTypes2 = FeatureTypes.UNSPECIFIED;
        return !(featureTypes != null ? !((Object)((Object)featureTypes)).equals((Object)featureTypes2) : featureTypes2 != null);
    }

    private DataFrameBasedRowEvaluator$() {
        MODULE$ = this;
    }
}

