/*
 * Decompiled with CFR 0.152.
 */
package com.ebiznext.comet.job.index.esload;

import com.ebiznext.comet.config.Settings;
import com.ebiznext.comet.config.SparkEnv;
import com.ebiznext.comet.job.index.esload.ESLoadConfig;
import com.ebiznext.comet.schema.handlers.SchemaHandler;
import com.ebiznext.comet.schema.handlers.StorageHandler;
import com.ebiznext.comet.schema.model.Schema;
import com.ebiznext.comet.schema.model.Schema$;
import com.ebiznext.comet.schema.model.SinkType;
import com.ebiznext.comet.schema.model.Views;
import com.ebiznext.comet.utils.JobBase;
import com.ebiznext.comet.utils.JobResult;
import com.ebiznext.comet.utils.SparkJob;
import com.ebiznext.comet.utils.SparkJobResult;
import com.softwaremill.sttp.HttpURLConnectionBackend$;
import com.softwaremill.sttp.RequestT;
import com.softwaremill.sttp.Response;
import com.softwaremill.sttp.SttpBackend;
import com.softwaremill.sttp.Uri;
import com.softwaremill.sttp.package$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.GenTraversableOnce;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.util.Either;
import scala.util.Failure;
import scala.util.Left;
import scala.util.Right;
import scala.util.Success;
import scala.util.Try;

