/*
 * Decompiled with CFR 0.152.
 */
package ai.starlake.job.sink.jdbc;

import ai.starlake.config.Settings;
import ai.starlake.config.SparkEnv;
import ai.starlake.extract.JdbcDbUtils$;
import ai.starlake.job.sink.jdbc.JdbcConnectionLoadConfig;
import ai.starlake.utils.IngestionCounters;
import ai.starlake.utils.JobBase;
import ai.starlake.utils.JobResult;
import ai.starlake.utils.SparkJob;
import ai.starlake.utils.SparkJobResult;
import ai.starlake.utils.SparkUtils$;
import ai.starlake.utils.Utils$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import java.sql.Connection;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.types.StructType;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001da\u0001B\u0007\u000f\u0001eA\u0001B\n\u0001\u0003\u0002\u0003\u0006Ia\n\u0005\tW\u0001\u0011)\u0019!C\u0002Y!A1\u0007\u0001B\u0001B\u0003%Q\u0006C\u00035\u0001\u0011\u0005Q\u0007C\u0003;\u0001\u0011\u00053\bC\u0004H\u0001\t\u0007I\u0011\u0001%\t\rQ\u0003\u0001\u0015!\u0003J\u0011\u001d)\u0006A1A\u0005\u0002YCa!\u001c\u0001!\u0002\u00139\u0006\"\u00028\u0001\t\u0013y\u0007\"B:\u0001\t\u0003!\b\"B?\u0001\t\u0003r(aD*qCJ\\'\n\u001a2d/JLG/\u001a:\u000b\u0005=\u0001\u0012\u0001\u00026eE\u000eT!!\u0005\n\u0002\tMLgn\u001b\u0006\u0003'Q\t1A[8c\u0015\t)b#\u0001\u0005ti\u0006\u0014H.Y6f\u0015\u00059\u0012AA1j\u0007\u0001\u00192\u0001\u0001\u000e!!\tYb$D\u0001\u001d\u0015\u0005i\u0012!B:dC2\f\u0017BA\u0010\u001d\u0005\u0019\te.\u001f*fMB\u0011\u0011\u0005J\u0007\u0002E)\u00111\u0005F\u0001\u0006kRLGn]\u0005\u0003K\t\u0012\u0001b\u00159be.TuNY\u0001\nG2L7i\u001c8gS\u001e\u0004\"\u0001K\u0015\u000e\u00039I!A\u000b\b\u00031)#'mY\"p]:,7\r^5p]2{\u0017\rZ\"p]\u001aLw-\u0001\u0005tKR$\u0018N\\4t+\u0005i\u0003C\u0001\u00182\u001b\u0005y#B\u0001\u0019\u0015\u0003\u0019\u0019wN\u001c4jO&\u0011!g\f\u0002\t'\u0016$H/\u001b8hg\u0006I1/\u001a;uS:<7\u000fI\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005YJDCA\u001c9!\tA\u0003\u0001C\u0003,\t\u0001\u000fQ\u0006C\u0003'\t\u0001\u0007q%\u0001\u0003oC6,W#\u0001\u001f\u0011\u0005u\"eB\u0001 
C!\tyD$D\u0001A\u0015\t\t\u0005$\u0001\u0004=e>|GOP\u0005\u0003\u0007r\ta\u0001\u0015:fI\u00164\u0017BA#G\u0005\u0019\u0019FO]5oO*\u00111\tH\u0001\u0005G>tg-F\u0001J!\tQ%+D\u0001L\u0015\t9EJ\u0003\u0002N\u001d\u00061\u0001.\u00193p_BT!a\u0014)\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\t\u0016aA8sO&\u00111k\u0013\u0002\u000e\u0007>tg-[4ve\u0006$\u0018n\u001c8\u0002\u000b\r|gN\u001a\u0011\u0002\u0017)$'mY(qi&|gn]\u000b\u0002/B\u0019\u0001,Y2\u000e\u0003eS!AW.\u0002\tU$\u0018\u000e\u001c\u0006\u00039v\u000b\u0001bY1uC2L8\u000f\u001e\u0006\u0003=~\u000b1a]9m\u0015\t\u0001g*A\u0003ta\u0006\u00148.\u0003\u0002c3\n\u00112)Y:f\u0013:\u001cXM\\:ji&4X-T1q!\t!'N\u0004\u0002fQ6\taM\u0003\u0002h)\u00059Q\r\u001f;sC\u000e$\u0018BA5g\u0003-QEMY2EEV#\u0018\u000e\\:\n\u0005-d'!\u0003+bE2,g*Y7f\u0015\tIg-\u0001\u0007kI\n\u001cw\n\u001d;j_:\u001c\b%\u0001\u0004jg\u001aKG.\u001a\u000b\u0002aB\u00111$]\u0005\u0003er\u0011qAQ8pY\u0016\fg.A\u0004sk:TEIQ\"\u0015\u0003U\u00042A\u001e={\u001b\u00059(B\u0001.\u001d\u0013\tIxOA\u0002Uef\u0004\"!I>\n\u0005q\u0014#AD*qCJ\\'j\u001c2SKN,H\u000e^\u0001\u0004eVtG#A@\u0011\tYD\u0018\u0011\u0001\t\u0004C\u0005\r\u0011bAA\u0003E\tI!j\u001c2SKN,H\u000e\u001e")
// Spark job that writes a dataset to a JDBC table.
//
// The dataset comes from cliConfig.sourceFile(): either a path that is loaded with the
// configured default write format, or a dataset that is used as-is. Before writing, the
// target table is created or altered on a JDBC connection, then truncated, and the rows
// are appended with the Spark DataFrameWriter.
//
// This source is a Java rendering of a compiled Scala class: the "$"-named members are the
// compiler-generated trait plumbing (lazy vals, trait setters, lifted lambda bodies) and
// must keep their exact names and signatures.
public class SparkJdbcWriter implements SparkJob {
    /** Load request: source, target "domain.table" name, JDBC options and format. */
    private final JdbcConnectionLoadConfig cliConfig;
    private final Settings settings;
    /** Hadoop configuration taken from the Spark context at construction time. */
    private final Configuration conf;
    /** JDBC options derived from {@code cliConfig.options()} and {@code cliConfig.format()}. */
    private final CaseInsensitiveMap<String> jdbcOptions;
    // Lazily initialised values; bit 0 / bit 1 of bitmap$0 record whether each one is set.
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
    private SparkSession session;
    // Not final: these two are assigned through the setter methods below rather than
    // directly in the constructor (presumably invoked by the trait $init$ calls).
    private String appName;
    private Logger logger;
    private volatile byte bitmap$0;

