/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.feathr.offline.job;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import com.linkedin.feathr.common.DateParam;
import com.linkedin.feathr.common.Header;
import com.linkedin.feathr.common.JoiningFeatureParams;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrDataOutputException;
import com.linkedin.feathr.common.exception.FeathrInputDataException;
import com.linkedin.feathr.offline.FeatureDataFrame;
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource;
import com.linkedin.feathr.offline.client.FeathrClient;
import com.linkedin.feathr.offline.client.FeathrClient$;
import com.linkedin.feathr.offline.client.FeathrClient2;
import com.linkedin.feathr.offline.client.FeathrClient2$;
import com.linkedin.feathr.offline.client.InputData;
import com.linkedin.feathr.offline.config.FeatureJoinConfig;
import com.linkedin.feathr.offline.config.FeatureJoinConfig$;
import com.linkedin.feathr.offline.config.datasource.DataSourceConfigUtils$;
import com.linkedin.feathr.offline.config.datasource.DataSourceConfigs;
import com.linkedin.feathr.offline.config.location.DataLocation;
import com.linkedin.feathr.offline.config.location.DataLocation$;
import com.linkedin.feathr.offline.config.location.SimplePath;
import com.linkedin.feathr.offline.generation.SparkIOUtils$;
import com.linkedin.feathr.offline.job.FeathrJoinJobContext;
import com.linkedin.feathr.offline.job.FeathrJoinPreparationInfo;
import com.linkedin.feathr.offline.job.FeathrUdfRegistry$;
import com.linkedin.feathr.offline.job.JoinJobContext;
import com.linkedin.feathr.offline.job.LocalTestConfig;
import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager$;
import com.linkedin.feathr.offline.source.SourceFormatType$;
import com.linkedin.feathr.offline.source.accessor.DataPathHandler;
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor;
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler;
import com.linkedin.feathr.offline.transformation.AnchorToDataSourceMapper;
import com.linkedin.feathr.offline.util.AclCheckUtils$;
import com.linkedin.feathr.offline.util.CmdLineParser;
import com.linkedin.feathr.offline.util.FeathrUtils$;
import com.linkedin.feathr.offline.util.FeaturizedDatasetMetadata;
import com.linkedin.feathr.offline.util.FeaturizedDatasetMetadata$;
import com.linkedin.feathr.offline.util.HdfsUtils$;
import com.linkedin.feathr.offline.util.OptionParam;
import com.linkedin.feathr.offline.util.SourceUtils$;
import com.linkedin.feathr.offline.util.SparkFeaturizedDataset;
import java.io.Serializable;
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.spark.SparkConf;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import scala.Array$;
import scala.Enumeration;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.HashMap;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Properties$;
import scala.util.Success;
import scala.util.Try;

public final class FeatureJoinJob$ {
    /** Singleton instance; assigned by the private constructor, which the static initializer below invokes. */
    public static FeatureJoinJob$ MODULE$;
    // logger and log are both initialized from LogManager.getLogger(getClass()) in the constructor.
    private final Logger logger;
    // The constants below receive their values in the private constructor.
    private final String SKIP_OUTPUT;
    private final String SPARK_JOIN_MAX_PARALLELISM;
    private final String SPARK_JOIN_MIN_PARALLELISM;
    private final String SPARK_JOIN_PARALLELISM_DEFAULT;
    private final int SPARK_JOIN_LIMIT_PARTITION_FACTOR;
    private final Logger log;

    static {
        // The constructor stores the new instance in MODULE$, so the reference is not kept here.
        new FeatureJoinJob$();
    }

    /** @return the logger stored in the {@code logger} field */
    public Logger logger() {
        return logger;
    }

    /** @return the value of the {@code SKIP_OUTPUT} field (set to {@code "skip_output"} by the constructor) */
    public String SKIP_OUTPUT() {
        return SKIP_OUTPUT;
    }

    /** @return the value of the {@code SPARK_JOIN_MAX_PARALLELISM} field (set to {@code "10000"} by the constructor) */
    public String SPARK_JOIN_MAX_PARALLELISM() {
        return SPARK_JOIN_MAX_PARALLELISM;
    }

    /** @return the value of the {@code SPARK_JOIN_MIN_PARALLELISM} field (set to {@code "10"} by the constructor) */
    public String SPARK_JOIN_MIN_PARALLELISM() {
        return SPARK_JOIN_MIN_PARALLELISM;
    }

    /** @return the value of the {@code SPARK_JOIN_PARALLELISM_DEFAULT} field (set to {@code "5000"} by the constructor) */
    public String SPARK_JOIN_PARALLELISM_DEFAULT() {
        return SPARK_JOIN_PARALLELISM_DEFAULT;
    }

    /** @return the value of the {@code SPARK_JOIN_LIMIT_PARTITION_FACTOR} field (set to {@code 2} by the constructor) */
    public int SPARK_JOIN_LIMIT_PARTITION_FACTOR() {
        return SPARK_JOIN_LIMIT_PARTITION_FACTOR;
    }