@ScalaSignature(bytes="\u0006\u0001\u0005ed\u0001B\u000b\u0017\u0001\rB\u0001\u0002\r\u0001\u0003\u0002\u0003\u0006I!\r\u0005\tk\u0001\u0011\t\u0011)A\u0005m!Aa\b\u0001B\u0001B\u0003%q\b\u0003\u0005C\u0001\t\u0015\r\u0011b\u0001D\u0011!Q\u0005A!A!\u0002\u0013!\u0005\"B&\u0001\t\u0003a\u0005bB*\u0001\u0005\u0004%\t\u0001\u0016\u0005\u0007[\u0002\u0001\u000b\u0011B+\t\u000f9\u0004!\u0019!C\u0001_\"11\u000f\u0001Q\u0001\nADq\u0001\u001e\u0001C\u0002\u0013\u0005Q\u000f\u0003\u0004\u007f\u0001\u0001\u0006IA\u001e\u0005\t\u007f\u0002\u0011\r\u0011\"\u0001\u0002\u0002!A\u0011\u0011\u000b\u0001!\u0002\u0013\t\u0019\u0001C\u0005\u0002T\u0001\u0011\r\u0011\"\u0001\u0002V!9\u0011q\u000b\u0001!\u0002\u0013\u0019\u0007\"CA-\u0001\t\u0007I\u0011AA.\u0011!\ty\u0006\u0001Q\u0001\n\u0005u\u0003bBA1\u0001\u0011\u0005\u0013Q\u000b\u0005\b\u0003G\u0002A\u0011IA3\u0005%)5\u000bT8bI*{'M\u0003\u0002\u00181\u00051Qm\u001d7pC\u0012T!!\u0007\u000e\u0002\u000b%tG-\u001a=\u000b\u0005ma\u0012a\u00016pE*\u0011QDH\u0001\u0006G>lW\r\u001e\u0006\u0003?\u0001\n\u0001\"\u001a2ju:,\u0007\u0010\u001e\u0006\u0002C\u0005\u00191m\\7\u0004\u0001M\u0019\u0001\u0001\n\u0016\u0011\u0005\u0015BS\"\u0001\u0014\u000b\u0003\u001d\nQa]2bY\u0006L!!\u000b\u0014\u0003\r\u0005s\u0017PU3g!\tYc&D\u0001-\u0015\tiC$A\u0003vi&d7/\u0003\u00020Y\tA1\u000b]1sW*{'-A\u0005dY&\u001cuN\u001c4jOB\u0011!gM\u0007\u0002-%\u0011AG\u0006\u0002\r\u000bNcu.\u00193D_:4\u0017nZ\u0001\u000fgR|'/Y4f\u0011\u0006tG\r\\3s!\t9D(D\u00019\u0015\tI$(\u0001\u0005iC:$G.\u001a:t\u0015\tYD$\u0001\u0004tG\",W.Y\u0005\u0003{a\u0012ab\u0015;pe\u0006<W\rS1oI2,'/A\u0007tG\",W.\u0019%b]\u0012dWM\u001d\t\u0003o\u0001K!!\u0011\u001d\u0003\u001bM\u001b\u0007.Z7b\u0011\u0006tG\r\\3s\u0003!\u0019X\r\u001e;j]\u001e\u001cX#\u0001#\u0011\u0005\u0015CU\"\u0001$\u000b\u0005\u001dc\u0012AB2p]\u001aLw-\u0003\u0002J\r\nA1+\u001a;uS:<7/A\u0005tKR$\u0018N\\4tA\u00051A(\u001b8jiz\"B!\u0014)R%R\u0011aj\u0014\t\u0003e\u0001AQA\u0011\u0004A\u0004\u0011CQ\u0001\r\u0004A\u0002EB
Q!\u000e\u0004A\u0002YBQA\u0010\u0004A\u0002}\n!\"Z:sKN|WO]2f+\u0005)\u0006cA\u0013W1&\u0011qK\n\u0002\u0005'>lW\r\u0005\u0003&3n\u001b\u0017B\u0001.'\u0005\u0019!V\u000f\u001d7feA\u0011A,Y\u0007\u0002;*\u0011alX\u0001\u0005Y\u0006twMC\u0001a\u0003\u0011Q\u0017M^1\n\u0005\tl&AB*ue&tw\r\u0005\u0002eW:\u0011Q-\u001b\t\u0003M\u001aj\u0011a\u001a\u0006\u0003Q\n\na\u0001\u0010:p_Rt\u0014B\u00016'\u0003\u0019\u0001&/\u001a3fM&\u0011!\r\u001c\u0006\u0003U\u001a\n1\"Z:sKN|WO]2fA\u0005!Qm]%e+\u0005\u0001\bcA\u0013r1&\u0011!O\n\u0002\u0007\u001fB$\u0018n\u001c8\u0002\u000b\u0015\u001c\u0018\n\u001a\u0011\u0002\u0013\u0015\u001c8\t\\5D_:4W#\u0001<\u0011\t]d8mY\u0007\u0002q*\u0011\u0011P_\u0001\nS6lW\u000f^1cY\u0016T!a\u001f\u0014\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002~q\n\u0019Q*\u00199\u0002\u0015\u0015\u001c8\t\\5D_:4\u0007%\u0001\u0003qCRDWCAA\u0002!!\t)!a\u0004\u0002\u0016\u00055b\u0002BA\u0004\u0003\u0017q1AZA\u0005\u0013\u00059\u0013bAA\u0007M\u00059\u0001/Y2lC\u001e,\u0017\u0002BA\t\u0003'\u0011a!R5uQ\u0016\u0014(bAA\u0007MA!\u0011qCA\u0015\u001b\t\tIB\u0003\u0003\u0002\u001c\u0005u\u0011A\u00014t\u0015\u0011\ty\"!\t\u0002\r!\fGm\\8q\u0015\u0011\t\u0019#!\n\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\t9#A\u0002pe\u001eLA!a\u000b\u0002\u001a\t!\u0001+\u0019;i!\u0011\ty#a\u0013\u000f\t\u0005E\u0012q\t\b\u0005\u0003g\t\tE\u0004\u0003\u00026\u0005ub\u0002BA\u001c\u0003wq1AZA\u001d\u0013\t\t9#\u0003\u0003\u0002$\u0005\u0015\u0012\u0002BA 
\u0003C\tQa\u001d9be.LA!a\u0011\u0002F\u0005\u00191/\u001d7\u000b\t\u0005}\u0012\u0011E\u0005\u0005\u0003\u001b\tIE\u0003\u0003\u0002D\u0005\u0015\u0013\u0002BA'\u0003\u001f\u0012\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\t\u00055\u0011\u0011J\u0001\u0006a\u0006$\b\u000eI\u0001\u0007M>\u0014X.\u0019;\u0016\u0003\r\fqAZ8s[\u0006$\b%A\u0004eCR\f7/\u001a;\u0016\u0005\u0005u\u0003\u0003B\u0013r\u0003\u0007\t\u0001\u0002Z1uCN,G\u000fI\u0001\u0005]\u0006lW-A\u0002sk:$\"!a\u001a\u0011\r\u0005%\u0014qNA:\u001b\t\tYGC\u0002\u0002n\u0019\nA!\u001e;jY&!\u0011\u0011OA6\u0005\r!&/\u001f\t\u0004W\u0005U\u0014bAA<Y\tI!j\u001c2SKN,H\u000e\u001e")
public class ESLoadJob
implements SparkJob {
    /** Command-line configuration of this load (resource, index name, mapping, domain/schema, options). */
    private final ESLoadConfig cliConfig;
    /** Used by {@code run()} to read the mapping file when one is configured. */
    private final StorageHandler storageHandler;
    /** Used by {@code run()} to look up the domain/schema when deriving a mapping. */
    private final SchemaHandler schemaHandler;
    /** Application settings; source of the global Elasticsearch options. */
    private final Settings settings;
    /** The pair {@code ("es.resource.write", <resource>)} built in the constructor. */
    private final Some<Tuple2<String, String>> esresource;
    /** Optional pair {@code ("es.mapping.id", <id>)}; empty when the CLI config has no id. */
    private final Option<Tuple2<String, String>> esId;
    /** CLI options completed with the {@link #esresource} and {@link #esId} entries. */
    private final Map<String, String> esCliConf;
    /** Input to load: {@code Left} is a path to read, {@code Right} an already-built DataFrame. */
    private final Either<Path, Dataset<Row>> path;
    /** Input format taken from the CLI config; {@code run()} handles "json", "json-array" and "parquet". */
    private final String format;
    /** Value of {@code cliConfig.dataset()} captured at construction time. */
    private final Option<Either<Path, Dataset<Row>>> dataset;
    /** Lazily initialised; see bit 1 of {@link #bitmap$0}. */
    private SparkEnv sparkEnv;
    /** Lazily initialised; see bit 2 of {@link #bitmap$0}. */
    private SparkSession session;
    /**
     * Not {@code final}: the value is installed after field initialisation through
     * {@code com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq}, which a
     * {@code final} field would not allow.
     */
    private Logger logger;
    /** Lazy-initialisation flags: bit 1 set once {@link #sparkEnv} is computed, bit 2 once {@link #session} is. */
    private volatile byte bitmap$0;

    /**
     * Forwards to the {@code SparkJob} trait implementation ({@code SparkJob.registerUdf$}).
     *
     * @param udf value handed unchanged to the trait implementation
     */
    @Override
    public void registerUdf(String udf) {
        SparkJob.registerUdf$(this, udf);
    }

    /**
     * Forwards to the {@code SparkJob} trait implementation
     * ({@code SparkJob.partitionedDatasetWriter$}).
     *
     * @param dataset   DataFrame handed unchanged to the trait implementation
     * @param partition partition list handed unchanged to the trait implementation
     * @return the writer produced by the trait implementation
     */
    @Override
    public DataFrameWriter<Row> partitionedDatasetWriter(Dataset<Row> dataset, List<String> partition) {
        DataFrameWriter<Row> partitionedWriter = SparkJob.partitionedDatasetWriter$(this, dataset, partition);
        return partitionedWriter;
    }

    /**
     * Forwards to the {@code SparkJob} trait implementation ({@code SparkJob.partitionDataset$}).
     *
     * @param dataset   DataFrame handed unchanged to the trait implementation
     * @param partition partition list handed unchanged to the trait implementation
     * @return the DataFrame produced by the trait implementation
     */
    @Override
    public Dataset<Row> partitionDataset(Dataset<Row> dataset, List<String> partition) {
        Dataset<Row> partitioned = SparkJob.partitionDataset$(this, dataset, partition);
        return partitioned;
    }

    /**
     * Forwards to the {@code SparkJob} trait implementation ({@code SparkJob.analyze$}).
     *
     * @param fullTableName value handed unchanged to the trait implementation
     * @return whatever the trait implementation returns
     */
    @Override
    public Object analyze(String fullTableName) {
        Object outcome = SparkJob.analyze$(this, fullTableName);
        return outcome;
    }

    /**
     * Forwards to the {@code SparkJob} trait implementation ({@code SparkJob.createSparkViews$}).
     *
     * @param views         views handed unchanged to the trait implementation
     * @param sqlParameters parameters handed unchanged to the trait implementation
     */
    @Override
    public void createSparkViews(Views views, Map<String, String> sqlParameters) {
        SparkJob.createSparkViews$(this, views, sqlParameters);
    }

    /**
     * Forwards to the {@code JobBase} trait implementation ({@code JobBase.parseViewDefinition$}).
     *
     * @param valueWithEnv value handed unchanged to the trait implementation
     * @return the triple produced by the trait implementation
     */
    @Override
    public Tuple3<SinkType, Option<String>, String> parseViewDefinition(String valueWithEnv) {
        Tuple3<SinkType, Option<String>, String> parsed = JobBase.parseViewDefinition$(this, valueWithEnv);
        return parsed;
    }

    /**
     * Slow path of {@link #sparkEnv()}: while holding this object's monitor, computes the
     * value through {@code SparkJob.sparkEnv$} if bit 1 of {@code bitmap$0} is still clear,
     * then sets that bit.
     *
     * @return the stored {@code sparkEnv}
     */
    private SparkEnv sparkEnv$lzycompute() {
        synchronized (this) {
            boolean alreadyComputed = (this.bitmap$0 & 1) != 0;
            if (!alreadyComputed) {
                this.sparkEnv = SparkJob.sparkEnv$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.sparkEnv;
    }

    /**
     * Returns the lazily computed {@code SparkEnv}; the value is computed on the first call
     * (bit 1 of {@code bitmap$0} clear) and reused afterwards.
     */
    @Override
    public SparkEnv sparkEnv() {
        if ((this.bitmap$0 & 1) != 0) {
            return this.sparkEnv;
        }
        return this.sparkEnv$lzycompute();
    }

    /**
     * Slow path of {@link #session()}: while holding this object's monitor, computes the
     * value through {@code SparkJob.session$} if bit 2 of {@code bitmap$0} is still clear,
     * then sets that bit.
     *
     * @return the stored {@code session}
     */
    private SparkSession session$lzycompute() {
        synchronized (this) {
            boolean alreadyComputed = (this.bitmap$0 & 2) != 0;
            if (!alreadyComputed) {
                this.session = SparkJob.session$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.session;
    }

    /**
     * Returns the lazily computed {@code SparkSession}; the value is computed on the first
     * call (bit 2 of {@code bitmap$0} clear) and reused afterwards.
     */
    @Override
    public SparkSession session() {
        if ((this.bitmap$0 & 2) != 0) {
            return this.session;
        }
        return this.session$lzycompute();
    }

    /** Returns the logger stored on this job. */
    public Logger logger() {
        return logger;
    }

    /**
     * Stores the given logger on this job.
     * NOTE(review): presumably invoked from {@code StrictLogging.$init$} during construction — confirm.
     *
     * @param x$1 logger to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        logger = x$1;
    }

    /** Returns the settings this job was constructed with. */
    @Override
    public Settings settings() {
        return settings;
    }

    /** Returns the {@code ("es.resource.write", <resource>)} pair built in the constructor. */
    public Some<Tuple2<String, String>> esresource() {
        return esresource;
    }

    /** Returns the optional {@code ("es.mapping.id", <id>)} pair built in the constructor. */
    public Option<Tuple2<String, String>> esId() {
        return esId;
    }

    /** Returns the CLI options merged with the {@code esresource} and {@code esId} entries. */
    public Map<String, String> esCliConf() {
        return esCliConf;
    }

    /** Returns the input to load: either a path ({@code Left}) or a DataFrame ({@code Right}). */
    public Either<Path, Dataset<Row>> path() {
        return path;
    }

    /** Returns the input format taken from the CLI configuration. */
    public String format() {
        return format;
    }

    /** Returns the value of {@code cliConfig.dataset()} captured at construction time. */
    public Option<Either<Path, Dataset<Row>>> dataset() {
        return dataset;
    }

    /** Returns the job name: the literal {@code "Index "} followed by the string form of {@link #path()}. */
    @Override
    public String name() {
        return "Index " + this.path();
    }

    /**
     * Loads the configured input into Elasticsearch.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>read the input DataFrame (from a path or from the in-memory dataset);</li>
     *   <li>if a timestamp column is configured, rewrite it as an ISO-like string;</li>
     *   <li>resolve the index template content and register it over HTTP
     *       (DELETE then PUT on {@code /_template/<indexName>});</li>
     *   <li>write the DataFrame with the {@code org.elasticsearch.spark.sql} format in
     *       overwrite mode, but only when {@code isElasticsearchSupported()} is true.</li>
     * </ol>
     *
     * @return a {@code Success} wrapping a {@code SparkJobResult} that carries no DataFrame
     * @throws MatchError when the input is a path and the format is not "json",
     *                    "json-array" or "parquet"
     * @throws Exception  ("Failed to create template") when the template PUT does not
     *                    answer with a 2xx status; the exception is thrown, not wrapped
     *                    in a {@code Failure}
     */
    @Override
    public Try<JobResult> run() {
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("Indexing resource {} with {}", new Object[]{this.cliConfig.getResource(), this.cliConfig});
        }
        Dataset inputDF = this.readInputDataFrame();
        // Reformat the timestamp column (when configured) through a temporary column so the
        // column keeps its original name.
        Dataset df = (Dataset)this.cliConfig.getTimestampCol().map((Function1 & Serializable & scala.Serializable)tsCol -> inputDF.withColumn("comet_es_tmp", functions$.MODULE$.date_format(functions$.MODULE$.col(tsCol), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")).drop(tsCol).withColumnRenamed("comet_es_tmp", tsCol)).getOrElse((Function0 & Serializable & scala.Serializable)() -> inputDF);
        String content = this.resolveTemplateContent(df);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("Registering template {}_{} -> {}", new Object[]{this.cliConfig.domain().toLowerCase(), this.cliConfig.schema().toLowerCase(), content});
        }
        Map esOptions = ((TraversableOnce)JavaConverters$.MODULE$.mapAsScalaMapConverter(this.settings().comet().elasticsearch().options()).asScala()).toMap(Predef$.MODULE$.$conforms());
        if (!this.putIndexTemplate(content, esOptions)) {
            throw new Exception("Failed to create template");
        }
        // Global ES options first, then the CLI-level ones; both lists are applied to the writer in order.
        List allConf = (List)esOptions.toList().$plus$plus((GenTraversableOnce)this.esCliConf().toList(), List$.MODULE$.canBuildFrom());
        if (this.logger().underlying().isDebugEnabled()) {
            // df.count() is only evaluated when debug logging is enabled.
            this.logger().underlying().debug("sending {} documents to Elasticsearch using {}", new Object[]{BoxesRunTime.boxToLong((long)df.count()), allConf});
        }
        DataFrameWriter writer = ((DataFrameWriter)allConf.foldLeft((Object)df.write(), (Function2 & Serializable & scala.Serializable)(x0$1, x1$1) -> {
            Tuple2 keyValue = (Tuple2)x1$1;
            if (keyValue == null) {
                throw new MatchError((Object)new Tuple2(x0$1, x1$1));
            }
            return ((DataFrameWriter)x0$1).option((String)keyValue._1(), (String)keyValue._2());
        })).format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite);
        if (this.settings().comet().isElasticsearchSupported()) {
            writer.save(this.cliConfig.getResource());
        }
        return new Success((Object)new SparkJobResult((Option<Dataset<Row>>)None$.MODULE$));
    }

    /** Reads the input: a path is read according to {@link #format()}, a DataFrame is used as is. */
    private Dataset readInputDataFrame() {
        Either<Path, Dataset<Row>> either = this.path();
        if (either instanceof Right) {
            return (Dataset)((Right)either).value();
        }
        if (!(either instanceof Left)) {
            throw new MatchError(either);
        }
        Path inputPath = (Path)((Left)either).value();
        String inputFormat = this.format();
        if ("json".equals(inputFormat)) {
            return this.session().read().option("multiline", true).json(inputPath.toString());
        }
        if ("json-array".equals(inputFormat)) {
            Dataset jsonDS = this.session().read().textFile(inputPath.toString());
            return this.session().read().json(jsonDS);
        }
        if ("parquet".equals(inputFormat)) {
            return this.session().read().parquet(inputPath.toString());
        }
        throw new MatchError((Object)inputFormat);
    }

    /**
     * Resolves the template body, by decreasing priority: the configured mapping file, the
     * mapping of the schema found in the configured domain, a mapping derived from the
     * DataFrame schema.
     */
    private String resolveTemplateContent(Dataset df) {
        return (String)this.cliConfig.mapping().map((Function1 & Serializable & scala.Serializable)path -> this.storageHandler.read((Path)path)).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            Option dynamicTemplate = this.schemaHandler.getDomain(this.cliConfig.domain()).flatMap((Function1 & Serializable & scala.Serializable)domain -> domain.schemas().find((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)ESLoadJob.$anonfun$run$6(this, x$2))).map((Function1 & Serializable & scala.Serializable)schema -> schema.mapping(domain.mapping((Schema)schema, this.settings()), domain.name(), this.schemaHandler, this.settings())));
            return (String)dynamicTemplate.getOrElse((Function0 & Serializable & scala.Serializable)() -> Schema$.MODULE$.mapping(this.cliConfig.domain(), this.cliConfig.schema(), new StructField("ignore", (DataType)df.schema(), StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), this.schemaHandler, this.settings()));
        });
    }

    /**
     * Replaces the index template {@code /_template/<indexName>}: issues a DELETE (its
     * response is not inspected) and then a PUT with the given body.
     *
     * @return true when the PUT answered with a status in [200, 299]
     */
    private boolean putIndexTemplate(String content, Map esOptions) {
        String host = (String)esOptions.getOrElse((Object)"es.nodes", (Function0 & Serializable & scala.Serializable)() -> "localhost");
        int port = new StringOps(Predef$.MODULE$.augmentString((String)esOptions.getOrElse((Object)"es.port", (Function0 & Serializable & scala.Serializable)() -> "9200"))).toInt();
        boolean ssl = new StringOps(Predef$.MODULE$.augmentString((String)esOptions.getOrElse((Object)"es.net.ssl", (Function0 & Serializable & scala.Serializable)() -> "false"))).toBoolean();
        String protocol = ssl ? "https" : "http";
        // The historical un-prefixed keys win; the es-hadoop keys are used as a fallback so
        // that credentials configured for the Spark writer also authenticate the template calls.
        Option username = esOptions.get((Object)"net.http.auth.user").orElse((Function0 & Serializable & scala.Serializable)() -> esOptions.get((Object)"es.net.http.auth.user"));
        Option password = esOptions.get((Object)"net.http.auth.password").orElse((Function0 & Serializable & scala.Serializable)() -> esOptions.get((Object)"es.net.http.auth.pass"));
        SttpBackend backend = HttpURLConnectionBackend$.MODULE$.apply(HttpURLConnectionBackend$.MODULE$.apply$default$1(), HttpURLConnectionBackend$.MODULE$.apply$default$2(), HttpURLConnectionBackend$.MODULE$.apply$default$3(), HttpURLConnectionBackend$.MODULE$.apply$default$4());
        // Basic auth is only applied when both user and password are present.
        Option authSttp = username.flatMap((Function1 & Serializable & scala.Serializable)u -> password.map((Function1 & Serializable & scala.Serializable)p -> package$.MODULE$.sttp().auth().basic(u, p)));
        Uri templateUri = package$.MODULE$.UriContext(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "://", ":", "/_template/", ""}))).uri((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{protocol, host, BoxesRunTime.boxToInteger((int)port), this.cliConfig.getIndexName()}));
        RequestT requestDel = ((RequestT)authSttp.getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.sttp())).delete(templateUri).contentType("application/json");
        requestDel.send(backend, Predef.$eq$colon$eq$.MODULE$.tpEquals());
        RequestT requestPut = ((RequestT)authSttp.getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.sttp())).body(content).put(templateUri).contentType("application/json");
        Response responsePut = (Response)requestPut.send(backend, Predef.$eq$colon$eq$.MODULE$.tpEquals());
        return RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(200), 299).contains(responsePut.code());
    }

    /**
     * Predicate used by {@code run()} to pick a schema: true when the schema's name equals
     * the schema name of the job's CLI configuration (two nulls count as equal).
     *
     * @param $this job whose CLI configuration supplies the expected schema name
     * @param x$2   candidate schema
     */
    public static final /* synthetic */ boolean $anonfun$run$6(ESLoadJob $this, Schema x$2) {
        String candidateName = x$2.name();
        String expectedName = $this.cliConfig.schema();
        if (candidateName == null) {
            return expectedName == null;
        }
        return candidateName.equals(expectedName);
    }

    /**
     * Creates the job and precomputes the Elasticsearch write settings from the CLI configuration.
     *
     * @param cliConfig      command-line configuration of the load
     * @param storageHandler handler stored for later use by {@code run()}
     * @param schemaHandler  handler stored for later use by {@code run()}
     * @param settings       application settings; also passed to {@code cliConfig.getDataset}
     */
    public ESLoadJob(ESLoadConfig cliConfig, StorageHandler storageHandler, SchemaHandler schemaHandler, Settings settings) {
        this.cliConfig = cliConfig;
        this.storageHandler = storageHandler;
        this.schemaHandler = schemaHandler;
        this.settings = settings;
        // Static initialisers of the mixed-in traits, run before the derived fields below are set.
        StrictLogging.$init$((StrictLogging)this);
        JobBase.$init$(this);
        SparkJob.$init$(this);
        // Always present: the write target, keyed by "es.resource.write".
        this.esresource = new Some((Object)new Tuple2((Object)"es.resource.write", (Object)String.valueOf(cliConfig.getResource())));
        // Present only when the CLI config defines an id: keyed by "es.mapping.id".
        this.esId = cliConfig.id().map((Function1 & Serializable & scala.Serializable)x$1 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"es.mapping.id"), x$1));
        // CLI options plus whichever of esresource / esId are defined (the two Options are flattened into a map).
        this.esCliConf = cliConfig.options().$plus$plus((GenTraversableOnce)((TraversableOnce)new .colon.colon(this.esresource(), (List)new .colon.colon(this.esId(), (List)Nil$.MODULE$)).flatten((Function1 & Serializable & scala.Serializable)xo -> Option$.MODULE$.option2Iterable(xo))).toMap(Predef$.MODULE$.$conforms()));
        this.path = cliConfig.getDataset(settings);
        this.format = cliConfig.format();
        this.dataset = cliConfig.dataset();
    }
}

