/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.feathr.offline.config.location;

import com.linkedin.feathr.common.Header;
import com.linkedin.feathr.common.exception.FeathrException;
import com.linkedin.feathr.offline.config.location.GenericLocation;
import com.linkedin.feathr.offline.generation.FeatureGenUtils$;
import com.linkedin.feathr.offline.join.DataFrameKeyCombiner;
import com.linkedin.feathr.offline.join.DataFrameKeyCombiner$;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Locale;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;

/**
 * Singleton (decompiled Scala {@code object}) with format-specific read and write handling
 * for a {@link GenericLocation}.
 *
 * <p>Both entry points first copy every entry of {@code location.conf()} into the Spark
 * session configuration, then dispatch on the lower-cased {@code location.format()}.
 * Lower-casing uses {@link Locale#ROOT} so the dispatch does not depend on the JVM default
 * locale (for example, Turkish locales map {@code 'I'} to a dotless {@code 'ı'}).
 */
public final class GenericLocationAdHocPatches$ {
    /** The single instance; assigned by the private constructor during class initialization. */
    public static GenericLocationAdHocPatches$ MODULE$;

    static {
        new GenericLocationAdHocPatches$();
    }

    /**
     * Loads a DataFrame from the given location.
     *
     * <p>For the {@code org.elasticsearch.spark.sql} format the options
     * {@code es.nodes.wan.only=true} and {@code pushdown=true} are set before the
     * location's own options are applied; every other format is read with the location's
     * format and options only.
     *
     * @param ss       session used for reading; its configuration is updated from {@code location.conf()}
     * @param location source description (format, options, conf)
     * @return the loaded DataFrame
     */
    public Dataset<Row> readDf(SparkSession ss, GenericLocation location) {
        location.conf().foreach((Function1 & Serializable & scala.Serializable)e -> {
            GenericLocationAdHocPatches$.$anonfun$readDf$1(ss, (Tuple2)e);
            return BoxedUnit.UNIT;
        });
        // The original-case format string is still what gets passed to Spark; only the
        // comparison is done on the lower-cased copy.
        String format = location.format().toLowerCase(Locale.ROOT);
        if ("org.elasticsearch.spark.sql".equals(format)) {
            return ss.read().format(location.format()).option("es.nodes.wan.only", "true").option("pushdown", true).options(location.options()).load();
        }
        return ss.read().format(location.format()).options(location.options()).load();
    }

    /**
     * Writes a DataFrame to the given location, applying per-format adjustments.
     *
     * <ul>
     *   <li>{@code cosmos.oltp}: registers a Cosmos catalog on the session, creates the
     *       database and container if absent, ensures an {@code id} column, default mode
     *       {@code append}.</li>
     *   <li>{@code org.elasticsearch.spark.sql}: always derives an {@code _id} column and
     *       writes as an upsert keyed on it, default mode {@code overwrite}.</li>
     *   <li>{@code aerospike}: ensures a {@code __key} column, default mode {@code append}.</li>
     *   <li>anything else: plain write with the location's format and options, default mode
     *       {@code default}.</li>
     * </ul>
     *
     * @param ss       session used for writing; its configuration is updated from {@code location.conf()}
     * @param df       data to write
     * @param header   optional header; when present its key columns are combined into the id column
     * @param location sink description (format, options, conf, mode)
     */
    public void writeDf(SparkSession ss, Dataset<Row> df, Option<Header> header, GenericLocation location) {
        location.conf().foreach((Function1 & Serializable & scala.Serializable)e -> {
            GenericLocationAdHocPatches$.$anonfun$writeDf$1(ss, (Tuple2)e);
            return BoxedUnit.UNIT;
        });
        String format = location.format().toLowerCase(Locale.ROOT);
        if ("cosmos.oltp".equals(format)) {
            this.writeCosmos(ss, df, header, location);
        } else if ("org.elasticsearch.spark.sql".equals(format)) {
            this.writeElasticsearch(df, header, location);
        } else if ("aerospike".equals(format)) {
            this.writeAerospike(df, location);
        } else {
            df.write().format(location.format()).options(location.options()).mode(this.writeMode(location, "default")).save();
        }
    }

