/*
 * Decompiled with CFR 0.152.
 */
package com.ebiznext.comet.job.ingest;

import com.ebiznext.comet.config.Settings;
import com.ebiznext.comet.job.index.bqload.BigQueryLoadConfig;
import com.ebiznext.comet.job.index.bqload.BigQueryLoadConfig$;
import com.ebiznext.comet.job.index.bqload.BigQuerySparkJob;
import com.ebiznext.comet.job.index.connectionload.ConnectionLoadConfig;
import com.ebiznext.comet.job.index.connectionload.ConnectionLoadConfig$;
import com.ebiznext.comet.job.index.connectionload.ConnectionLoadJob;
import com.ebiznext.comet.job.ingest.AuditLog;
import com.ebiznext.comet.job.ingest.SparkAuditLogWriter$;
import com.ebiznext.comet.schema.model.BigQuerySink;
import com.ebiznext.comet.schema.model.Engine;
import com.ebiznext.comet.schema.model.EsSink;
import com.ebiznext.comet.schema.model.FsSink;
import com.ebiznext.comet.schema.model.JdbcSink;
import com.ebiznext.comet.schema.model.NoneSink;
import com.ebiznext.comet.schema.model.RowLevelSecurity;
import com.ebiznext.comet.schema.model.Sink;
import com.ebiznext.comet.utils.FileLock;
import com.ebiznext.comet.utils.JobResult;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import org.apache.hadoop.fs.Path;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.AtomicType;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.Either;
import scala.util.Right;
import scala.util.Try;

