/*
 * Decompiled with CFR 0.152.
 */
package ai.chronon.spark;

import ai.chronon.api.Constants$;
import ai.chronon.api.DataType;
import ai.chronon.api.Extensions$;
import ai.chronon.api.Join;
import ai.chronon.api.StringType$;
import ai.chronon.api.StructField;
import ai.chronon.online.JoinCodec;
import ai.chronon.online.JoinCodec$;
import ai.chronon.online.Metrics;
import ai.chronon.spark.Conversions$;
import ai.chronon.spark.LogFlattenerJob$;
import ai.chronon.spark.PartitionRange;
import ai.chronon.spark.TableUtils;
import java.io.Serializable;
import java.util.Base64;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.StructType;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple5;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.generic.CanBuildFrom;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Iterable$;
import scala.collection.mutable.LinkedHashMap;
import scala.collection.mutable.LinkedHashMap$;
import scala.collection.parallel.ParIterable$;
import scala.collection.parallel.ParIterableLike;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.ScalaVersionSpecificCollectionsConverter$;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\u0005}e\u0001B\r\u001b\u0001\u0005B\u0001b\u000b\u0001\u0003\u0002\u0003\u0006I\u0001\f\u0005\to\u0001\u0011\t\u0011)A\u0005q!Aa\b\u0001B\u0001B\u0003%q\b\u0003\u0005K\u0001\t\u0005\t\u0015!\u0003@\u0011!Y\u0005A!A!\u0002\u0013y\u0004\"\u0002'\u0001\t\u0003i\u0005bB+\u0001\u0005\u0004%\tA\u0016\u0005\u00075\u0002\u0001\u000b\u0011B,\t\u000fm\u0003!\u0019!C\u00019\"1\u0001\r\u0001Q\u0001\nuCq!\u0019\u0001C\u0002\u0013\u0005!\r\u0003\u0004n\u0001\u0001\u0006Ia\u0019\u0005\u0006]\u0002!Ia\u001c\u0005\u0006u\u0002!Ia\u001f\u0005\b\u0003+\u0001A\u0011BA\f\u0011\u001d\t\t\u0006\u0001C\u0005\u0003'Bq!a\u0018\u0001\t\u0003\t\t\u0007C\u0004\u0002h\u0001!I!!\u001b\t\u000f\u0005E\u0004\u0001\"\u0001\u0002t\u001d9\u00111\u0010\u000e\t\u0002\u0005udAB\r\u001b\u0011\u0003\ty\b\u0003\u0004M+\u0011\u0005\u0011\u0011\u0011\u0005\b\u0003\u0007+B\u0011AAC\u0011%\tY)FA\u0001\n\u0013\tiIA\bM_\u001e4E.\u0019;uK:,'OS8c\u0015\tYB$A\u0003ta\u0006\u00148N\u0003\u0002\u001e=\u000591\r\u001b:p]>t'\"A\u0010\u0002\u0005\u0005L7\u0001A\n\u0004\u0001\tB\u0003CA\u0012'\u001b\u0005!#\"A\u0013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u001d\"#AB!osJ+g\r\u0005\u0002$S%\u0011!\u0006\n\u0002\r'\u0016\u0014\u0018.\u00197ju\u0006\u0014G.Z\u0001\bg\u0016\u001c8/[8o!\tiS'D\u0001/\u0015\ty\u0003'A\u0002tc2T!aG\u0019\u000b\u0005I\u001a\u0014AB1qC\u000eDWMC\u00015\u0003\ry'oZ\u0005\u0003m9\u0012Ab\u00159be.\u001cVm]:j_:\f\u0001B[8j]\u000e{gN\u001a\t\u0003sqj\u0011A\u000f\u0006\u0003wq\t1!\u00199j\u0013\ti$H\u0001\u0003K_&t\u0017aB3oI\u0012\u000bG/\u001a\t\u0003\u0001\u001es!!Q#\u0011\u0005\t#S\"A\"\u000b\u0005\u0011\u0003\u0013A\u0002\u001fs_>$h(\u0003\u0002GI\u00051\u0001K]3eK\u001aL!\u0001S%\u0003\rM#(/\u001b8h\u0015\t1E%\u0001\u0005m_\u001e$\u0016M\u00197f\u0003-\u00198\r[3nCR\u000b'\r\\3\u0002\rqJg.\u001b;?)\u0019q\u0005+\u0015*T)B\u0011q\nA\u0007\u00025!)1F\u0002a\u0001Y!)qG\u0002a\u0001q!)aH\u0002a\u0001\u007f!)!J\u0002a\u0001\u007f!)1J\u0002a\u0001\u007f\u0005QA/\u00192mK
V#\u0018\u000e\\:\u0016\u0003]\u0003\"a\u0014-\n\u0005eS\"A\u0003+bE2,W\u000b^5mg\u0006YA/\u00192mKV#\u0018\u000e\\:!\u00031Qw.\u001b8UE2\u0004&o\u001c9t+\u0005i\u0006\u0003\u0002!_\u007f}J!aX%\u0003\u00075\u000b\u0007/A\u0007k_&tGK\u00197Qe>\u00048\u000fI\u0001\b[\u0016$(/[2t+\u0005\u0019\u0007C\u00013k\u001d\t)\u0007.D\u0001g\u0015\t9G$\u0001\u0004p]2Lg.Z\u0005\u0003S\u001a\fq!T3ue&\u001c7/\u0003\u0002lY\n91i\u001c8uKb$(BA5g\u0003!iW\r\u001e:jGN\u0004\u0013\u0001E4fiVsg-\u001b7mK\u0012\u0014\u0016M\\4f)\r\u0001h\u000f\u001f\t\u0004GE\u001c\u0018B\u0001:%\u0005\u0019y\u0005\u000f^5p]B\u0011q\n^\u0005\u0003kj\u0011a\u0002U1si&$\u0018n\u001c8SC:<W\rC\u0003x\u001b\u0001\u0007q(\u0001\u0006j]B,H\u000fV1cY\u0016DQ!_\u0007A\u0002}\n1b\\;uaV$H+\u00192mK\u0006aA-\u001a3va\u00164\u0015.\u001a7egR\u0019A0!\u0005\u0011\u000bu\f)!a\u0003\u000f\u0007y\f\tA\u0004\u0002C\u007f&\tQ%C\u0002\u0002\u0004\u0011\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002\b\u0005%!\u0001C%uKJ\f'\r\\3\u000b\u0007\u0005\rA\u0005E\u0002:\u0003\u001bI1!a\u0004;\u0005-\u0019FO];di\u001aKW\r\u001c3\t\r\u0005Ma\u00021\u0001}\u0003\u00191\u0017.\u001a7eg\u0006!b\r\\1ui\u0016t7*Z=WC2,XMQ=uKN$b!!\u0007\u00026\u0005\u0015\u0003\u0003BA\u000e\u0003_qA!!\b\u0002.9!\u0011qDA\u0016\u001d\u0011\t\t#!\u000b\u000f\t\u0005\r\u0012q\u0005\b\u0004\u0005\u0006\u0015\u0012\"\u0001\u001b\n\u0005I\u001a\u0014BA\u000e2\u0013\ty\u0003'C\u0002\u0002\u00049JA!!\r\u00024\tIA)\u0019;b\rJ\fW.\u001a\u0006\u0004\u0003\u0007q\u0003bBA\u001c\u001f\u0001\u0007\u0011\u0011H\u0001\u0006e\u0006<HI\u001a\t\u0006[\u0005m\u0012qH\u0005\u0004\u0003{q#a\u0002#bi\u0006\u001cX\r\u001e\t\u0004[\u0005\u0005\u0013bAA\"]\t\u0019!k\\<\t\u000f\u0005\u001ds\u00021\u0001\u0002J\u0005A1m\u001c3fG6\u000b\u0007\u000fE\u0003A=~\nY\u0005E\u0002f\u0003\u001bJ1!a\u0014g\u0005%Qu.\u001b8D_\u0012,7-\u0001\u0007gKR\u001c\u0007nU2iK6\f7\u000fF\u0002^\u0003+Bq!a\u0016\u0011\u0001\u0004\tI&\u0001\u0004iCNDWm\u001d\t\u0005{\u0006ms(\u0003\u0003\u0002^\u0005%!aA*fc\u0006!\"-
^5mIR\u000b'\r\\3Qe>\u0004XM\u001d;jKN$2!XA2\u0011\u0019\t)'\u0005a\u0001;\u0006I1o\u00195f[\u0006l\u0015\r]\u0001\fG>dW/\u001c8D_VtG\u000f\u0006\u0002\u0002lA\u00191%!\u001c\n\u0007\u0005=DEA\u0002J]R\fQBY;jY\u0012dun\u001a+bE2,GCAA;!\r\u0019\u0013qO\u0005\u0004\u0003s\"#\u0001B+oSR\fq\u0002T8h\r2\fG\u000f^3oKJTuN\u0019\t\u0003\u001fV\u00192!\u0006\u0012))\t\ti(A\rsK\u0006$7k\u00195f[\u0006$\u0016M\u00197f!J|\u0007/\u001a:uS\u0016\u001cH#B/\u0002\b\u0006%\u0005\"B+\u0018\u0001\u00049\u0006\"B\u001c\u0018\u0001\u0004A\u0014a\u0003:fC\u0012\u0014Vm]8mm\u0016$\"!a$\u0011\t\u0005E\u00151T\u0007\u0003\u0003'SA!!&\u0002\u0018\u0006!A.\u00198h\u0015\t\tI*\u0001\u0003kCZ\f\u0017\u0002BAO\u0003'\u0013aa\u00142kK\u000e$\b")
public class LogFlattenerJob
implements scala.Serializable {
    // Spark session used to read the schema table and to build the flattened DataFrame.
    private final SparkSession session;
    // Join configuration; its metaData supplies the join name, logged-table name and table properties.
    private final Join joinConf;
    // Passed as the end of the PartitionRange when computing the unfilled range.
    private final String endDate;
    // Raw log table that is scanned as the input of the flattening job.
    private final String logTable;
    // Table queried by fetchSchemas to resolve schema hashes to schema strings.
    private final String schemaTable;
    // Built from `session` in the constructor.
    private final TableUtils tableUtils;
    // Scala view of joinConf.metaData.tableProperties, or an empty map when that is null.
    private final Map<String, String> joinTblProps;
    // Metrics context created for the JoinLogFlatten environment and this join.
    private final Metrics.Context metrics;

    /**
     * Static forwarder to {@code LogFlattenerJob$.MODULE$.readSchemaTableProperties}
     * (the Scala companion object); the actual lookup logic lives there.
     *
     * @param tableUtils table helper handed through to the companion implementation
     * @param join       join configuration handed through to the companion implementation
     * @return whatever the companion implementation returns for the given join
     */
    public static Map<String, String> readSchemaTableProperties(TableUtils tableUtils, Join join) {
        return LogFlattenerJob$.MODULE$.readSchemaTableProperties(tableUtils, join);
    }

    /**
     * @return the {@link TableUtils} instance created from the Spark session in the constructor
     */
    public TableUtils tableUtils() {
        return this.tableUtils;
    }

    /**
     * @return the table properties taken from {@code joinConf.metaData.tableProperties}
     *         in the constructor (an empty map when that field was null)
     */
    public Map<String, String> joinTblProps() {
        return this.joinTblProps;
    }

    /**
     * @return the metrics context created in the constructor for the JoinLogFlatten environment
     */
    public Metrics.Context metrics() {
        return this.metrics;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Asks {@code tableUtils.unfilledRange} which partitions of {@code outputTable} still
     * need to be produced from {@code inputTable}, up to {@code endDate}.
     *
     * <p>The lookup is restricted to the input sub-partition {@code name=<join path>}, where
     * the join path has every "/" replaced by "%2F".
     *
     * @param inputTable  table holding the raw logs
     * @param outputTable table receiving the flattened logs
     * @return {@code Some(range)} when there is work to do; {@code None} when the lookup failed
     *         with an {@link AssertionError} or when it succeeded with no unfilled range
     *         (a message is printed in both cases)
     * @throws MatchError when the lookup failed with anything other than an
     *         {@link AssertionError}, or produced an unexpected value
     */
    private Option<PartitionRange> getUnfilledRange(String inputTable, String outputTable) {
        String partitionName = Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).nameToFilePath().replace("/", "%2F");
        Try unfilledRangeTry = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> this.tableUtils().unfilledRange(outputTable, new PartitionRange(null, $this.endDate), (Option<String>)new Some((Object)inputTable), (Map<String, String>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"name"), (Object)partitionName)})))));

        // Failed lookup: only an AssertionError is treated as "no logged data yet".
        if (unfilledRangeTry instanceof Failure) {
            Failure failed = (Failure)unfilledRangeTry;
            if (failed.exception() instanceof AssertionError) {
                Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(124).append("\n             |The join name ").append(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).nameToFilePath()).append(" does not have available logged data yet.\n             |Please double check your logging status").toString())).stripMargin());
                return None$.MODULE$;
            }
            throw new MatchError((Object)unfilledRangeTry);
        }
        if (!(unfilledRangeTry instanceof Success)) {
            throw new MatchError((Object)unfilledRangeTry);
        }

        // Successful lookup: either nothing is left to fill, or a concrete range is returned.
        Option maybeRange = (Option)((Success)unfilledRangeTry).value();
        if (None$.MODULE$.equals(maybeRange)) {
            Predef$.MODULE$.println((Object)new StringBuilder(49).append(outputTable).append(" seems to be caught up - to either ").append(inputTable).append("(latest ").append(this.tableUtils().lastAvailablePartition(inputTable, this.tableUtils().lastAvailablePartition$default$2())).append(") or ").append(this.endDate).append(".").toString());
            return None$.MODULE$;
        }
        if (!(maybeRange instanceof Some)) {
            throw new MatchError((Object)unfilledRangeTry);
        }
        PartitionRange range = (PartitionRange)((Some)maybeRange).value();
        return new Some((Object)range);
    }

    /**
     * Collapses fields that share a name into a single field, keeping first-seen order
     * (the accumulator is a {@code LinkedHashMap} keyed by field name).
     *
     * @param fields fields to de-duplicate; the same name may appear more than once
     * @return one {@link StructField} per distinct name
     * @throws Exception when two fields share a name but have different data types
     */
    private Iterable<StructField> dedupeFields(Iterable<StructField> fields) {
        LinkedHashMap typeByName = (LinkedHashMap)LinkedHashMap$.MODULE$.apply((Seq)Nil$.MODULE$);
        fields.foreach((Function1 & Serializable & scala.Serializable)field -> {
            // First occurrence of this name: just record its type.
            if (!typeByName.contains((Object)field.name())) {
                return typeByName.put((Object)field.name(), (Object)field.fieldType());
            }
            // Repeated name: it must agree with the type recorded earlier.
            Object knownType = typeByName.apply((Object)field.name());
            DataType incomingType = field.fieldType();
            boolean sameType = knownType == null ? incomingType == null : knownType.equals(incomingType);
            if (!sameType) {
                throw new Exception(new StringBuilder(57).append("Found field with same name ").append(field.name()).append(" but different dataTypes: ").append(typeByName.apply((Object)field.name())).append(" vs ").append(field.fieldType()).toString());
            }
            return BoxedUnit.UNIT;
        });
        return (Iterable)typeByName.map((Function1 & Serializable & scala.Serializable)entry -> new StructField((String)entry._1(), (DataType)entry._2()), Iterable$.MODULE$.canBuildFrom());
    }

    /**
     * Expands the Base64-encoded key/value payloads of the raw log rows into one column per field.
     *
     * <p>The output schema is: the schema-hash column, then {@code JoinCodec.timeFields()}, then the
     * de-duplicated union of every codec's key fields and value fields. Each input row is handled
     * as follows:
     * <ul>
     *   <li>rows whose schema hash is null are dropped;</li>
     *   <li>the codec is looked up with {@code codecMap.apply}, so a hash that is absent from
     *       {@code codecMap} is not tolerated here;</li>
     *   <li>rows whose key or value cannot be decoded are dropped and counted through the
     *       {@code Exception} metric. Base64 decoding happens inside the same {@code Try} as the
     *       codec decoding, so a malformed or null payload is treated like any other undecodable
     *       row instead of failing the whole job;</li>
     *   <li>for every data field the value is taken from the key row when the codec has a key
     *       index for it, otherwise from the value row, otherwise it is null.</li>
     * </ul>
     *
     * @param rawDf    raw log rows; must expose key_base64, value_base64, ts_millis, ds and the
     *                 schema-hash column
     * @param codecMap codec to use for each schema hash
     * @return a DataFrame with the flattened schema described above
     */
    private Dataset<Row> flattenKeyValueBytes(Dataset<Row> rawDf, Map<String, JoinCodec> codecMap) {
        Iterable<StructField> dataFields = this.dedupeFields((Iterable<StructField>)((Iterable)((TraversableLike)codecMap.values().flatMap((Function1 & Serializable & scala.Serializable)x$1 -> new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x$1.keyFields())), scala.collection.Iterable$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)codecMap.values().flatMap((Function1 & Serializable & scala.Serializable)x$2 -> new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x$2.valueFields())), scala.collection.Iterable$.MODULE$.canBuildFrom()), scala.collection.Iterable$.MODULE$.canBuildFrom())));
        StructField structField = new StructField(Constants$.MODULE$.SchemaHash(), (DataType)StringType$.MODULE$);
        StructField[] metadataFields = (StructField[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])JoinCodec$.MODULE$.timeFields())).$plus$colon((Object)structField, ClassTag$.MODULE$.apply(StructField.class));
        ai.chronon.api.StructType outputSchema = new ai.chronon.api.StructType("", (StructField[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])metadataFields)).$plus$plus(dataFields, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(StructField.class))));
        // Positions of the columns in the projection selected below.
        int keyBase64Idx = 0;
        int valueBase64Idx = 1;
        int tsIdx = 2;
        int dsIdx = 3;
        int schemaHashIdx = 4;
        RDD outputRdd = rawDf.select("key_base64", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"value_base64", "ts_millis", "ds", Constants$.MODULE$.SchemaHash()})).rdd().flatMap((Function1 & Serializable & scala.Serializable)row -> {
            Iterable iterable;
            if (row.isNullAt(schemaHashIdx)) {
                // No schema hash means there is no codec to decode with; skip the row.
                iterable = Option$.MODULE$.option2Iterable((Option)None$.MODULE$);
            } else {
                JoinCodec joinCodec = (JoinCodec)codecMap.apply((Object)row.getString(schemaHashIdx));
                // Base64 decoding is done inside the Try so that a corrupt or null payload is
                // counted and skipped below rather than aborting the job.
                Try keyRow = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> joinCodec.keyCodec().decodeRow(Base64.getDecoder().decode(row.getString(keyBase64Idx))));
                Try valueRow = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> joinCodec.valueCodec().decodeRow(Base64.getDecoder().decode(row.getString(valueBase64Idx))));
                if (keyRow.isFailure() || valueRow.isFailure()) {
                    this.metrics().increment(Metrics.Name$.MODULE$.Exception());
                    iterable = Option$.MODULE$.option2Iterable((Option)None$.MODULE$);
                } else {
                    Object[] dataColumns = (Object[])((ParIterableLike)((ParIterableLike)dataFields.par()).map((Function1 & Serializable & scala.Serializable)field -> {
                        Option keyIdxOpt = joinCodec.keyIndices().get(field);
                        Option valIdxOpt = joinCodec.valueIndices().get(field);
                        return keyIdxOpt.isDefined() ? keyRow.toOption().map((Function1 & Serializable & scala.Serializable)x$5 -> x$5[BoxesRunTime.unboxToInt((Object)keyIdxOpt.get())]).orNull(Predef$.MODULE$.$conforms()) : (valIdxOpt.isDefined() ? valueRow.toOption().map((Function1 & Serializable & scala.Serializable)x$6 -> x$6[BoxesRunTime.unboxToInt((Object)valIdxOpt.get())]).orNull(Predef$.MODULE$.$conforms()) : null);
                    }, (CanBuildFrom)ParIterable$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.Any());
                    // Metadata columns come first, in the same order as metadataFields.
                    Object[] metadataColumns = (Object[])Array$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{row.get(schemaHashIdx), row.get(tsIdx), row.get(dsIdx)}), ClassTag$.MODULE$.Any());
                    Object[] outputRow = (Object[])Predef$.MODULE$.genericArrayOps((Object)metadataColumns).$plus$plus((GenTraversableOnce)Predef$.MODULE$.genericArrayOps((Object)dataColumns), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Any()));
                    GenericRow unpackedRow = (GenericRow)Conversions$.MODULE$.toSparkRow(outputRow, (DataType)outputSchema, Conversions$.MODULE$.toSparkRow$default$3());
                    iterable = Option$.MODULE$.option2Iterable((Option)new Some((Object)unpackedRow));
                }
            }
            return iterable;
        }, ClassTag$.MODULE$.apply(Row.class));
        StructType outputSparkSchema = Conversions$.MODULE$.fromChrononSchema(outputSchema);
        return this.session.createDataFrame(outputRdd, outputSparkSchema);
    }

    /**
     * Reads, from the latest available partition of the schema table, the schema string stored
     * in column {@code schema_value_last} for each of the requested schema hashes.
     *
     * @param hashes schema hashes to look up
     * @return map from schema hash to schema string, containing only the hashes found
     * @throws Exception when the schema table has no partition available
     */
    private Map<String, String> fetchSchemas(Seq<String> hashes) {
        Option<String> schemaTableDs = this.tableUtils().lastAvailablePartition(this.schemaTable, this.tableUtils().lastAvailablePartition$default$2());
        if (schemaTableDs.isEmpty()) {
            throw new Exception(new StringBuilder(29).append(this.schemaTable).append(" has no partitions available!").toString());
        }
        // Latest partition only, restricted to the requested hashes, projected to (hash, schema).
        Object[] schemaRows = (Object[])this.session.table(this.schemaTable).where(functions$.MODULE$.col(Constants$.MODULE$.PartitionColumn()).$eq$eq$eq(schemaTableDs.get())).where(functions$.MODULE$.col(Constants$.MODULE$.SchemaHash()).isin(hashes)).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col(Constants$.MODULE$.SchemaHash()), functions$.MODULE$.col("schema_value_last").as("schema_value")})).collect();
        Object[] hashSchemaPairs = (Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(schemaRows)).map((Function1 & Serializable & scala.Serializable)row -> new Tuple2((Object)row.getString(0), (Object)row.getString(1)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)));
        return new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(hashSchemaPairs)).toMap(Predef$.MODULE$.$conforms());
    }

    /**
     * Builds the schema-related table properties for the flattened table.
     *
     * <p>The properties returned by {@code readSchemaTableProperties} are merged with
     * {@code schemaMap} (entries of {@code schemaMap} are added on top). Every resulting key is
     * then prefixed with {@code <SchemaHash>_}, and backslashes in both key and value are doubled.
     *
     * @param schemaMap schema hash to schema string
     * @return the prefixed and escaped property map
     */
    public Map<String, String> buildTableProperties(Map<String, String> schemaMap) {
        return (Map)LogFlattenerJob$.MODULE$.readSchemaTableProperties(this.tableUtils(), this.joinConf).$plus$plus(schemaMap).map((Function1 & Serializable & scala.Serializable)entry -> {
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            String hash = (String)entry._1();
            String schema = (String)entry._2();
            String propertyName = new StringBuilder(1).append(Constants$.MODULE$.SchemaHash()).append("_").append(hash).toString();
            return new Tuple2((Object)LogFlattenerJob.escape$1(propertyName), (Object)LogFlattenerJob.escape$1(schema));
        }, Map$.MODULE$.canBuildFrom());
    }

    /**
     * Counts the columns of the join's logged table.
     *
     * @return the number of fields in the logged table's schema, or 0 when reading the schema fails
     */
    private int columnCount() {
        Function0 readFieldCount = (Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> this.tableUtils().getSchemaFromTable(Extensions$.MODULE$.MetadataOps($this.joinConf.metaData).loggedTable()).fields().length;
        // Fallback used when the schema lookup throws.
        Function0 zeroColumns = (Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 0;
        return BoxesRunTime.unboxToInt((Object)Try$.MODULE$.apply(readFieldCount).toOption().getOrElse(zeroColumns));
    }

    /**
     * Runs the flattening job end to end.
     *
     * <ol>
     *   <li>Exits early when {@code samplePercent} is unset on the join, or when there is no
     *       unfilled partition range.</li>
     *   <li>Scans the raw log table for the unfilled range, keeping rows whose {@code name}
     *       equals this join's file path.</li>
     *   <li>Collects the distinct schema hashes, fetches their schemas and builds one codec per
     *       hash.</li>
     *   <li>Flattens the rows and inserts them into the join's logged table, together with the
     *       join table properties and the schema table properties.</li>
     *   <li>Reports row counts, failure count (input rows minus output rows), column counts
     *       before and after the insert, and elapsed minutes as gauges.</li>
     * </ol>
     */
    public void buildLogTable() {
        if (!this.joinConf.metaData.isSetSamplePercent()) {
            Predef$.MODULE$.println((Object)new StringBuilder(34).append("samplePercent is unset for ").append(this.joinConf.metaData.name).append(". Exit.").toString());
            return;
        }
        Option<PartitionRange> unfilled = this.getUnfilledRange(this.logTable, Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable());
        if (unfilled.isEmpty()) {
            return;
        }
        long startMillis = System.currentTimeMillis();
        String joinName = Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).nameToFilePath();

        // Scan the raw logs for the missing range and keep only this join's rows.
        PartitionRange range = (PartitionRange)unfilled.get();
        String rawTableScan = range.genScanQuery(null, this.logTable, range.genScanQuery$default$3());
        Dataset rawDf = this.tableUtils().sql(rawTableScan).where(functions$.MODULE$.col("name").$eq$eq$eq((Object)joinName));
        Predef$.MODULE$.println((Object)new StringBuilder(17).append("scanned data for ").append(joinName).toString());

        // Resolve every schema hash seen in the scanned rows to a codec.
        Seq schemaHashes = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])rawDf.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col(Constants$.MODULE$.SchemaHash())})).distinct().collect())).map((Function1 & Serializable & scala.Serializable)hashRow -> hashRow.getString(0), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).toSeq();
        Map<String, String> schemaMap = this.fetchSchemas((Seq<String>)schemaHashes);
        Map codecMap = (Map)schemaMap.mapValues((Function1 & Serializable & scala.Serializable)schema -> JoinCodec$.MODULE$.fromLoggingSchema(schema, null)).map((Function1 & Serializable & scala.Serializable)x -> (Tuple2)Predef$.MODULE$.identity(x), Map$.MODULE$.canBuildFrom());

        Dataset<Row> flattenedDf = this.flattenKeyValueBytes((Dataset<Row>)rawDf, (Map<String, JoinCodec>)codecMap);
        Map<String, String> schemaTblProps = this.buildTableProperties(schemaMap);

        // Column count is sampled on both sides of the insert so schema growth can be reported.
        int columnBeforeCount = this.columnCount();
        String outputTable = Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable();
        Map outputTblProps = this.joinTblProps().$plus$plus(schemaTblProps);
        this.tableUtils().insertPartitions(flattenedDf, outputTable, (Map<String, String>)outputTblProps, this.tableUtils().insertPartitions$default$4(), this.tableUtils().insertPartitions$default$5(), this.tableUtils().insertPartitions$default$6(), true);
        int columnAfterCount = this.columnCount();

        long outputRowCount = flattenedDf.count();
        long inputRowCount = rawDf.count();
        long failureCount = inputRowCount - outputRowCount;
        this.metrics().gauge(Metrics.Name$.MODULE$.RowCount(), outputRowCount);
        this.metrics().gauge(Metrics.Name$.MODULE$.FailureCount(), failureCount);
        Predef$.MODULE$.println((Object)new StringBuilder(44).append("Processed logs: ").append(outputRowCount).append(" rows success, ").append(failureCount).append(" rows failed.").toString());
        this.metrics().gauge(Metrics.Name$.MODULE$.ColumnBeforeCount(), (long)columnBeforeCount);
        this.metrics().gauge(Metrics.Name$.MODULE$.ColumnAfterCount(), (long)columnAfterCount);
        long elapsedMins = (System.currentTimeMillis() - startMillis) / 60000L;
        this.metrics().gauge(Metrics.Name$.MODULE$.LatencyMinutes(), elapsedMins);
    }

    /**
     * Doubles every backslash in {@code str} (each {@code \} becomes {@code \\}).
     * Used by {@code buildTableProperties} on both property keys and values.
     *
     * @param str text to escape; a null value results in a NullPointerException
     * @return {@code str} with each backslash replaced by two backslashes
     */
    private static final String escape$1(String str) {
        // String.replace is a literal (non-regex) replacement: one backslash -> two backslashes.
        return str.replace("\\", "\\\\");
    }

    /**
     * Creates a flattening job for one join.
     *
     * @param session     Spark session used for all table access
     * @param joinConf    join whose logs are flattened; {@code joinConf.metaData} is read here and later
     * @param endDate     end of the partition range considered when looking for unfilled partitions
     * @param logTable    raw log table to scan
     * @param schemaTable table from which schema strings are fetched by schema hash
     */
    public LogFlattenerJob(SparkSession session, Join joinConf, String endDate, String logTable, String schemaTable) {
        this.session = session;
        this.joinConf = joinConf;
        this.endDate = endDate;
        this.logTable = logTable;
        this.schemaTable = schemaTable;
        this.tableUtils = new TableUtils(session);
        // tableProperties may be null on the join metadata; fall back to an empty Scala map.
        this.joinTblProps = (Map)Option$.MODULE$.apply((Object)joinConf.metaData.tableProperties).map((Function1 & Serializable & scala.Serializable)map -> ScalaVersionSpecificCollectionsConverter$.MODULE$.convertJavaMapToScala(map)).getOrElse((Function0 & Serializable & scala.Serializable)() -> Predef$.MODULE$.Map().empty());
        this.metrics = Metrics.Context$.MODULE$.apply(Metrics.Environment$.MODULE$.JoinLogFlatten(), joinConf);
    }
}

