/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.delta.commands;

import java.io.Serializable;
import java.util.List;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.hadoop.fs.Path;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.JsonToStructs;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaLog$;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.MLSQLMultiDeltaOptions$;
import org.apache.spark.sql.delta.TableMetaInfo;
import org.apache.spark.sql.delta.actions.Metadata;
import org.apache.spark.sql.delta.commands.DeltaCommandsFun;
import org.apache.spark.sql.delta.commands.UpsertTableInDelta;
import org.apache.spark.sql.delta.commands.UpsertTableInDelta$;
import org.apache.spark.sql.delta.files.DelayedCommitProtocol;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataType$;
import org.apache.spark.sql.types.StructType;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.util.Try$;
import tech.mlsql.common.utils.Md5$;

public final class BinlogSyncToDelta$
implements DeltaCommandsFun {
    /** Singleton instance of this class; it is assigned inside the private constructor. */
    public static BinlogSyncToDelta$ MODULE$;

    static {
        // The constructor stores the new instance in MODULE$ itself, so the
        // result of this expression is intentionally not kept here.
        new BinlogSyncToDelta$();
    }

    /**
     * Forwards to the static forwarder {@code DeltaCommandsFun.normalizeData$},
     * passing this instance as the receiver.
     *
     * @param metadata      table metadata handed through unchanged
     * @param data          dataset handed through unchanged
     * @param partitionCols partition column names handed through unchanged
     * @return whatever {@code DeltaCommandsFun.normalizeData$} returns
     */
    @Override
    public Tuple2<QueryExecution, Seq<Attribute>> normalizeData(Metadata metadata, Dataset<?> data, Seq<String> partitionCols) {
        Tuple2<QueryExecution, Seq<Attribute>> normalized = DeltaCommandsFun.normalizeData$(this, metadata, data, partitionCols);
        return normalized;
    }

    /**
     * Forwards to the static forwarder {@code DeltaCommandsFun.getCommitter$},
     * passing this instance as the receiver.
     *
     * @param outputPath path handed through unchanged
     * @return the commit protocol produced by {@code DeltaCommandsFun.getCommitter$}
     */
    @Override
    public DelayedCommitProtocol getCommitter(Path outputPath) {
        DelayedCommitProtocol committer = DeltaCommandsFun.getCommitter$(this, outputPath);
        return committer;
    }

    /**
     * Forwards to the static forwarder {@code DeltaCommandsFun.convertStreamDataFrame$},
     * passing this instance as the receiver.
     *
     * @param _data dataset handed through unchanged
     * @return the dataset produced by {@code DeltaCommandsFun.convertStreamDataFrame$}
     */
    @Override
    public Dataset<?> convertStreamDataFrame(Dataset<?> _data) {
        Dataset<?> converted = DeltaCommandsFun.convertStreamDataFrame$(this, _data);
        return converted;
    }

    /**
     * Applies one batch of binlog events to the Delta tables they refer to.
     *
     * <p>What the method does, in order:
     * <ol>
     *   <li>Reads the comma-separated id columns from the {@code ID_COLS} option and the target
     *       parallelism from {@code NEW_DATA_PARALLEL_NUM} (default {@code "8"}).</li>
     *   <li>Passes {@code _ds} through {@code convertStreamDataFrame}, repartitions the result
     *       when its partition count differs from the target, and caches it.</li>
     *   <li>When {@code KEEP_BINLOG} (default {@code "false"}) parses to true, appends the raw
     *       batch to the location given by {@code BINLOG_PATH}; otherwise calls {@code count()}
     *       on the cached dataset.</li>
     *   <li>Parses column 0 of every row as JSON, removes its {@code "rows"} array and emits one
     *       record per array element with the remaining envelope stored under
     *       {@code META_KEY}.</li>
     *   <li>Groups records by an MD5 key built from database name, table name and id column
     *       values, and keeps the record that sorts last by the meta {@code "timestamp"}.</li>
     *   <li>Collects the distinct (database, table, schema) triples, then hands the non-delete
     *       records and the delete records to {@code saveToSink$1}, each only when the
     *       corresponding RDD is non-empty.</li>
     * </ol>
     * The cached dataset is unpersisted in a {@code finally} block.
     *
     * @param _ds     incoming batch; column 0 of each row must be a JSON string
     * @param options command options; {@code ID_COLS} is required, {@code BINLOG_PATH} is
     *                required when {@code KEEP_BINLOG} is true
     */
    public void run(Dataset<Row> _ds, Map<String, String> options) {
        Seq idCols = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])((String)options.apply((Object)UpsertTableInDelta$.MODULE$.ID_COLS())).split(","))).toSeq();
        int newDataParallelNum = new StringOps(Predef$.MODULE$.augmentString((String)options.getOrElse((Object)UpsertTableInDelta$.MODULE$.NEW_DATA_PARALLEL_NUM(), (Function0 & Serializable & scala.Serializable)() -> "8"))).toInt();
        ObjectRef ds = ObjectRef.create(this.convertStreamDataFrame(_ds));
        if (newDataParallelNum != new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])((Dataset)ds.elem).rdd().partitions())).size()) {
            ds.elem = ((Dataset)ds.elem).repartition(newDataParallelNum);
        }
        ((Dataset)ds.elem).cache();
        try {
            boolean keepBinlog = new StringOps(Predef$.MODULE$.augmentString((String)options.getOrElse((Object)MLSQLMultiDeltaOptions$.MODULE$.KEEP_BINLOG(), (Function0 & Serializable & scala.Serializable)() -> "false"))).toBoolean();
            if (keepBinlog) {
                String originalLogPath = (String)options.apply((Object)MLSQLMultiDeltaOptions$.MODULE$.BINLOG_PATH());
                ((Dataset)ds.elem).write().format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").mode(SaveMode.Append).save(originalLogPath);
            } else {
                // The result is discarded; presumably this action exists only to
                // materialise the cache requested above -- confirm with the author.
                ((Dataset)ds.elem).count();
            }
            SparkSession spark = ((Dataset)ds.elem).sparkSession();
            RDD dataSet = ((Dataset)ds.elem).rdd().flatMap((Function1 & Serializable & scala.Serializable)row -> {
                String value = row.getString(0);
                JSONObject wow = JSONObject.fromObject((Object)value);
                Object rows = wow.remove("rows");
                return (Buffer)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter((List)((JSONArray)rows)).asScala()).map((Function1 & Serializable & scala.Serializable)record -> {
                    ((JSONObject)record).put((Object)MLSQLMultiDeltaOptions$.MODULE$.META_KEY(), (Object)wow);
                    return (JSONObject)record;
                }, Buffer$.MODULE$.canBuildFrom());
            }, ClassTag$.MODULE$.apply(JSONObject.class));
            RDD finalDataSet = dataSet.map((Function1 & Serializable & scala.Serializable)record -> {
                // Each id value is prefixed with its length before joining. Joining the raw
                // values with an empty separator made composite ids such as (1, 23) and
                // (12, 3) produce the same key, so one of the two rows was dropped by the
                // "keep latest per key" step below. The key is only used for this in-batch
                // grouping and is never persisted.
                String idColKey = ((TraversableOnce)idCols.map((Function1 & Serializable & scala.Serializable)idCol -> {
                    String idValue = record.get(idCol).toString();
                    return new StringBuilder().append(idValue.length()).append(":").append(idValue).toString();
                }, Seq$.MODULE$.canBuildFrom())).mkString("");
                String key = Md5$.MODULE$.md5Hash(new StringBuilder().append(BinlogSyncToDelta$.getDatabaseNameFromMeta$1(record)).append("_").append(BinlogSyncToDelta$.getTableNameNameFromMeta$1(record)).append("_").append(idColKey).toString());
                return new Tuple2((Object)key, (Object)record.toString());
            }, ClassTag$.MODULE$.apply(Tuple2.class)).groupBy((Function1 & Serializable & scala.Serializable)x$1 -> (String)x$1._1(), ClassTag$.MODULE$.apply(String.class)).map((Function1 & Serializable & scala.Serializable)f -> (Iterable)((TraversableLike)f._2()).map((Function1 & Serializable & scala.Serializable)m -> JSONObject.fromObject((Object)m._2()), scala.collection.Iterable$.MODULE$.canBuildFrom()), ClassTag$.MODULE$.apply(Iterable.class)).map((Function1 & Serializable & scala.Serializable)records -> {
                // Sort ascending by meta timestamp and keep the last element of the group.
                Seq items = (Seq)records.toSeq().sortBy((Function1 & Serializable & scala.Serializable)record -> BoxesRunTime.boxToLong((long)BinlogSyncToDelta$.$anonfun$run$11(record)), (Ordering)Ordering.Long$.MODULE$);
                return (JSONObject)items.last();
            }, ClassTag$.MODULE$.apply(JSONObject.class));
            // Distinct (database, table, schema) triples seen in this batch, paired with an index.
            Map tableToId = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])finalDataSet.map((Function1 & Serializable & scala.Serializable)record -> new TableMetaInfo(BinlogSyncToDelta$.getDatabaseNameFromMeta$1(record), BinlogSyncToDelta$.getTableNameNameFromMeta$1(record), BinlogSyncToDelta$.getschemaNameFromMeta$1(record)), ClassTag$.MODULE$.apply(TableMetaInfo.class)).distinct().collect())).zipWithIndex(Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toMap(Predef$.MODULE$.$conforms());
            // Records whose meta "type" is anything other than "delete" are upserted first.
            RDD upsertRDD = finalDataSet.filter((Function1 & Serializable & scala.Serializable)record -> BoxesRunTime.boxToBoolean((boolean)BinlogSyncToDelta$.$anonfun$run$17(record)));
            if (upsertRDD.count() > 0L) {
                BinlogSyncToDelta$.saveToSink$1(upsertRDD, UpsertTableInDelta$.MODULE$.OPERATION_TYPE_UPSERT(), tableToId, options, spark, ds, idCols);
            }
            // Records whose meta "type" equals "delete" are applied afterwards.
            RDD deleteRDD = finalDataSet.filter((Function1 & Serializable & scala.Serializable)record -> BoxesRunTime.boxToBoolean((boolean)BinlogSyncToDelta$.$anonfun$run$18(record)));
            if (deleteRDD.count() > 0L) {
                BinlogSyncToDelta$.saveToSink$1(deleteRDD, UpsertTableInDelta$.MODULE$.OPERATION_TYPE_DELETE(), tableToId, options, spark, ds, idCols);
            }
        }
        finally {
            ((Dataset)ds.elem).unpersist();
        }
    }

    /**
     * Reads one string field from the JSON object stored in {@code record} under
     * {@code MLSQLMultiDeltaOptions.META_KEY}.
     *
     * @param record record carrying the meta object
     * @param key    name of the field to read from the meta object
     * @return the value returned by {@code getString(key)} on the meta object
     */
    private static final String _getInfoFromMeta$1(JSONObject record, String key) {
        String metaKey = MLSQLMultiDeltaOptions$.MODULE$.META_KEY();
        JSONObject meta = record.getJSONObject(metaKey);
        return meta.getString(key);
    }

    /**
     * Returns the {@code "databaseName"} field of the meta object attached to {@code record}.
     *
     * @param record record carrying the meta object
     * @return the meta object's {@code "databaseName"} string
     */
    private static final String getDatabaseNameFromMeta$1(JSONObject record) {
        String databaseName = BinlogSyncToDelta$._getInfoFromMeta$1(record, "databaseName");
        return databaseName;
    }

    /**
     * Returns the {@code "tableName"} field of the meta object attached to {@code record}.
     *
     * @param record record carrying the meta object
     * @return the meta object's {@code "tableName"} string
     */
    private static final String getTableNameNameFromMeta$1(JSONObject record) {
        String tableName = BinlogSyncToDelta$._getInfoFromMeta$1(record, "tableName");
        return tableName;
    }

    /**
     * Returns the {@code "schema"} field of the meta object attached to {@code record}.
     *
     * @param record record carrying the meta object
     * @return the meta object's {@code "schema"} string
     */
    private static final String getschemaNameFromMeta$1(JSONObject record) {
        String schema = BinlogSyncToDelta$._getInfoFromMeta$1(record, "schema");
        return schema;
    }

    /**
     * Sort key used by {@code run}: the {@code "timestamp"} field of the meta object
     * attached to {@code record}, read as a long.
     *
     * @param record record carrying the meta object
     * @return the meta object's {@code "timestamp"} value
     */
    public static final /* synthetic */ long $anonfun$run$11(JSONObject record) {
        JSONObject meta = record.getJSONObject(MLSQLMultiDeltaOptions$.MODULE$.META_KEY());
        return meta.getLong("timestamp");
    }

    /**
     * Filter predicate used by {@code saveToSink$1}: accepts {@code record} when both its
     * meta database name and its meta table name equal those of {@code table$1}.
     * Comparisons are null-safe (two nulls are considered equal).
     *
     * <p>The table name is only read from the record once the database name has matched.
     *
     * @param table$1 table descriptor to match against
     * @param record  record carrying the meta object
     * @return {@code true} when database and table both match
     */
    public static final /* synthetic */ boolean $anonfun$run$14(TableMetaInfo table$1, JSONObject record) {
        String recordDb = BinlogSyncToDelta$.getDatabaseNameFromMeta$1(record);
        String expectedDb = table$1.db();
        boolean sameDb = recordDb == null ? expectedDb == null : recordDb.equals(expectedDb);
        if (!sameDb) {
            return false;
        }
        String recordTable = BinlogSyncToDelta$.getTableNameNameFromMeta$1(record);
        String expectedTable = table$1.table();
        return recordTable == null ? expectedTable == null : recordTable.equals(expectedTable);
    }

    /**
     * Parses {@code json} with {@code DataType.fromJson} and returns the result as a
     * {@link StructType}.
     *
     * @param json JSON text describing a data type
     * @return the parsed type, when it is a {@code StructType}
     * @throws RuntimeException when the parsed type is not a {@code StructType}; any failure
     *         raised by the parser itself is rethrown by {@code Try.get()}
     */
    private static final StructType deserializeSchema$1(String json) {
        DataType parsed = (DataType)Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> DataType$.MODULE$.fromJson(json)).get();
        if (parsed instanceof StructType) {
            return (StructType)parsed;
        }
        throw new RuntimeException("Failed parsing StructType: " + json);
    }

    /**
     * Runs one {@link UpsertTableInDelta} command per entry of {@code tableToId$1}, feeding it
     * the records of {@code targetRDD} that belong to that entry's table.
     *
     * <p>For every table the method:
     * <ol>
     *   <li>filters {@code targetRDD} with {@code $anonfun$run$14}, strips the
     *       {@code META_KEY} entry from each record and serialises it to a JSON string;</li>
     *   <li>parses those strings into columns using the schema stored in the table
     *       descriptor;</li>
     *   <li>resolves the table path by substituting {@code {db}} and {@code {table}} in the
     *       {@code FULL_PATH_KEY} option and opens the Delta log there;</li>
     *   <li>fails when the log's snapshot version is negative;</li>
     *   <li>runs the command with {@code options$1} extended by the operation type and the
     *       id columns.</li>
     * </ol>
     *
     * @param targetRDD   records to write (raw {@code RDD}; elements are used as JSONObject)
     * @param operate     value stored under {@code OPERATION_TYPE} in the command options
     * @param tableToId$1 map whose keys are {@code TableMetaInfo} descriptors
     * @param options$1   command options; must contain {@code FULL_PATH_KEY}
     * @param spark$1     session used to build datasets and open Delta logs
     * @param ds$1        holder of the batch dataset; only its session configuration is read
     * @param idCols$1    id column names, joined with {@code ","} for the command
     * @return one command result per table
     * @throws RuntimeException when a target table's snapshot version is below zero
     */
    private static final scala.collection.immutable.Iterable saveToSink$1(RDD targetRDD, String operate, Map tableToId$1, Map options$1, SparkSession spark$1, ObjectRef ds$1, Seq idCols$1) {
        return (scala.collection.immutable.Iterable)tableToId$1.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 entry = x0$1;
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            TableMetaInfo table = (TableMetaInfo)entry._1();

            // Records of this table only, without the meta envelope, as JSON text.
            RDD tableRecords = targetRDD.filter((Function1 & Serializable & scala.Serializable)record -> BoxesRunTime.boxToBoolean((boolean)BinlogSyncToDelta$.$anonfun$run$14(table, record))).map((Function1 & Serializable & scala.Serializable)record -> {
                record.remove(MLSQLMultiDeltaOptions$.MODULE$.META_KEY());
                return record.toString();
            }, ClassTag$.MODULE$.apply(String.class));

            // Turn the JSON text into typed columns according to the table's schema.
            StructType sourceSchema = BinlogSyncToDelta$.deserializeSchema$1(table.schema());
            Column parsedValue = new Column((Expression)new JsonToStructs((DataType)sourceSchema, options$1, functions$.MODULE$.col("value").expr(), (Option)None$.MODULE$));
            Dataset tableDF = spark$1.createDataset(tableRecords, spark$1.implicits().newStringEncoder()).toDF((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"value"})).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{parsedValue.as("data")})).select("data.*", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0]));

            // Locate the Delta table and refuse to write when it has no committed version.
            String pathTemplate = (String)options$1.apply((Object)MLSQLMultiDeltaOptions$.MODULE$.FULL_PATH_KEY());
            String tablePath = pathTemplate.replace("{db}", table.db()).replace("{table}", table.table());
            DeltaLog deltaLog = DeltaLog$.MODULE$.forTable(spark$1, tablePath);
            if (deltaLog.snapshot().version() < 0L) {
                throw new RuntimeException(tablePath + " is not initialed");
            }

            DeltaOptions deltaOptions = new DeltaOptions((Map)Predef$.MODULE$.Map().apply((Seq)Nil$.MODULE$), ((Dataset)ds$1.elem).sparkSession().sessionState().conf());
            Seq<String> emptySeq = (Seq<String>)((Seq)Seq$.MODULE$.apply((Seq)Nil$.MODULE$));
            Map<String, String> commandOptions = (Map<String, String>)options$1.$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)UpsertTableInDelta$.MODULE$.OPERATION_TYPE()), (Object)operate), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)UpsertTableInDelta$.MODULE$.ID_COLS()), (Object)idCols$1.mkString(","))})));
            UpsertTableInDelta command = new UpsertTableInDelta(tableDF, (Option<SaveMode>)new Some((Object)SaveMode.Append), (Option<OutputMode>)None$.MODULE$, deltaLog, deltaOptions, emptySeq, commandOptions);
            return command.run(spark$1);
        }, Iterable$.MODULE$.canBuildFrom());
    }

    /**
     * Filter predicate used by {@code run} to select records to upsert: true when the
     * meta {@code "type"} field is anything other than {@code "delete"} (including null).
     *
     * @param record record carrying the meta object
     * @return {@code true} unless the meta type equals {@code "delete"}
     */
    public static final /* synthetic */ boolean $anonfun$run$17(JSONObject record) {
        String eventType = record.getJSONObject(MLSQLMultiDeltaOptions$.MODULE$.META_KEY()).getString("type");
        return !"delete".equals(eventType);
    }

    /**
     * Filter predicate used by {@code run} to select records to delete: true only when the
     * meta {@code "type"} field equals {@code "delete"}.
     *
     * @param record record carrying the meta object
     * @return {@code true} when the meta type equals {@code "delete"}
     */
    public static final /* synthetic */ boolean $anonfun$run$18(JSONObject record) {
        String eventType = record.getJSONObject(MLSQLMultiDeltaOptions$.MODULE$.META_KEY()).getString("type");
        return "delete".equals(eventType);
    }

    /**
     * Private constructor, invoked once from the static initializer. Publishes the new
     * instance through {@code MODULE$} and then calls {@code DeltaCommandsFun.$init$} on it.
     */
    private BinlogSyncToDelta$() {
        // Publish the instance before running the trait initializer.
        MODULE$ = this;
        DeltaCommandsFun.$init$(this);
    }
}