    /** @return the logger stored in the {@code log} field */
    public Logger log() {
        return log;
    }

    /**
     * Runs one feature join job: reads and parses the join config, checks authorization,
     * then delegates to {@link #feathrJoinRun} with no local test config.
     *
     * @param ss               Spark session used to read the join config and run the join
     * @param hadoopConf       Hadoop configuration passed to the authorization checks and the join
     * @param jobContext       parsed job context (join config path, join context, FCM flag)
     * @param dataPathHandlers handlers whose {@code dataLoaderHandler()} values are used for the authorization check
     */
    public void run(SparkSession ss, Configuration hadoopConf, FeathrJoinJobContext jobContext, List<DataPathHandler> dataPathHandlers) {
        List dataLoaderHandlers = (List)dataPathHandlers.map((Function1 & Serializable & scala.Serializable)handler -> handler.dataLoaderHandler(), List$.MODULE$.canBuildFrom());
        String joinConfigText = this.hdfsFileReader(ss, jobContext.joinConfig());
        FeatureJoinConfig joinConfig = FeatureJoinConfig$.MODULE$.parseJoinConfig(joinConfigText);
        // Writes the (label, config) tuple to standard output.
        Predef$.MODULE$.print((Object)new Tuple2((Object)"join config is, ", (Object)joinConfig));
        this.checkAuthorization(ss, hadoopConf, jobContext, (List<DataLoaderHandler>)dataLoaderHandlers);
        this.feathrJoinRun(ss, hadoopConf, joinConfig, jobContext.jobJoinContext(), dataPathHandlers, (Option<LocalTestConfig>)None$.MODULE$, jobContext.useFCM());
    }

