/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.utils;

import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import it.agilelab.bigdata.wasp.core.utils.SparkStreamingConfiguration;
import it.agilelab.bigdata.wasp.core.utils.WaspConfiguration;
import it.agilelab.bigdata.wasp.models.PipegraphModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.configuration.ConnectionConfig;
import it.agilelab.bigdata.wasp.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.NifiStatelessConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.SparkConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.SparkStreamingConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.TelemetryConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.TelemetryTopicConfigModelMessageFormat$;
import it.agilelab.bigdata.wasp.models.configuration.WaspConfigModel;
import java.io.File;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.spark.SparkConf;
import org.apache.spark.UtilsForwarder$;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Vector;
import scala.io.BufferedSource;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

public final class SparkUtils$
implements Logging,
WaspConfiguration,
ElasticConfiguration,
SparkStreamingConfiguration {
    /** Singleton instance of this object; assigned by the private constructor. */
    public static SparkUtils$ MODULE$;
    /** Name of the file, located in the additional-jars directory, that lists the jars to load. */
    private final String jarsListFileName;
    /** Lazily initialised value; valid once bit 0 (mask 1) of {@code bitmap$0} is set. */
    private SparkStreamingConfigModel sparkStreamingConfig;
    /** Lazily initialised value; valid once bit 1 (mask 2) of {@code bitmap$0} is set. */
    private ElasticConfigModel elasticConfig;
    /** Lazily initialised value; valid once bit 2 (mask 4) of {@code bitmap$0} is set. */
    private WaspConfigModel waspConfig;
    /**
     * Logger used by this object. Deliberately not {@code final}: it is assigned through
     * {@code it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq}, i.e. outside the
     * constructor body, which Java does not allow for a final field.
     */
    private WaspLogger logger;
    /** Bit set recording which of the lazily initialised fields above have been computed. */
    private volatile byte bitmap$0;

    static {
        // Instantiating the class is sufficient: the private constructor stores the new
        // instance in MODULE$, so the reference does not need to be kept here.
        new SparkUtils$();
    }

    /**
     * Slow path of {@code sparkStreamingConfig()}: while holding this instance's monitor, computes
     * the value through the {@code SparkStreamingConfiguration} trait implementation (only if bit
     * 0 of {@code bitmap$0} is still clear) and then marks that bit.
     *
     * @return the value stored in the {@code sparkStreamingConfig} field
     */
    private SparkStreamingConfigModel sparkStreamingConfig$lzycompute() {
        synchronized (this) {
            boolean alreadyComputed = (this.bitmap$0 & 1) != 0;
            if (!alreadyComputed) {
                this.sparkStreamingConfig = SparkStreamingConfiguration.sparkStreamingConfig$((SparkStreamingConfiguration)this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.sparkStreamingConfig;
    }

    /**
     * Returns the Spark Streaming configuration, computing it on first access.
     *
     * @return the cached value when bit 0 of {@code bitmap$0} is set, otherwise the result of the
     *         synchronized slow path
     */
    public SparkStreamingConfigModel sparkStreamingConfig() {
        if ((this.bitmap$0 & 1) != 0) {
            return this.sparkStreamingConfig;
        }
        return this.sparkStreamingConfig$lzycompute();
    }

    /**
     * Slow path of {@code elasticConfig()}: while holding this instance's monitor, computes the
     * value through the {@code ElasticConfiguration} trait implementation (only if bit 1 of
     * {@code bitmap$0} is still clear) and then marks that bit.
     *
     * @return the value stored in the {@code elasticConfig} field
     */
    private ElasticConfigModel elasticConfig$lzycompute() {
        synchronized (this) {
            boolean alreadyComputed = (this.bitmap$0 & 2) != 0;
            if (!alreadyComputed) {
                this.elasticConfig = ElasticConfiguration.elasticConfig$((ElasticConfiguration)this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.elasticConfig;
    }

    /**
     * Returns the Elastic configuration, computing it on first access.
     *
     * @return the cached value when bit 1 of {@code bitmap$0} is set, otherwise the result of the
     *         synchronized slow path
     */
    public ElasticConfigModel elasticConfig() {
        if ((this.bitmap$0 & 2) != 0) {
            return this.elasticConfig;
        }
        return this.elasticConfig$lzycompute();
    }

    /**
     * Slow path of {@code waspConfig()}: while holding this instance's monitor, computes the value
     * through the {@code WaspConfiguration} trait implementation (only if bit 2 of
     * {@code bitmap$0} is still clear) and then marks that bit.
     *
     * @return the value stored in the {@code waspConfig} field
     */
    private WaspConfigModel waspConfig$lzycompute() {
        synchronized (this) {
            boolean alreadyComputed = (this.bitmap$0 & 4) != 0;
            if (!alreadyComputed) {
                this.waspConfig = WaspConfiguration.waspConfig$((WaspConfiguration)this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
        }
        return this.waspConfig;
    }

    /**
     * Returns the WASP configuration, computing it on first access.
     *
     * @return the cached value when bit 2 of {@code bitmap$0} is set, otherwise the result of the
     *         synchronized slow path
     */
    public WaspConfigModel waspConfig() {
        if ((this.bitmap$0 & 4) != 0) {
            return this.waspConfig;
        }
        return this.waspConfig$lzycompute();
    }

    /**
     * Accessor required by the {@code Logging} trait.
     *
     * @return the logger held by this object
     */
    public WaspLogger logger() {
        return logger;
    }

    /**
     * Compiler-generated setter for the {@code logger} value declared by the {@code Logging} trait.
     * NOTE(review): presumably invoked once by {@code Logging.$init$} during construction - confirm.
     *
     * @param x$1 the logger to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        logger = x$1;
    }

    /**
     * Returns the name of the file listing the additional jars.
     *
     * @return the file name set by the constructor ({@code "jars.list"})
     */
    public String jarsListFileName() {
        return jarsListFileName;
    }

    /**
     * Builds a {@link SparkConf} from the given Spark configuration model.
     *
     * <p>The resulting configuration contains, in order:
     * <ul>
     *   <li>application name, master and driver settings (the driver port only when non-zero);</li>
     *   <li>executor, jar, block manager and UI retention settings, followed by every key/value
     *       pair in {@code sparkConfigModel.others()};</li>
     *   <li>Kryo settings, when the Kryo serializer is enabled in the model;</li>
     *   <li>{@code es.nodes}, built from the Elastic connections whose metadata entry
     *       {@code connectiontype} equals {@code rest};</li>
     *   <li>the telemetry topic and Kafka settings, serialised and URL-safe Base64 encoded, appended
     *       as {@code -D} system properties to {@code spark.executor.extraJavaOptions};</li>
     *   <li>NiFi stateless library settings and the NiFi executor plugin, when the model is a
     *       {@code SparkStreamingConfigModel} that defines a NiFi stateless configuration.</li>
     * </ul>
     *
     * @param sparkConfigModel the model to translate into Spark settings
     * @param telemetryConfig  the telemetry configuration whose topic model is passed to executors
     * @param kafkaConfigModel the Kafka configuration whose tiny form is passed to executors
     * @return the populated Spark configuration
     */
    public SparkConf buildSparkConfFromSparkConfigModel(SparkConfigModel sparkConfigModel, TelemetryConfigModel telemetryConfig, KafkaConfigModel kafkaConfigModel) {
        this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Building Spark configuration from configuration model");
        this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Starting from SparkConfigModel:\n\t" + sparkConfigModel);

        SparkConf sparkConf = new SparkConf()
                .setAppName(sparkConfigModel.appName())
                .setMaster(sparkConfigModel.master().toString());

        // Driver settings.
        sparkConf
                .set("spark.submit.deployMode", sparkConfigModel.driver().submitDeployMode())
                .set("spark.driver.cores", String.valueOf(sparkConfigModel.driver().cores()))
                .set("spark.driver.memory", sparkConfigModel.driver().memory())
                .set("spark.driver.host", sparkConfigModel.driver().host())
                .set("spark.driver.bindAddress", sparkConfigModel.driver().bindAddress());
        // A port of 0 leaves "spark.driver.port" unset.
        if (sparkConfigModel.driver().port() != 0) {
            sparkConf.set("spark.driver.port", String.valueOf(sparkConfigModel.driver().port()));
        }

        // Executor, jar, block manager and UI retention settings, then the free-form extras.
        sparkConf
                .set("spark.executor.cores", String.valueOf(sparkConfigModel.executorCores()))
                .set("spark.executor.memory", sparkConfigModel.executorMemory())
                .set("spark.cores.max", String.valueOf(sparkConfigModel.coresMax()))
                .set("spark.executor.instances", String.valueOf(sparkConfigModel.executorInstances()))
                .setJars(this.getAdditionalJars(sparkConfigModel.additionalJarsPath()))
                .set("spark.yarn.jars", sparkConfigModel.yarnJar())
                .set("spark.blockManager.port", String.valueOf(sparkConfigModel.blockManagerPort()))
                .set("spark.ui.retainedStages", String.valueOf(sparkConfigModel.retained().retainedStagesJobs()))
                .set("spark.ui.retainedTasks", String.valueOf(sparkConfigModel.retained().retainedTasks()))
                .set("spark.ui.retainedJobs", String.valueOf(sparkConfigModel.retained().retainedJobs()))
                .set("spark.sql.ui.retainedExecutions", String.valueOf(sparkConfigModel.retained().retainedExecutions()))
                .set("spark.streaming.ui.retainedBatches", String.valueOf(sparkConfigModel.retained().retainedBatches()))
                .setAll((Iterable)sparkConfigModel.others().map((Function1 & Serializable & scala.Serializable)v -> new Tuple2((Object)v.key(), (Object)v.value()), Seq$.MODULE$.canBuildFrom()));

        // Kryo settings; "spark.kryo.registrationRequired" is set exactly once.
        if (sparkConfigModel.kryoSerializer().enabled()) {
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            sparkConf.registerKryoClasses(new Class[0]);
            sparkConf.set("spark.kryo.registrator", sparkConfigModel.kryoSerializer().registrators());
            sparkConf.set("spark.kryo.registrationRequired", String.valueOf(sparkConfigModel.kryoSerializer().strict()));
        }

        // Elasticsearch nodes: only the connections flagged as "rest" in their metadata.
        Seq conns = (Seq)this.elasticConfig().connections().filter((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)SparkUtils$.$anonfun$buildSparkConfFromSparkConfigModel$4(x$1)));
        String address = ((TraversableOnce)conns.map((Function1 & Serializable & scala.Serializable)e -> new StringBuilder(1).append(e.host()).append(":").append(e.port()).toString(), Seq$.MODULE$.canBuildFrom())).mkString(",");
        sparkConf.set("es.nodes", address);

        // Telemetry settings travel to the executors as Base64-encoded system properties,
        // appended to whatever extra Java options are already configured.
        String originalExtraJavaOptions = sparkConf.get("spark.executor.extraJavaOptions", "");
        String telemetryConfigJSON = Base64.getUrlEncoder().encodeToString(TelemetryTopicConfigModelMessageFormat$.MODULE$.telemetryTopicConfigModelFormat().write((Object)telemetryConfig.telemetryTopicConfigModel()).toString().getBytes(StandardCharsets.UTF_8));
        String kafkaTinyConfigJSON = Base64.getUrlEncoder().encodeToString(TelemetryTopicConfigModelMessageFormat$.MODULE$.tinyKafkaConfigFormat().write((Object)kafkaConfigModel.toTinyConfig()).toString().getBytes(StandardCharsets.UTF_8));
        String newExtraJavaOptions = originalExtraJavaOptions
                + " -Dwasp.plugin.telemetry.kafka=\"" + kafkaTinyConfigJSON
                + "\" -Dwasp.plugin.telemetry.topic=\"" + telemetryConfigJSON + "\"";
        sparkConf.set("spark.executor.extraJavaOptions", newExtraJavaOptions);

        // NiFi stateless settings exist only on the streaming flavour of the model.
        if (sparkConfigModel instanceof SparkStreamingConfigModel) {
            SparkStreamingConfigModel streamingConfigModel = (SparkStreamingConfigModel)sparkConfigModel;
            streamingConfigModel.nifiStateless().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                NifiStatelessConfigModel nifiConfig = (NifiStatelessConfigModel)x0$1;
                if (nifiConfig == null) {
                    throw new MatchError((Object)nifiConfig);
                }
                String bootstrapJars = nifiConfig.bootstrapJars();
                String systemJars = nifiConfig.systemJars();
                String statelessJars = nifiConfig.statelessJars();
                String extensions = nifiConfig.extensions();
                sparkConf.set("spark.wasp.nifi.lib.stateless", statelessJars);
                sparkConf.set("spark.wasp.nifi.lib.bootstrap", bootstrapJars);
                sparkConf.set("spark.wasp.nifi.lib.system", systemJars);
                sparkConf.set("spark.wasp.nifi.lib.extensions", extensions);
                // Append the NiFi plugin to any executor plugins already configured.
                String nifiPlugin = "it.agilelab.bigdata.wasp.spark.plugins.nifi.NifiPlugin";
                String currentPlugins = sparkConf.get("spark.executor.plugins", "");
                String newPlugins = currentPlugins == null || currentPlugins.isEmpty()
                        ? nifiPlugin
                        : currentPlugins + "," + nifiPlugin;
                return sparkConf.set("spark.executor.plugins", newPlugins);
            });
        }

        this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Resulting SparkConf:\n\t" + sparkConf.toDebugString().replace("\n", "\n\t"));
        return sparkConf;
    }

    /*
     * WARNING - void declaration
     */
    private Seq<String> getAdditionalJars(String additionalJarsPath) {
        void v0;
        try (BufferedSource source = Source$.MODULE$.fromFile(new StringBuilder(0).append(additionalJarsPath).append(File.separator).append(this.jarsListFileName()).toString(), Codec$.MODULE$.fallbackSystemCodec());){
            try {
                void var3_3;
                Vector additionalJars = source.getLines().map((Function1 & Serializable & scala.Serializable)jarName -> UtilsForwarder$.MODULE$.resolveURI(new StringBuilder(0).append(additionalJarsPath).append(File.separator).append((String)jarName).toString())).map((Function1 & Serializable & scala.Serializable)x$5 -> x$5.toString()).toVector();
                v0 = var3_3;
            }
            catch (Throwable e) {
                String msg = new StringBuilder(68).append("Unable to completely generate the additional jars list - Exception: ").append(e.getMessage()).toString();
                this.logger().error((Function0 & Serializable & scala.Serializable)() -> msg, e);
                throw e;
            }
        }
        return v0;
    }

    /**
     * Builds the checkpoint directory for one structured streaming component of a pipegraph.
     *
     * <p>The layout is
     * {@code <checkpointDir>[/<environmentPrefix>]/structured_streaming/<pipegraph>/<component>_<writer>};
     * the environment prefix segment is omitted when the prefix is the empty string.
     *
     * @param pipegraph the pipegraph owning the component
     * @param component the structured streaming component
     * @return the checkpoint directory path
     */
    public String generateSpecificStructuredStreamingCheckpointDir(PipegraphModel pipegraph, StructuredStreamingETLModel component) {
        String environmentPrefix = this.waspConfig().environmentPrefix();
        String prefix;
        if (environmentPrefix != null && environmentPrefix.equals("")) {
            prefix = "";
        } else {
            prefix = "/" + this.waspConfig().environmentPrefix();
        }
        return this.sparkStreamingConfig().checkpointDir()
                + prefix
                + "/"
                + "structured_streaming"
                + "/"
                + pipegraph.generateStandardPipegraphName()
                + "/"
                + component.generateStandardProcessingComponentName()
                + "_"
                + component.generateStandardWriterName();
    }

    /**
     * Resolves the trigger interval, in milliseconds, for a structured streaming component.
     *
     * <p>The component's own interval wins; otherwise the one from the streaming configuration
     * is used; when neither is defined the result is {@code 0}.
     *
     * @param sparkStreamingConfigModel   configuration providing the fallback interval
     * @param structuredStreamingETLModel component whose interval is looked up first
     * @return the resolved trigger interval in milliseconds
     */
    public long getTriggerIntervalMs(SparkStreamingConfigModel sparkStreamingConfigModel, StructuredStreamingETLModel structuredStreamingETLModel) {
        Object resolvedInterval = structuredStreamingETLModel.triggerIntervalMs()
                .orElse((Function0 & Serializable & scala.Serializable)() -> sparkStreamingConfigModel.triggerIntervalMs())
                .getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> 0L);
        return BoxesRunTime.unboxToLong(resolvedInterval);
    }

    /**
     * Predicate selecting connections whose metadata entry {@code connectiontype} equals
     * {@code rest}. Connections without metadata, or without that entry, are treated as having
     * an empty connection type and are therefore rejected.
     *
     * @param x$1 the connection to inspect
     * @return {@code true} when the connection type is {@code rest}
     */
    public static final /* synthetic */ boolean $anonfun$buildSparkConfFromSparkConfigModel$4(ConnectionConfig x$1) {
        Object connectionType = x$1.metadata()
                .flatMap((Function1 & Serializable & scala.Serializable)meta -> meta.get((Object)"connectiontype"))
                .getOrElse((Function0 & Serializable & scala.Serializable)() -> "");
        return connectionType != null && connectionType.equals("rest");
    }

    /**
     * Private constructor of the singleton: stores the instance in {@code MODULE$}, runs the
     * initialiser of each mixed-in trait and finally sets the jar list file name.
     */
    private SparkUtils$() {
        // The instance is stored first, before any trait initialiser runs.
        MODULE$ = this;
        // Trait initialisers, in declaration order of the implemented traits.
        // NOTE(review): Logging.$init$ presumably assigns the logger through its setter - confirm.
        Logging.$init$((Logging)this);
        WaspConfiguration.$init$((WaspConfiguration)this);
        ElasticConfiguration.$init$((ElasticConfiguration)this);
        SparkStreamingConfiguration.$init$((SparkStreamingConfiguration)this);
        // File, inside the additional-jars directory, that lists the jars to load.
        this.jarsListFileName = "jars.list";
    }
}