    /** Writes to Cosmos DB: sets up the catalog, creates database/container, ensures an {@code id} column. */
    private void writeCosmos(SparkSession ss, Dataset<Row> df, Option<Header> header, GenericLocation location) {
        // Lookup order is preserved so the first missing option is the one reported.
        String endpoint = this.requiredOption(location, "spark.cosmos.accountEndpoint", "Missing spark__cosmos__accountEndpoint");
        String key = this.requiredOption(location, "spark.cosmos.accountKey", "Missing spark__cosmos__accountKey");
        String databaseName = this.requiredOption(location, "spark.cosmos.database", "Missing spark__cosmos__database");
        String tableName = this.requiredOption(location, "spark.cosmos.container", "Missing spark__cosmos__container");

        ss.conf().set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog");
        ss.conf().set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", endpoint);
        ss.conf().set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", key);

        // NOTE(review): database/container names are spliced into the SQL text unquoted;
        // names with characters outside plain identifiers will presumably fail to parse.
        ss.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog." + databaseName + ";");
        ss.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog." + databaseName + "." + tableName + " using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id')");

        // The container is created with partition key path '/id', so the frame needs an id column.
        Dataset<Row> keyDf = this.hasColumn(df, "id") ? df : this.withKeyColumn(df, header, "id");
        keyDf.write().format(location.format()).options(location.options()).mode(this.writeMode(location, "append")).save();
    }

    /** Writes to Elasticsearch as an upsert keyed on a derived {@code _id} column. */
    private void writeElasticsearch(Dataset<Row> df, Option<Header> header, GenericLocation location) {
        // Unlike the Cosmos path, the _id column is derived unconditionally.
        Dataset<Row> keyDf = this.withKeyColumn(df, header, "_id");
        // Fixed options come first so that location.options() can override them.
        keyDf.write().format(location.format()).option("es.nodes.wan.only", "true").option("es.mapping.id", "_id").option("es.mapping.exclude", "_id").option("es.write.operation", "upsert").options(location.options()).mode(this.writeMode(location, "overwrite")).save();
    }

    /** Writes to Aerospike, generating a {@code __key} column when the frame has none. */
    private void writeAerospike(Dataset<Row> df, GenericLocation location) {
        Dataset<Row> keyDf = this.hasColumn(df, "__key") ? df : df.withColumn("__key", functions$.MODULE$.monotonically_increasing_id().cast("string"));
        keyDf.write().format(location.format()).option("aerospike.updatebykey", "__key").options(location.options()).mode(this.writeMode(location, "append")).save();
    }

    /**
     * Returns {@code df} with a column named {@code idColumn}.
     *
     * <p>With a header, the header's key columns are combined by {@link DataFrameKeyCombiner}
     * and the resulting key column is renamed to {@code idColumn}. Without one, the column
     * is filled with {@code monotonically_increasing_id()} cast to string.
     *
     * @throws MatchError if the combiner returns {@code null} or {@code header} is neither
     *                    {@code Some} nor {@code None}
     */
    private Dataset<Row> withKeyColumn(Dataset<Row> df, Option<Header> header, String idColumn) {
        if (header instanceof Some) {
            Header h = (Header)((Some)header).value();
            DataFrameKeyCombiner combiner = DataFrameKeyCombiner$.MODULE$.apply();
            Tuple2<String, Dataset<Row>> combined = combiner.combine(df, FeatureGenUtils$.MODULE$.getKeyColumnsFromHeader(h), combiner.combine$default$3());
            if (combined == null) {
                throw new MatchError(combined);
            }
            String keyCol = (String)combined._1();
            Dataset<Row> keyedDf = (Dataset<Row>)combined._2();
            return keyedDf.withColumnRenamed(keyCol, idColumn);
        }
        if (None$.MODULE$.equals(header)) {
            return df.withColumn(idColumn, functions$.MODULE$.monotonically_increasing_id().cast("string"));
        }
        throw new MatchError(header);
    }

    /** Returns the option stored under {@code key}, or throws a {@link FeathrException} with {@code errorMessage}. */
    private String requiredOption(GenericLocation location, String key, String errorMessage) {
        return (String)location.options().getOrElse((Object)key, (Function0 & Serializable & scala.Serializable)() -> {
            throw new FeathrException(errorMessage);
        });
    }

    /** Returns the location's save mode, or {@code defaultMode} when none is configured. */
    private String writeMode(GenericLocation location, String defaultMode) {
        return (String)location.mode().getOrElse((Function0 & Serializable & scala.Serializable)() -> defaultMode);
    }

    /** Returns whether {@code df} has a column named exactly {@code name}. */
    private boolean hasColumn(Dataset<Row> df, String name) {
        return Arrays.asList(df.columns()).contains(name);
    }

    /** Compiler-generated lambda body for {@code readDf}: copies one conf entry into the session. */
    public static final /* synthetic */ void $anonfun$readDf$1(SparkSession ss$1, Tuple2 e) {
        ss$1.conf().set((String)e._1(), (String)e._2());
    }

    /** Compiler-generated lambda body for {@code writeDf}: copies one conf entry into the session. */
    public static final /* synthetic */ void $anonfun$writeDf$1(SparkSession ss$2, Tuple2 e) {
        ss$2.conf().set((String)e._1(), (String)e._2());
    }

    /** Publishes the instance through {@link #MODULE$}; invoked once from the static initializer. */
    private GenericLocationAdHocPatches$() {
        MODULE$ = this;
    }
}