    /**
     * Renders a set of names as one string: the names are sorted with the String ordering
     * and joined with {@code "\n\t"}.
     *
     * @param nameSet names to render
     * @return the sorted names joined by a newline followed by a tab
     */
    public String stringifyFeatureNames(scala.collection.immutable.Set<String> nameSet) {
        Object sortedNames = nameSet.toSeq().sorted((Ordering)Ordering.String$.MODULE$);
        Object[] nameArray = (Object[])((TraversableOnce)sortedNames).toArray(ClassTag$.MODULE$.apply(String.class));
        return new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(nameArray)).mkString("\n\t");
    }

    /**
     * Reads the text file at {@code path} through the Spark context, collects its lines
     * and joins them with {@code "\n"}.
     *
     * @param ss   Spark session whose context performs the read
     * @param path location of the text file
     * @return the collected lines joined by newlines
     */
    public String hdfsFileReader(SparkSession ss, String path) {
        // Writes the (label, path) tuple to standard output before reading.
        Predef$.MODULE$.print((Object)new Tuple2((Object)"ss.sparkContext.textFile(path),", (Object)path));
        Object[] lines = (Object[])ss.sparkContext().textFile(path, ss.sparkContext().textFile$default$2()).collect();
        return new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(lines)).mkString("\n");
    }

    /**
     * Checks write authorization on the output path (only when it is a {@link SimplePath})
     * and then runs the read-authorization check on the input data, if any is present.
     *
     * @param ss                 Spark session forwarded to the input-data check
     * @param hadoopConf         Hadoop configuration used by both checks
     * @param jobContext         job context providing the output path and the optional input data
     * @param dataLoaderHandlers handlers forwarded to the input-data check
     * @throws FeathrDataOutputException if the write-authorization check returns a {@code Failure}
     */
    private void checkAuthorization(SparkSession ss, Configuration hadoopConf, FeathrJoinJobContext jobContext, List<DataLoaderHandler> dataLoaderHandlers) {
        DataLocation outputLocation = jobContext.jobJoinContext().outputPath();
        if (outputLocation instanceof SimplePath) {
            String outputPathText = ((SimplePath)outputLocation).path();
            Try<BoxedUnit> writeCheck = AclCheckUtils$.MODULE$.checkWriteAuthorization(hadoopConf, outputPathText);
            if (writeCheck instanceof Failure) {
                Throwable cause = ((Failure)writeCheck).exception();
                throw new FeathrDataOutputException(ErrorLabel.FEATHR_USER_ERROR, "No write permission for output path " + jobContext.jobJoinContext().outputPath() + ".", cause);
            }
            if (!(writeCheck instanceof Success)) {
                // A Try that is neither Failure nor Success is reported as a match error.
                throw new MatchError(writeCheck);
            }
            this.log().debug("Checked write authorization on output path: " + jobContext.jobJoinContext().outputPath());
        }
        // Option.map is used only for its side effect: the check runs when input data is defined.
        jobContext.jobJoinContext().inputData().map((Function1 & Serializable & scala.Serializable)inputData -> {
            FeatureJoinJob$.$anonfun$checkAuthorization$1(ss, dataLoaderHandlers, hadoopConf, inputData);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds a {@link FeathrClient} via {@link #getFeathrClient} and joins the observations with it.
     *
     * @param ss                 Spark session used to build the client
     * @param observations       observation data to join features onto
     * @param featureGroupings   not read by this method
     * @param joinConfig         join configuration passed to the client
     * @param jobContext         join job context passed to the client builder and the join
     * @param dataPathHandlers   handlers registered on the client
     * @param localTestConfigOpt optional in-memory configs; see {@link #getFeathrClient}
     * @return the result of {@code doJoinObsAndFeatures}
     */
    public Tuple2<Dataset<Row>, Header> getFeathrClientAndJoinFeatures(SparkSession ss, Dataset<Row> observations, scala.collection.immutable.Map<String, Seq<JoiningFeatureParams>> featureGroupings, FeatureJoinConfig joinConfig, JoinJobContext jobContext, List<DataPathHandler> dataPathHandlers, Option<LocalTestConfig> localTestConfigOpt) {
        return this.getFeathrClient(ss, jobContext, dataPathHandlers, localTestConfigOpt)
                .doJoinObsAndFeatures(joinConfig, jobContext, observations);
    }

    /**
     * Builds a {@link FeathrClient}. With a local test config the feature definitions come from
     * the config object itself; otherwise they come from the paths held by {@code jobContext}.
     *
     * @param ss                 Spark session handed to the client builder
     * @param jobContext         supplies the feature-config and local-config paths when no test config is given
     * @param dataPathHandlers   handlers registered on the client
     * @param localTestConfigOpt optional in-memory feature/local configs
     * @return the built client
     * @throws MatchError if {@code localTestConfigOpt} is neither {@code None} nor a {@code Some}
     */
    public FeathrClient getFeathrClient(SparkSession ss, JoinJobContext jobContext, List<DataPathHandler> dataPathHandlers, Option<LocalTestConfig> localTestConfigOpt) {
        if (localTestConfigOpt instanceof Some) {
            LocalTestConfig testConfig = (LocalTestConfig)((Some)localTestConfigOpt).value();
            return FeathrClient$.MODULE$.builder(ss)
                    .addFeatureDef(testConfig.featureConfig())
                    .addLocalOverrideDef(testConfig.localConfig())
                    .addDataPathHandlers(dataPathHandlers)
                    .build();
        }
        if (None$.MODULE$.equals(localTestConfigOpt)) {
            return FeathrClient$.MODULE$.builder(ss)
                    .addFeatureDefPath(jobContext.feathrFeatureConfig())
                    .addLocalOverrideDefPath(jobContext.feathrLocalConfig())
                    .addDataPathHandlers(dataPathHandlers)
                    .build();
        }
        throw new MatchError(localTestConfigOpt);
    }

    /** Default value for the 7th parameter ({@code localTestConfigOpt}) of {@code getFeathrClientAndJoinFeatures}: {@code None}. */
    public Option<LocalTestConfig> getFeathrClientAndJoinFeatures$default$7() {
        return None$.MODULE$;
    }

    /** Default value for the 4th parameter ({@code localTestConfigOpt}) of {@code getFeathrClient}: {@code None}. */
    public Option<LocalTestConfig> getFeathrClient$default$4() {
        return None$.MODULE$;
    }

    /**
     * Builds a {@link FeathrClient2} via {@link #getFCMClient}, wraps the observations in a
     * {@link SparkFeaturizedDataset} with default metadata, and joins features onto them.
     *
     * @param ss                 Spark session used to build the client
     * @param observations       observation data to join features onto
     * @param featureGroupings   not read by this method
     * @param joinConfig         join configuration passed to the client
     * @param jobContext         join job context passed to the client builder and the join
     * @param dataPathHandlers   handlers registered on the client
     * @param localTestConfigOpt optional in-memory configs; see {@link #getFCMClient}
     * @return the data frame of the first element returned by {@code joinFeatures}
     */
    public Dataset<Row> getFCMClientAndJoinFeatures(SparkSession ss, Dataset<Row> observations, scala.collection.immutable.Map<String, Seq<JoiningFeatureParams>> featureGroupings, FeatureJoinConfig joinConfig, JoinJobContext jobContext, List<DataPathHandler> dataPathHandlers, Option<LocalTestConfig> localTestConfigOpt) {
        FeathrClient2 client = this.getFCMClient(ss, jobContext, dataPathHandlers, localTestConfigOpt);
        FeaturizedDatasetMetadata metadata = new FeaturizedDatasetMetadata(FeaturizedDatasetMetadata$.MODULE$.apply$default$1(), FeaturizedDatasetMetadata$.MODULE$.apply$default$2());
        SparkFeaturizedDataset featurizedObservations = new SparkFeaturizedDataset(observations, metadata);
        FeatureDataFrame joined = (FeatureDataFrame)client.joinFeatures(joinConfig, featurizedObservations, jobContext)._1();
        return joined.df();
    }

    /**
     * Builds a {@link FeathrClient2}. With a local test config the feature definitions come from
     * the config object itself; otherwise they come from the paths held by {@code jobContext}.
     *
     * @param ss                 Spark session handed to the client builder
     * @param jobContext         supplies the feature-config and local-config paths when no test config is given
     * @param dataPathHandlers   handlers registered on the client
     * @param localTestConfigOpt optional in-memory feature/local configs
     * @return the built client
     * @throws MatchError if {@code localTestConfigOpt} is neither {@code None} nor a {@code Some}
     */
    public FeathrClient2 getFCMClient(SparkSession ss, JoinJobContext jobContext, List<DataPathHandler> dataPathHandlers, Option<LocalTestConfig> localTestConfigOpt) {
        if (localTestConfigOpt instanceof Some) {
            LocalTestConfig testConfig = (LocalTestConfig)((Some)localTestConfigOpt).value();
            return FeathrClient2$.MODULE$.builder(ss)
                    .addFeatureDef(testConfig.featureConfig())
                    .addLocalOverrideDef(testConfig.localConfig())
                    .addDataPathHandlers(dataPathHandlers)
                    .build();
        }
        if (None$.MODULE$.equals(localTestConfigOpt)) {
            return FeathrClient2$.MODULE$.builder(ss)
                    .addFeatureDefPath(jobContext.feathrFeatureConfig())
                    .addLocalOverrideDefPath(jobContext.feathrLocalConfig())
                    .addDataPathHandlers(dataPathHandlers)
                    .build();
        }
        throw new MatchError(localTestConfigOpt);
    }

    /** Default value for the 7th parameter ({@code localTestConfigOpt}) of {@code getFCMClientAndJoinFeatures}: {@code None}. */
    public Option<LocalTestConfig> getFCMClientAndJoinFeatures$default$7() {
        return None$.MODULE$;
    }

    /** Default value for the 4th parameter ({@code localTestConfigOpt}) of {@code getFCMClient}: {@code None}. */
    public Option<LocalTestConfig> getFCMClient$default$4() {
        return None$.MODULE$;
    }

    /**
     * Loads the observation data, joins features onto it with either the FCM client or the
     * Feathr client, and writes the joined data frame to {@code jobContext.outputPath()}.
     *
     * <p>The write uses {@code jobContext.numParts()} as the output parallelism and the
     * overwrite mode {@code "ALL"}.
     *
     * @param ss               Spark session used for loading, joining and the job-parameter lookup
     * @param hadoopConf       Hadoop configuration used when loading the observation data
     * @param joinConfig       parsed join configuration
     * @param jobContext       join job context; its {@code inputData()} must be defined, because
     *                         {@code Option.get} is called on it without a check
     * @param dataPathHandlers handlers passed to the clients; their loader handlers are used for I/O
     * @param localTestConfig  optional in-memory configs forwarded to the client factory
     * @param useFCM           {@code true} to join through {@link #getFCMClientAndJoinFeatures},
     *                         {@code false} to join through {@link #getFeathrClientAndJoinFeatures}
     * @return a pair whose first element is always {@code None} and whose second element is
     *         {@code Some} of the joined data frame
     */
    public Tuple2<Option<RDD<GenericRecord>>, Option<Dataset<Row>>> feathrJoinRun(SparkSession ss, Configuration hadoopConf, FeatureJoinConfig joinConfig, JoinJobContext jobContext, List<DataPathHandler> dataPathHandlers, Option<LocalTestConfig> localTestConfig, boolean useFCM) {
        List dataLoaderHandlers = (List)dataPathHandlers.map((Function1 & Serializable & scala.Serializable)x$4 -> x$4.dataLoaderHandler(), List$.MODULE$.canBuildFrom());
        scala.collection.immutable.Map<String, Seq<JoiningFeatureParams>> featureGroupings = joinConfig.featureGroupings();
        boolean failOnMissing = new StringOps(Predef$.MODULE$.augmentString(FeathrUtils$.MODULE$.getFeathrJobParam(ss, FeathrUtils$.MODULE$.FAIL_ON_MISSING_PARTITION()))).toBoolean();
        Dataset<Row> observationsDF = SourceUtils$.MODULE$.loadObservationAsDF(ss, hadoopConf, (InputData)jobContext.inputData().get(), (List<DataLoaderHandler>)dataLoaderHandlers, failOnMissing);

        Dataset joinedDF;
        if (useFCM) {
            joinedDF = this.getFCMClientAndJoinFeatures(ss, observationsDF, featureGroupings, joinConfig, jobContext, dataPathHandlers, localTestConfig);
        } else {
            // Only the data frame of the (data frame, header) pair is used.
            joinedDF = (Dataset)this.getFeathrClientAndJoinFeatures(ss, observationsDF, featureGroupings, joinConfig, jobContext, dataPathHandlers, localTestConfig)._1();
        }

        scala.collection.immutable.Map parameters = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SparkIOUtils$.MODULE$.OUTPUT_PARALLELISM()), (Object)Integer.toString(jobContext.numParts())),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SparkIOUtils$.MODULE$.OVERWRITE_MODE()), (Object)"ALL")
        }));
        SparkIOUtils$.MODULE$.writeDataFrame((Dataset<Row>)joinedDF, jobContext.outputPath(), (scala.collection.immutable.Map<String, String>)parameters, (List<DataLoaderHandler>)dataLoaderHandlers);
        return new Tuple2((Object)None$.MODULE$, (Object)new Some((Object)joinedDF));
    }

    /** Default value for the 6th parameter ({@code localTestConfig}) of {@code feathrJoinRun}: {@code None}. */
    public Option<LocalTestConfig> feathrJoinRun$default$6() {
        return None$.MODULE$;
    }

    /** Default value for the 7th parameter ({@code useFCM}) of {@code feathrJoinRun}: {@code false}. */
    public boolean feathrJoinRun$default$7() {
        return false;
    }

    public FeathrJoinJobContext parseInputArgument(String[] args) {
        scala.collection.immutable.Map params = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"feathr-config"), (Object)new OptionParam("f", "Path of the feathr local config file", "FCONF", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"feature-config"), (Object)new OptionParam("ef", "Names of the feathr feature config files", "EFCONF", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"local-override-all"), (Object)new OptionParam("loa", "Local config overrides all other configs", "LOCAL_OVERRIDE", "true")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"join-config"), (Object)new OptionParam("j", "Path of the join config file", "JCONF", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"input"), (Object)new OptionParam("i", "Path of the input data set", "INPUT", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"output"), (Object)new OptionParam("o", "Path of the output", "OUTPUT", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"num-parts"), (Object)new OptionParam("n", "Number of output part files", "NPARTS", "-1")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pass-through-field"), (Object)new OptionParam("p", "Pass-through feature field name", "PFIELD", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pass-through-features"), (Object)new OptionParam("t", "Pass-through feature list, comma-separated", "PLIST", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"source-type"), (Object)new OptionParam("st", "Source type of 
the observation data", "SRCTYPE", "FIXED_PATH")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"start-date"), (Object)new OptionParam("sd", "Start date of the observation data if it's time based", "SDATE", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"end-date"), (Object)new OptionParam("ed", "End date of the observation data if it's time based", "EDATE", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"num-days"), (Object)new OptionParam("nd", "Number of days before the offset date if it's time based", "NDAYS", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"date-offset"), (Object)new OptionParam("do", "Offset of observation data if it's time based", "DOFFSET", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"join-parallelism"), (Object)new OptionParam("p", "Multiplier to increase the number of partitions of feature datasets during joins", "PARALLEL", "8")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"row-bloomfilter-threshold"), (Object)new OptionParam("rbt", "Performance tuning, if observation record # is less than the threshold, a bloomfilter will be applied", "ROWFILTERTHRESHOLD", "-1")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"job-version"), (Object)new OptionParam("jv", "Job version, integer, job version 2 uses DataFrame and SQL based anchor, default is 2", "JOBVERSION", "2")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"as-tensors"), (Object)new OptionParam("at", "If set to true, get features as tensors else as term-vectors", "AS_TENSORS", "false")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"s3-config"), (Object)new OptionParam("sc", "Authentication config 
for S3", "S3_CONFIG", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"adls-config"), (Object)new OptionParam("adlc", "Authentication config for ADLS (abfs)", "ADLS_CONFIG", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"blob-config"), (Object)new OptionParam("bc", "Authentication config for Azure Blob Storage (wasb)", "BLOB_CONFIG", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"sql-config"), (Object)new OptionParam("sqlc", "Authentication config for Azure SQL Database (jdbc)", "SQL_CONFIG", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"snowflake-config"), (Object)new OptionParam("sfc", "Authentication config for Snowflake Database (jdbc)", "SNOWFLAKE_CONFIG", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"use-fcm"), (Object)new OptionParam("ufcm", "If set to true, use FCM client, else use Feathr Client", "USE_FCM", "false")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"snowflake-config"), (Object)new OptionParam("sfc", "Authentication config for Snowflake Database (jdbc)", "SNOWFLAKE_CONFIG", "")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"system-properties"), (Object)new OptionParam("sps", "Additional System Properties", "SYSTEM_PROPERTIES_CONFIG", ""))}));
        .colon.colon extraOptions = new .colon.colon((Object)new org.apache.commons.cli.Option("LOCALMODE", "local-mode", false, "Run in local mode"), (List)Nil$.MODULE$);
        CmdLineParser cmdParser = new CmdLineParser(args, (scala.collection.immutable.Map<String, OptionParam>)params, (List<org.apache.commons.cli.Option>)extraOptions);
        String sps = (String)cmdParser.extractOptionalValue("system-properties").getOrElse((Function0 & Serializable & scala.Serializable)() -> "{}");
        HashMap props = (HashMap)new ObjectMapper().registerModule((Module)DefaultScalaModule$.MODULE$).readValue(sps, HashMap.class);
        props.foreach((Function1 & Serializable & scala.Serializable)e -> Properties$.MODULE$.setProp((String)e._1(), (String)e._2()));
        String joinConfig = cmdParser.extractRequiredValue("join-config");
        String input = cmdParser.extractRequiredValue("input");
        Enumeration.Value sourceType = SourceFormatType$.MODULE$.withName(cmdParser.extractRequiredValue("source-type"));
        Option<String> startDate = cmdParser.extractOptionalValue("start-date");
        Option<String> endDate = cmdParser.extractOptionalValue("end-date");
        Option<String> numDays = cmdParser.extractOptionalValue("num-days");
        Option<String> dateOffset = cmdParser.extractOptionalValue("date-offset");
        InputData inputData = new InputData(input, sourceType, startDate, endDate, dateOffset, numDays);
        String string = cmdParser.extractRequiredValue("pass-through-features");
        scala.collection.immutable.Set set = "".equals(string) ? Predef$.MODULE$.Set().empty() : new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])string.split(","))).map((Function1 & Serializable & scala.Serializable)x$5 -> x$5.trim(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).toSet();
        scala.collection.immutable.Set passThroughFeatures = set;
        Option<String> feathrLocalConfig = cmdParser.extractOptionalValue("feathr-config");
        Option<String> feathrFeatureConfig = cmdParser.extractOptionalValue("feature-config");
        String localOverrideAll = cmdParser.extractRequiredValue("local-override-all");
        DataLocation outputPath = DataLocation$.MODULE$.apply(cmdParser.extractRequiredValue("output"));
        int numParts = new StringOps(Predef$.MODULE$.augmentString(cmdParser.extractRequiredValue("num-parts"))).toInt();
        JoinJobContext joinJobContext = new JoinJobContext(feathrLocalConfig, feathrFeatureConfig, (Option<InputData>)new Some((Object)inputData), outputPath, numParts);
        DataSourceConfigs dataSourceConfigs = DataSourceConfigUtils$.MODULE$.getConfigs(cmdParser);
        boolean useFCM = new StringOps(Predef$.MODULE$.augmentString(cmdParser.extractRequiredValue("use-fcm"))).toBoolean();
        return new FeathrJoinJobContext(joinConfig, joinJobContext, dataSourceConfigs, useFCM);
    }

    /**
     * Parses a join configuration given as text by delegating to {@code FeatureJoinConfig.parseJoinConfig}.
     *
     * @param joinConfString join configuration content
     * @return the parsed join configuration
     */
    public FeatureJoinConfig parseJoinConfig(String joinConfString) {
        return FeatureJoinConfig$.MODULE$.parseJoinConfig(joinConfString);
    }

    /**
     * Prepares a Spark session from {@code args}, resolves the data sources of all anchored
     * features and returns those whose anchor is named in {@code featureNamesInAnchorSet}.
     *
     * <p>An anchor's key is its feature names, sorted and joined with {@code ","}. Anchors
     * without a resolved source are skipped. Note that {@link #prepareSparkSession} is called
     * here, with all of its side effects.
     *
     * @param args                    command-line arguments, parsed the same way as for {@link #main}
     * @param featureNamesInAnchorSet anchor keys (sorted, comma-joined feature names) to keep
     * @return a Java map from anchor key to the value returned by the anchor source's {@code get()}
     */
    public java.util.Map<String, Dataset<Row>> loadSourceDataframe(String[] args, Set<String> featureNamesInAnchorSet) {
        // Render the array contents the same way main does; appending the array itself would only log its identity.
        String renderedArgs = args == null ? "null" : new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])args)).mkString("Array(", ", ", ")");
        this.logger().info(new StringBuilder(25).append("FeatureJoinJob args are: ").append(renderedArgs).toString());
        this.logger().info("Feature join job: loadDataframe");
        this.logger().info(featureNamesInAnchorSet);
        FeathrJoinPreparationInfo feathrJoinPreparationInfo = this.prepareSparkSession(args);
        SparkSession sparkSession = feathrJoinPreparationInfo.sparkSession();
        Configuration hadoopConf = feathrJoinPreparationInfo.hadoopConf();
        FeathrJoinJobContext jobContext = feathrJoinPreparationInfo.jobContext();
        // No data path / loader handlers are supplied on this code path (empty lists).
        this.checkAuthorization(sparkSession, hadoopConf, jobContext, (List<DataLoaderHandler>)Nil$.MODULE$);
        FeathrClient feathrClient = this.getFeathrClient(sparkSession, jobContext.jobJoinContext(), (List<DataPathHandler>)Nil$.MODULE$, this.getFeathrClient$default$4());
        scala.collection.immutable.Map<String, FeatureAnchorWithSource> allAnchoredFeatures = feathrClient.allAnchoredFeatures();
        boolean failOnMissing = new StringOps(Predef$.MODULE$.augmentString(FeathrUtils$.MODULE$.getFeathrJobParam(sparkSession, FeathrUtils$.MODULE$.FAIL_ON_MISSING_PARTITION()))).toBoolean();
        AnchorToDataSourceMapper anchorToDataSourceMapper = new AnchorToDataSourceMapper((List<DataPathHandler>)Nil$.MODULE$);
        scala.collection.immutable.Map<FeatureAnchorWithSource, Option<DataSourceAccessor>> anchorsWithSource = anchorToDataSourceMapper.getBasicAnchorDFMapForJoin(sparkSession, (Seq<FeatureAnchorWithSource>)allAnchoredFeatures.values().toSeq(), failOnMissing);
        // Keep only anchors whose source is defined, then unwrap the Option.
        scala.collection.immutable.Map updatedAnchorsWithSource = (scala.collection.immutable.Map)((TraversableLike)anchorsWithSource.filter((Function1 & Serializable & scala.Serializable)anchorEntry -> BoxesRunTime.boxToBoolean((boolean)FeatureJoinJob$.$anonfun$loadSourceDataframe$1(anchorEntry)))).map((Function1 & Serializable & scala.Serializable)anchorEntry -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(anchorEntry._1()), ((Option)anchorEntry._2()).get()), Map$.MODULE$.canBuildFrom());
        // Keep anchors whose key (sorted feature names joined by ",") was requested, and re-key the map by it.
        scala.collection.immutable.Map dataFrameMapForPreprocessing = (scala.collection.immutable.Map)((TraversableLike)updatedAnchorsWithSource.filter((Function1 & Serializable & scala.Serializable)x -> BoxesRunTime.boxToBoolean((boolean)featureNamesInAnchorSet.contains(((TraversableOnce)((FeatureAnchorWithSource)x._1()).featureAnchor().features().toSeq().sorted((Ordering)Ordering.String$.MODULE$)).mkString(","))))).map((Function1 & Serializable & scala.Serializable)x -> new Tuple2((Object)((TraversableOnce)((FeatureAnchorWithSource)x._1()).featureAnchor().features().toSeq().sorted((Ordering)Ordering.String$.MODULE$)).mkString(","), ((DataSourceAccessor)x._2()).get()), Map$.MODULE$.canBuildFrom());
        return (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)dataFrameMapForPreprocessing).asJava();
    }

    /**
     * Stores the given data frames in {@code PreprocessedDataFrameManager} (as an immutable
     * Scala map) and then runs {@link #main}.
     *
     * @param args              command-line arguments forwarded to {@link #main}
     * @param preprocessedDfMap data frames to register before the job runs
     */
    public void mainWithPreprocessedDataFrame(String[] args, java.util.Map<String, Dataset<Row>> preprocessedDfMap) {
        PreprocessedDataFrameManager$ manager = PreprocessedDataFrameManager$.MODULE$;
        Object scalaView = JavaConverters$.MODULE$.mapAsScalaMapConverter(preprocessedDfMap).asScala();
        scala.collection.immutable.Map<String, Dataset<Row>> immutableDfMap = (scala.collection.immutable.Map<String, Dataset<Row>>)((TraversableOnce)scalaView).toMap(Predef$.MODULE$.$conforms());
        manager.preprocessedDfMap_$eq(immutableDfMap);
        this.main(args);
    }

    /**
     * Entry point of the join job: logs the arguments, prepares the Spark session and job
     * context, and calls {@link #run} with an empty list of data path handlers.
     *
     * @param args command-line arguments
     */
    public void main(String[] args) {
        String renderedArgs = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])args)).mkString("Array(", ", ", ")");
        this.logger().info("FeatureJoinJob args are: " + renderedArgs);
        FeathrJoinPreparationInfo prep = this.prepareSparkSession(args);
        this.run(prep.sparkSession(), prep.hadoopConf(), prep.jobContext(), (List<DataPathHandler>)Nil$.MODULE$);
    }

    /**
     * Parses the arguments and creates (or reuses) a Hive-enabled Spark session for the job.
     *
     * <p>Side effects visible here: the data source settings are applied to the session,
     * the Feathr UDFs are registered, the output path is deleted when it is a
     * {@link SimplePath}, and all loggers under the root logger are set to DEBUG when the
     * debug-output job parameter is true.
     *
     * @param args command-line arguments
     * @return the Spark session, its Hadoop configuration and the parsed job context
     */
    public FeathrJoinPreparationInfo prepareSparkSession(String[] args) {
        FeathrJoinJobContext jobContext = this.parseInputArgument(args);
        SparkConf sparkConf = new SparkConf().registerKryoClasses((Class[])((Object[])new Class[]{GenericRecord.class}));
        SparkSession sparkSession = SparkSession$.MODULE$.builder()
                .config(sparkConf)
                .appName(this.getClass().getName())
                .enableHiveSupport()
                .getOrCreate();
        Configuration conf = sparkSession.sparkContext().hadoopConfiguration();
        DataSourceConfigUtils$.MODULE$.setupHadoopConf(sparkSession, jobContext.dataSourceConfigs());
        FeathrUdfRegistry$.MODULE$.registerUdf(sparkSession);

        // Remove any existing output before the job runs; the result of the delete is not used.
        DataLocation outputLocation = jobContext.jobJoinContext().outputPath();
        if (outputLocation instanceof SimplePath) {
            String path = ((SimplePath)outputLocation).path();
            HdfsUtils$.MODULE$.deletePath(path, true, conf);
        }

        // The flag is read from the SparkConf created above, not from the session.
        String debugFlag = FeathrUtils$.MODULE$.getFeathrJobParam(sparkConf, FeathrUtils$.MODULE$.ENABLE_DEBUG_OUTPUT());
        if (new StringOps(Predef$.MODULE$.augmentString(debugFlag)).toBoolean()) {
            Configurator.setAllLevels((String)LogManager.getRootLogger().getName(), (Level)Level.DEBUG);
        }
        return new FeathrJoinPreparationInfo(sparkSession, conf, jobContext);
    }

    /**
     * Body of the lambda used by {@code checkAuthorization}: resolves the paths of the input
     * data and checks read authorization on them.
     *
     * @throws FeathrInputDataException if the read-authorization check reports any entry
     */
    public static final /* synthetic */ void $anonfun$checkAuthorization$1(SparkSession ss$1, List dataLoaderHandlers$1, Configuration hadoopConf$1, InputData inputData) {
        String failOnMissingText = FeathrUtils$.MODULE$.getFeathrJobParam(ss$1, FeathrUtils$.MODULE$.FAIL_ON_MISSING_PARTITION());
        boolean failOnMissing = new StringOps(Predef$.MODULE$.augmentString(failOnMissingText)).toBoolean();
        Seq<String> pathList = SourceUtils$.MODULE$.getPathList(inputData.sourceType(), inputData.inputPath(), ss$1, inputData.dateParam(), (List<DataLoaderHandler>)dataLoaderHandlers$1, (Option<String>)None$.MODULE$, failOnMissing);
        Seq<Tuple2<String, String>> invalidPathsAndErrors = AclCheckUtils$.MODULE$.checkReadAuthorization(hadoopConf$1, pathList);
        if (invalidPathsAndErrors.isEmpty()) {
            MODULE$.log().debug("Checked read authorization on observation data of the following paths:\n" + pathList.mkString("\n"));
            return;
        }
        // NOTE(review): "invalidPaths" takes the second tuple element and "errorMsgs" the first.
        // If checkReadAuthorization returns (path, error) pairs these are swapped — confirm against AclCheckUtils.
        Seq invalidPaths = (Seq)invalidPathsAndErrors.map((Function1 & Serializable & scala.Serializable)entry -> (String)entry._2(), Seq$.MODULE$.canBuildFrom());
        Seq errorMsgs = (Seq)invalidPathsAndErrors.map((Function1 & Serializable & scala.Serializable)entry -> (String)entry._1(), Seq$.MODULE$.canBuildFrom());
        throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, "No read permission on observation data " + invalidPaths + " with  " + errorMsgs);
    }

    /** Filter predicate used by {@code loadSourceDataframe}: true when the entry's second element (an {@code Option}) is defined. */
    public static final /* synthetic */ boolean $anonfun$loadSourceDataframe$1(Tuple2 anchorEntry) {
        Option sourceOpt = (Option)anchorEntry._2();
        return sourceOpt.isDefined();
    }

    /**
     * Private constructor, invoked from the static initializer. Stores the instance in
     * {@code MODULE$} and initializes the loggers and constants.
     */
    private FeatureJoinJob$() {
        // Store the instance before any field is initialized.
        MODULE$ = this;
        this.logger = LogManager.getLogger(this.getClass());
        this.SKIP_OUTPUT = "skip_output";
        this.SPARK_JOIN_MAX_PARALLELISM = "10000";
        this.SPARK_JOIN_MIN_PARALLELISM = "10";
        this.SPARK_JOIN_PARALLELISM_DEFAULT = "5000";
        this.SPARK_JOIN_LIMIT_PARTITION_FACTOR = 2;
        // Second logger obtained from the same LogManager.getLogger(getClass()) call as logger above.
        this.log = LogManager.getLogger(this.getClass());
    }
}