public final class SparkAuditLogWriter$
implements StrictLogging {
    public static SparkAuditLogWriter$ MODULE$;
    private final List<Tuple3<String, LegacySQLTypeName, AtomicType>> auditCols;
    private final Logger logger;

    static {
        new SparkAuditLogWriter$();
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    public List<Tuple3<String, LegacySQLTypeName, AtomicType>> auditCols() {
        return this.auditCols;
    }

    private Schema bigqueryAuditSchema() {
        List fields = (List)this.auditCols().map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple3 tuple3 = x0$1;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            String name = (String)tuple3._1();
            LegacySQLTypeName tpe = (LegacySQLTypeName)tuple3._2();
            Field field = Field.newBuilder((String)name, (LegacySQLTypeName)tpe, (Field[])new Field[0]).setMode(Field.Mode.NULLABLE).setDescription("").build();
            return field;
        }, List$.MODULE$.canBuildFrom());
        return Schema.of((Field[])((Field[])fields.toArray(ClassTag$.MODULE$.apply(Field.class))));
    }

    public Object append(SparkSession session, AuditLog log, Settings settings) {
        Try<JobResult> try_;
        Sink sink;
        RDD auditTypedRDD = session.sparkContext().parallelize((Seq)new .colon.colon((Object)log, (List)Nil$.MODULE$), session.sparkContext().parallelize$default$2(), ClassTag$.MODULE$.apply(AuditLog.class));
        JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        public final class Com_ebiznext_comet_job_ingest_SparkAuditLogWriter$$typecreator5$2
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("com.ebiznext.comet.job.ingest.AuditLog").asType().toTypeConstructor();
            }

            public Com_ebiznext_comet_job_ingest_SparkAuditLogWriter$$typecreator5$2() {
            }
        }
        Dataset auditDF = session.createDataFrame(session.implicits().rddToDatasetHolder(auditTypedRDD, session.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Com_ebiznext_comet_job_ingest_SparkAuditLogWriter$$typecreator5$2()))).toDF().rdd(), StructType$.MODULE$.apply((Seq)this.auditCols().map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple3 tuple3 = x0$1;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            String name = (String)tuple3._1();
            AtomicType sparkType = (AtomicType)tuple3._3();
            StructField structField = new StructField(name, (DataType)sparkType, true, StructField$.MODULE$.apply$default$4());
            return structField;
        }, List$.MODULE$.canBuildFrom()))).toDF((Seq)this.auditCols().map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            String name;
            Tuple3 tuple3 = x0$2;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            String string = name = (String)tuple3._1();
            return string;
        }, List$.MODULE$.canBuildFrom()));
        if (settings.comet().sinkToFile()) {
            SparkAuditLogWriter$.sinkToFile$1(log, settings, session);
        }
        if ((sink = settings.comet().audit().sink()) instanceof JdbcSink) {
            JdbcSink jdbcSink = (JdbcSink)sink;
            String x$1 = jdbcSink.connection();
            Settings.Comet x$2 = settings.comet();
            Right x$3 = package$.MODULE$.Right().apply((Object)auditDF);
            String x$4 = "audit";
            int x$5 = BoxesRunTime.unboxToInt((Object)jdbcSink.partitions().getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 1));
            int x$6 = BoxesRunTime.unboxToInt((Object)jdbcSink.batchsize().getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 1000));
            Map<String, String> x$7 = jdbcSink.getOptions();
            JobInfo.CreateDisposition x$8 = ConnectionLoadConfig$.MODULE$.fromComet$default$5();
            JobInfo.WriteDisposition x$9 = ConnectionLoadConfig$.MODULE$.fromComet$default$6();
            boolean x$10 = ConnectionLoadConfig$.MODULE$.fromComet$default$10();
            ConnectionLoadConfig jdbcConfig = ConnectionLoadConfig$.MODULE$.fromComet(x$1, x$2, (Either<String, Dataset<Row>>)x$3, x$4, x$8, x$9, x$5, x$6, x$7, x$10);
            try_ = new ConnectionLoadJob(jdbcConfig, settings).run();
        } else if (sink instanceof BigQuerySink) {
            BigQuerySink bigQuerySink = (BigQuerySink)sink;
            Right x$11 = package$.MODULE$.Right().apply((Object)auditDF);
            String x$12 = (String)bigQuerySink.name().getOrElse((Function0 & Serializable & scala.Serializable)() -> "audit");
            String x$13 = "audit";
            None$ x$14 = None$.MODULE$;
            Nil$ x$15 = Nil$.MODULE$;
            String x$16 = "parquet";
            String x$17 = "CREATE_IF_NEEDED";
            String x$18 = "WRITE_APPEND";
            None$ x$19 = None$.MODULE$;
            None$ x$20 = None$.MODULE$;
            Map<String, String> x$21 = bigQuerySink.getOptions();
            Option<List<RowLevelSecurity>> x$22 = BigQueryLoadConfig$.MODULE$.apply$default$11();
            boolean x$23 = BigQueryLoadConfig$.MODULE$.apply$default$12();
            Engine x$24 = BigQueryLoadConfig$.MODULE$.apply$default$13();
            BigQueryLoadConfig bqConfig = new BigQueryLoadConfig((Either<String, Dataset<Row>>)x$11, x$12, x$13, (Option<String>)x$14, (Seq<String>)x$15, x$16, x$17, x$18, (Option<String>)x$19, (Option<Object>)x$20, x$22, x$23, x$24, x$21);
            try_ = new BigQuerySparkJob(bqConfig, (Option<Schema>)new Some((Object)this.bigqueryAuditSchema()), settings).run();
        } else {
            if (sink instanceof EsSink) {
                throw new Exception("Sinking Audit log to Elasticsearch not yet supported");
            }
            boolean bl = sink instanceof NoneSink ? true : sink instanceof FsSink;
            if (bl && !settings.comet().sinkToFile()) {
                SparkAuditLogWriter$.sinkToFile$1(log, settings, session);
                try_ = BoxedUnit.UNIT;
            } else {
                boolean bl2 = sink instanceof NoneSink ? true : sink instanceof FsSink;
                if (bl2 && settings.comet().sinkToFile()) {
                    try_ = BoxedUnit.UNIT;
                } else {
                    throw new MatchError((Object)sink);
                }
            }
        }
        return try_;
    }

    private static final void sinkToFile$1(AuditLog log, Settings settings, SparkSession session$1) {
        Path lockPath = new Path(settings.comet().audit().path(), "audit.lock");
        FileLock locker = new FileLock(lockPath, settings.storageHandler());
        locker.doExclusively(locker.doExclusively$default$1(), (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            BoxedUnit boxedUnit;
            Path auditPath = new Path(settings.comet().audit().path(), "ingestion-log");
            JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(MODULE$.getClass().getClassLoader());
            public final class Com_ebiznext_comet_job_ingest_SparkAuditLogWriter$$typecreator5$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $m.staticClass("com.ebiznext.comet.job.ingest.AuditLog").asType().toTypeConstructor();
                }

                public Com_ebiznext_comet_job_ingest_SparkAuditLogWriter$$typecreator5$1() {
                }
            }
            DataFrameWriter dfWriter = session$1.implicits().localSeqToDatasetHolder((Seq)new .colon.colon((Object)log, (List)Nil$.MODULE$), session$1.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Com_ebiznext_comet_job_ingest_SparkAuditLogWriter$$typecreator5$1()))).toDF().write().mode(SaveMode.Append);
            if (MODULE$.logger().underlying().isInfoEnabled()) {
                MODULE$.logger().underlying().info("Saving audit to path {}", new Object[]{auditPath});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            if (settings.comet().hive()) {
                BoxedUnit boxedUnit2;
                String hiveDB = (String)settings.comet().audit().sink().name().getOrElse((Function0 & Serializable & scala.Serializable)() -> "audit");
                String tableName = "audit";
                String fullTableName = new StringBuilder(1).append(hiveDB).append(".").append(tableName).toString();
                session$1.sql(new StringBuilder(30).append("create database if not exists ").append(hiveDB).toString());
                session$1.sql(new StringBuilder(4).append("use ").append(hiveDB).toString());
                if (MODULE$.logger().underlying().isInfoEnabled()) {
                    MODULE$.logger().underlying().info("Saving audit to table {}", new Object[]{fullTableName});
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                dfWriter.format(settings.comet().defaultAuditWriteFormat()).saveAsTable(fullTableName);
            } else {
                BoxedUnit boxedUnit3;
                if (MODULE$.logger().underlying().isInfoEnabled()) {
                    MODULE$.logger().underlying().info("Saving audit to file {}", new Object[]{auditPath});
                    boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    boxedUnit3 = BoxedUnit.UNIT;
                }
                dfWriter.format(settings.comet().defaultAuditWriteFormat()).option("path", auditPath.toString()).save();
            }
        });
    }

    private SparkAuditLogWriter$() {
        MODULE$ = this;
        StrictLogging.$init$((StrictLogging)this);
        this.auditCols = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple3[]{new Tuple3((Object)"jobid", (Object)LegacySQLTypeName.STRING, (Object)StringType$.MODULE$), new Tuple3((Object)"paths", (Object)LegacySQLTypeName.STRING, (Object)StringType$.MODULE$), new Tuple3((Object)"domain", (Object)LegacySQLTypeName.STRING, (Object)StringType$.MODULE$), new Tuple3((Object)"schema", (Object)LegacySQLTypeName.STRING, (Object)StringType$.MODULE$), new Tuple3((Object)"success", (Object)LegacySQLTypeName.BOOLEAN, (Object)BooleanType$.MODULE$), new Tuple3((Object)"count", (Object)LegacySQLTypeName.INTEGER, (Object)LongType$.MODULE$), new Tuple3((Object)"countAccepted", (Object)LegacySQLTypeName.INTEGER, (Object)LongType$.MODULE$), new Tuple3((Object)"countRejected", (Object)LegacySQLTypeName.INTEGER, (Object)LongType$.MODULE$), new Tuple3((Object)"timestamp", (Object)LegacySQLTypeName.TIMESTAMP, (Object)TimestampType$.MODULE$), new Tuple3((Object)"duration", (Object)LegacySQLTypeName.INTEGER, (Object)LongType$.MODULE$), new Tuple3((Object)"message", (Object)LegacySQLTypeName.STRING, (Object)StringType$.MODULE$), new Tuple3((Object)"step", (Object)LegacySQLTypeName.STRING, (Object)StringType$.MODULE$)}));
    }
}