    @Override
    public SparkConf withExtraSparkConf(SparkConf sourceConfig) {
        return SparkJob.withExtraSparkConf$(this, sourceConfig);
    }

    @Override
    public String getTableLocation(String domain, String schema) {
        return SparkJob.getTableLocation$(this, domain, schema);
    }

    @Override
    public String getTableLocation(String fullTableName) {
        return SparkJob.getTableLocation$(this, fullTableName);
    }

    @Override
    public void registerUdf(String udf) {
        SparkJob.registerUdf$(this, udf);
    }

    @Override
    public String applicationId() {
        return JobBase.applicationId$(this);
    }

    @Override
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> ds) {
        return DatasetLogging.DatasetHelper$(this, ds);
    }

    /** Computes the Spark environment once, under the instance lock, and sets bit 0. */
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.ai$starlake$utils$SparkJob$$sparkEnv = SparkJob.ai$starlake$utils$SparkJob$$sparkEnv$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    /** Returns the Spark environment, computing it on first access. */
    @Override
    public SparkEnv ai$starlake$utils$SparkJob$$sparkEnv() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.ai$starlake$utils$SparkJob$$sparkEnv$lzycompute();
        }
        return this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    /** Computes the Spark session once, under the instance lock, and sets bit 1. */
    private SparkSession session$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.session = SparkJob.session$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.session;
    }

    /** Returns the Spark session, computing it on first access. */
    @Override
    public SparkSession session() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.session$lzycompute();
        }
        return this.session;
    }

    @Override
    public String appName() {
        return this.appName;
    }

    /** Trait setter for {@code appName}. */
    @Override
    public void ai$starlake$utils$JobBase$_setter_$appName_$eq(String x$1) {
        this.appName = x$1;
    }

    public Logger logger() {
        return this.logger;
    }

    /** Trait setter for {@code logger}. */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    @Override
    public Settings settings() {
        return this.settings;
    }

    /** Job name: {@code "cnxload-JDBC-"} followed by the target domain and table name. */
    @Override
    public String name() {
        return "cnxload-JDBC-" + this.cliConfig.outputDomainAndTableName();
    }

    public Configuration conf() {
        return this.conf;
    }

    public CaseInsensitiveMap<String> jdbcOptions() {
        return this.jdbcOptions;
    }

    /** True when the configured source is a path (the Left side) rather than a dataset. */
    private boolean isFile() {
        return this.cliConfig.sourceFile().isLeft();
    }

    /**
     * Loads the source dataset, prepares the target table and appends the rows to it.
     *
     * <p>Steps, all executed inside the returned {@code Try}:
     * resolve the source dataset; create or alter the target table on a JDBC connection;
     * truncate the target table on a second connection; append the dataset through the
     * Spark writer (format {@code "starlake-duckdb"} with a single partition for DuckDB
     * URLs, {@code "jdbc"} otherwise).
     *
     * @return a {@code Try} holding an empty {@link SparkJobResult} on success, or the
     *         failure raised by any of the steps
     */
    public Try<SparkJobResult> runJDBC() {
        Either<String, Dataset<Row>> inputPath = this.cliConfig.sourceFile();
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("Input path {}", new Object[]{inputPath});
        }
        return Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> {
            // Assigned exactly once on every path so it can be captured by the lambda below.
            Dataset sourceDF;
            if (inputPath instanceof Left) {
                String path = (String)((Left)inputPath).value();
                sourceDF = this.session().read().format(this.settings().appConfig().defaultWriteFormat()).load(path);
            } else if (inputPath instanceof Right) {
                sourceDF = (Dataset)((Right)inputPath).value();
            } else {
                throw new MatchError((Object)inputPath);
            }
            // The domain is the part of "domain.table" before the first dot.
            String outputDomain = this.cliConfig.outputDomainAndTableName().split("\\.")[0];
            String url = (String)this.jdbcOptions().apply((Object)"url");
            JdbcDbUtils$.MODULE$.withJDBCConnection((Map<String, String>)this.jdbcOptions(), (Function1 & Serializable & scala.Serializable)conn -> {
                SparkJdbcWriter.$anonfun$runJDBC$2(this, url, outputDomain, sourceDF, (Connection)conn);
                return BoxedUnit.UNIT;
            }, this.settings());
            boolean isDuckDb = url.contains("jdbc:duckdb");
            String format = isDuckDb ? "starlake-duckdb" : "jdbc";
            DataFrameWriter dfw = sourceDF.write().format(format).option("dbtable", this.cliConfig.outputDomainAndTableName());
            // The result is unused; the call is kept so that any side effect or failure of
            // the dialect lookup still happens at this point.
            SparkUtils$.MODULE$.dialect(url);
            JdbcDbUtils$.MODULE$.withJDBCConnection((Map<String, String>)this.jdbcOptions(), (Function1 & Serializable & scala.Serializable)conn -> {
                JdbcDbUtils$.MODULE$.truncateTable((Connection)conn, this.cliConfig.outputDomainAndTableName());
                return BoxedUnit.UNIT;
            }, this.settings());
            DataFrameWriter dfToSave = dfw.mode(SaveMode.Append).options(this.cliConfig.options());
            if (isDuckDb) {
                dfToSave.option("numPartitions", "1").save();
            } else {
                dfToSave.save();
            }
            if (this.logger().underlying().isInfoEnabled()) {
                // NOTE(review): the options map is logged verbatim; if it can carry
                // credentials they would end up in the log - confirm.
                this.logger().underlying().info("JDBC save done to table {} at {}", new Object[]{this.cliConfig.outputDomainAndTableName(), this.cliConfig.options()});
            }
            return new SparkJobResult((Option<Dataset<Row>>)None$.MODULE$, (Option<IngestionCounters>)None$.MODULE$);
        });
    }

    /** Runs {@link #runJDBC()} and passes the result through {@code Utils.logFailure}. */
    @Override
    public Try<JobResult> run() {
        Try<SparkJobResult> res = this.runJDBC();
        return Utils$.MODULE$.logFailure(res, this.logger());
    }

    /**
     * Lifted body of the first connection callback in {@link #runJDBC()}: makes sure the
     * target table is ready to receive {@code sourceDF$1}.
     *
     * <p>When the table is missing and schema creation is enabled in the settings, the
     * schema {@code outputDomain$1} is created first. Then, if the table exists and the
     * dataset schema is flat, the table is altered: columns are dropped and added according
     * to the difference with the schema read from the database (no difference if that
     * schema cannot be read). In every other case the table is created from the dataset
     * schema.
     *
     * @param $this the enclosing writer
     * @param url$1 JDBC URL of the target database
     * @param outputDomain$1 schema (domain) part of the target table name
     * @param sourceDF$1 dataset about to be written
     * @param conn open JDBC connection on which the DDL is executed
     */
    public static final /* synthetic */ void $anonfun$runJDBC$2(SparkJdbcWriter $this, String url$1, String outputDomain$1, Dataset sourceDF$1, Connection conn) {
        boolean tableExists = JdbcDbUtils$.MODULE$.tableExists(conn, url$1, $this.cliConfig.outputDomainAndTableName());
        if (!tableExists && $this.settings().appConfig().createSchemaIfNotExists()) {
            if ($this.logger().underlying().isInfoEnabled()) {
                $this.logger().underlying().info("table {} not found, trying to create it", new Object[]{$this.cliConfig.outputDomainAndTableName()});
            }
            JdbcDbUtils$.MODULE$.createSchema(conn, outputDomain$1);
        } else if ($this.logger().underlying().isInfoEnabled()) {
            $this.logger().underlying().info("Schema {} found", new Object[]{outputDomain$1});
        }
        StructType schema = sourceDF$1.schema();
        if (SparkUtils$.MODULE$.isFlat(schema) && tableExists) {
            // Falling back to the dataset schema makes both differences empty when the
            // existing schema is unavailable.
            Option<StructType> existingSchema = SparkUtils$.MODULE$.getSchemaOption(conn, (Map<String, String>)$this.jdbcOptions(), $this.cliConfig.outputDomainAndTableName());
            StructType addedSchema = SparkUtils$.MODULE$.added(schema, (StructType)existingSchema.getOrElse((Function0 & Serializable & scala.Serializable)() -> schema));
            StructType deletedSchema = SparkUtils$.MODULE$.dropped(schema, (StructType)existingSchema.getOrElse((Function0 & Serializable & scala.Serializable)() -> schema));
            Seq<String> alterTableDropColumns = SparkUtils$.MODULE$.alterTableDropColumnsString(deletedSchema, $this.cliConfig.outputDomainAndTableName());
            if (alterTableDropColumns.nonEmpty()) {
                if ($this.logger().underlying().isInfoEnabled()) {
                    $this.logger().underlying().info("alter table {} with {} columns to drop", new Object[]{$this.cliConfig.outputDomainAndTableName(), BoxesRunTime.boxToInteger((int)alterTableDropColumns.size())});
                }
                if ($this.logger().underlying().isDebugEnabled()) {
                    $this.logger().underlying().debug("alter table {}", new Object[]{alterTableDropColumns.mkString("\n")});
                }
            }
            Seq<String> alterTableAddColumns = SparkUtils$.MODULE$.alterTableAddColumnsString(addedSchema, $this.cliConfig.outputDomainAndTableName(), (Map<String, String>)Predef$.MODULE$.Map().empty());
            if (alterTableAddColumns.nonEmpty()) {
                if ($this.logger().underlying().isInfoEnabled()) {
                    $this.logger().underlying().info("alter table {} with {} columns to add", new Object[]{$this.cliConfig.outputDomainAndTableName(), BoxesRunTime.boxToInteger((int)alterTableAddColumns.size())});
                }
                if ($this.logger().underlying().isDebugEnabled()) {
                    $this.logger().underlying().debug("alter table {}", new Object[]{alterTableAddColumns.mkString("\n")});
                }
            }
            // Drops are executed before adds.
            alterTableDropColumns.foreach((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)JdbcDbUtils$.MODULE$.executeAlterTable((String)x$1, conn)));
            alterTableAddColumns.foreach((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)JdbcDbUtils$.MODULE$.executeAlterTable((String)x$2, conn)));
            return;
        }
        // NOTE(review): this path is also taken when the table exists but the dataset
        // schema is not flat, in which case the "not found" message below is inaccurate -
        // confirm this is intended.
        JdbcOptionsInWrite optionsWrite = new JdbcOptionsInWrite(url$1, $this.cliConfig.outputDomainAndTableName(), $this.jdbcOptions());
        if ($this.logger().underlying().isInfoEnabled()) {
            $this.logger().underlying().info("Table {} not found, creating it with schema {}", new Object[]{$this.cliConfig.outputDomainAndTableName(), schema});
        }
        SparkUtils$.MODULE$.createTable(conn, $this.cliConfig.outputDomainAndTableName(), schema, false, optionsWrite, (Map<String, Map<String, String>>)Predef$.MODULE$.Map().empty(), $this.settings());
    }

    /**
     * Creates the writer, runs the trait initialisers, captures the Hadoop configuration
     * of the Spark context and derives the JDBC options from the load configuration.
     *
     * @param cliConfig load request (source, target table, options, format)
     * @param settings application settings
     */
    public SparkJdbcWriter(JdbcConnectionLoadConfig cliConfig, Settings settings) {
        this.cliConfig = cliConfig;
        this.settings = settings;
        // Trait initialisers run before anything that uses the logger or the session.
        StrictLogging.$init$((StrictLogging)this);
        DatasetLogging.$init$(this);
        JobBase.$init$(this);
        SparkJob.$init$(this);
        this.conf = this.session().sparkContext().hadoopConfiguration();
        if (this.logger().underlying().isInfoEnabled()) {
            // NOTE(review): the whole configuration object is logged; confirm its string
            // form cannot expose credentials.
            this.logger().underlying().info("JDBC Config {}", new Object[]{cliConfig});
        }
        this.jdbcOptions = JdbcDbUtils$.MODULE$.jdbcOptions(cliConfig.options(), cliConfig.format());
    }
}

