/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.feathr.offline.generation.outputProcessor;

import java.io.Serializable;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DoubleType$;
import org.apache.spark.sql.types.FloatType$;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

public final class FeatureMonitoringUtils$ {
    public static FeatureMonitoringUtils$ MODULE$;

    static {
        new FeatureMonitoringUtils$();
    }

    public void writeToRedis(SparkSession ss, Dataset<Row> df, String tableName, Seq<String> keyColumns, Set<String> allFeatureCols, SaveMode saveMode) {
        df.show(10);
        StructType dfSchema = df.schema();
        dfSchema.indices().foreach((Function1 & Serializable & scala.Serializable)index -> FeatureMonitoringUtils$.$anonfun$writeToRedis$1(dfSchema, allFeatureCols, df, ss, saveMode, BoxesRunTime.unboxToInt((Object)index)));
    }

    private void writeToSql(SparkSession ss, Dataset<Row> stats_df, String tableName, SaveMode saveMode) {
        if (!ss.sparkContext().isLocal()) {
            String url = ss.conf().get("monitoring_database_url");
            String username = ss.conf().get("monitoring_database_user");
            String password = ss.conf().get("monitoring_database_password");
            Predef$.MODULE$.println((Object)"monitoring output:");
            Predef$.MODULE$.println((Object)new StringBuilder(5).append("url: ").append(url).toString());
            Predef$.MODULE$.println((Object)new StringBuilder(10).append("username: ").append(username).toString());
            stats_df.write().format("jdbc").option("url", url).option("dbtable", tableName).option("user", username).option("password", password).option("ssl", true).option("sslmode", "require").mode(saveMode).save();
        } else {
            stats_df.show(10);
        }
    }

    public static final /* synthetic */ Object $anonfun$writeToRedis$1(StructType dfSchema$1, Set allFeatureCols$1, Dataset df$1, SparkSession ss$1, SaveMode saveMode$1, int index) {
        BoxedUnit boxedUnit;
        StructField field = dfSchema$1.fields()[index];
        String fieldName = field.name();
        if (allFeatureCols$1.contains((Object)fieldName)) {
            BoxedUnit boxedUnit2;
            DataType dataType = field.dataType();
            boolean bl = DoubleType$.MODULE$.equals(dataType) ? true : (FloatType$.MODULE$.equals(dataType) ? true : (IntegerType$.MODULE$.equals(dataType) ? true : LongType$.MODULE$.equals(dataType)));
            if (bl) {
                long missing = df$1.filter(functions$.MODULE$.col(fieldName).isNull()).count();
                long total = df$1.count();
                Dataset stats_df = df$1.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.lit((Object)fieldName).name("feature_name"), functions$.MODULE$.lit((Object)field.dataType().typeName()).name("feature_type"), functions$.MODULE$.current_date().name("date"), functions$.MODULE$.mean(df$1.apply(fieldName)).name("mean"), functions$.MODULE$.avg(df$1.apply(fieldName)).name("avg"), functions$.MODULE$.min(df$1.apply(fieldName)).name("min"), functions$.MODULE$.max(df$1.apply(fieldName)).name("max"), functions$.MODULE$.lit((Object)BoxesRunTime.boxToDouble((double)((double)(total - missing) * 1.0 / (double)total))).name("coverage")}));
                stats_df.show();
                MODULE$.writeToSql(ss$1, (Dataset<Row>)stats_df, fieldName, saveMode$1);
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boolean bl2 = StringType$.MODULE$.equals(dataType) ? true : BooleanType$.MODULE$.equals(dataType);
                if (bl2) {
                    long missing = df$1.filter(functions$.MODULE$.col(fieldName).isNull()).count();
                    long total = df$1.count();
                    long cardinality = df$1.groupBy(fieldName, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().count();
                    Dataset stats_df = df$1.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.lit((Object)fieldName).name("feature_name"), functions$.MODULE$.lit((Object)field.dataType().typeName()).name("feature_type"), functions$.MODULE$.current_date().name("date"), functions$.MODULE$.min(df$1.apply(fieldName)).name("min"), functions$.MODULE$.max(df$1.apply(fieldName)).name("max"), functions$.MODULE$.lit((Object)BoxesRunTime.boxToDouble((double)((double)(total - missing) * 1.0 / (double)total))).name("coverage"), functions$.MODULE$.lit((Object)BoxesRunTime.boxToLong((long)cardinality)).name("cardinality")}));
                    MODULE$.writeToSql(ss$1, (Dataset<Row>)stats_df, fieldName, saveMode$1);
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = rowData -> {
                        DataType arg$macro$1 = field.dataType();
                        Object arg$macro$2 = rowData;
                        throw new RuntimeException(new StringOps("The data type(%s) and data (%s) is not supported in monitoring yet.").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{arg$macro$1, arg$macro$2})));
                    };
                }
            }
            boxedUnit = boxedUnit2;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return boxedUnit;
    }

    private FeatureMonitoringUtils$() {
        MODULE$ = this;
    }
}

