/*
 * Decompiled with CFR 0.152.
 */
package ai.starlake.job.index.kafkaload;

import ai.starlake.config.Settings;
import ai.starlake.config.SparkEnv;
import ai.starlake.job.index.kafkaload.DataFrameTransform;
import ai.starlake.job.index.kafkaload.KafkaJobConfig;
import ai.starlake.schema.model.SinkType;
import ai.starlake.schema.model.Views;
import ai.starlake.utils.JobBase;
import ai.starlake.utils.JobResult;
import ai.starlake.utils.SparkJob;
import ai.starlake.utils.SparkJobResult;
import ai.starlake.utils.Utils$;
import ai.starlake.utils.kafka.KafkaClient;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005a\u0001B\u0007\u000f\u0001eA\u0001B\n\u0001\u0003\u0006\u0004%\ta\n\u0005\tY\u0001\u0011\t\u0011)A\u0005Q!AQ\u0006\u0001BC\u0002\u0013\ra\u0006\u0003\u00056\u0001\t\u0005\t\u0015!\u00030\u0011\u00151\u0004\u0001\"\u00018\u0011\u001da\u0004A1A\u0005\nuBa!\u0012\u0001!\u0002\u0013q\u0004\"\u0002$\u0001\t\u00039\u0005\"B)\u0001\t\u00039\u0005\"\u0002*\u0001\t\u0013\u0019\u0006\"\u00029\u0001\t\u0003\n\b\"\u0002<\u0001\t\u0003:(\u0001C&bM.\f'j\u001c2\u000b\u0005=\u0001\u0012!C6bM.\fGn\\1e\u0015\t\t\"#A\u0003j]\u0012,\u0007P\u0003\u0002\u0014)\u0005\u0019!n\u001c2\u000b\u0005U1\u0012\u0001C:uCJd\u0017m[3\u000b\u0003]\t!!Y5\u0004\u0001M\u0019\u0001A\u0007\u0011\u0011\u0005mqR\"\u0001\u000f\u000b\u0003u\tQa]2bY\u0006L!a\b\u000f\u0003\r\u0005s\u0017PU3g!\t\tC%D\u0001#\u0015\t\u0019C#A\u0003vi&d7/\u0003\u0002&E\tA1\u000b]1sW*{'-\u0001\blC\u001a\\\u0017MS8c\u0007>tg-[4\u0016\u0003!\u0002\"!\u000b\u0016\u000e\u00039I!a\u000b\b\u0003\u001d-\u000bgm[1K_\n\u001cuN\u001c4jO\u0006y1.\u00194lC*{'mQ8oM&<\u0007%\u0001\u0005tKR$\u0018N\\4t+\u0005y\u0003C\u0001\u00194\u001b\u0005\t$B\u0001\u001a\u0015\u0003\u0019\u0019wN\u001c4jO&\u0011A'\r\u0002\t'\u0016$H/\u001b8hg\u0006I1/\u001a;uS:<7\u000fI\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005aZDCA\u001d;!\tI\u0003\u0001C\u0003.\u000b\u0001\u000fq\u0006C\u0003'\u000b\u0001\u0007\u0001&A\u0006u_BL7mQ8oM&<W#\u0001 \u0011\u0005}\u0012eB\u0001\u0019A\u0013\t\t\u0015'\u0001\u0005TKR$\u0018N\\4t\u0013\t\u0019EI\u0001\tLC\u001a\\\u0017\rV8qS\u000e\u001cuN\u001c4jO*\u0011\u0011)M\u0001\ri>\u0004\u0018nY\"p]\u001aLw\rI\u0001\b_\u001a4Gn\\1e)\u0005A\u0005cA%M\u001d6\t!J\u0003\u0002L9\u0005!Q\u000f^5m\u0013\ti%JA\u0002Uef\u0004\"!I(\n\u0005A\u0013#AD*qCJ\\'j\u001c2SKN,H\u000e^\u0001\u0005Y>\fG-\u0001\u0005ue\u0006t7OZ8n)\t!f\u000e\u0005\u0002VW:\u0011a\u000b\u001b\b\u0003/\u0016t!\u0001\u00172\u000f\u0005e{fB\u0001.^\u001b\u0005Y&B\u0001/\u0019\u0003\u0019a$o\\8u}%\ta,A\u0002pe\u001eL!\u0001Y1\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005q\u0016BA2e\u0003\u0015\u0019\b/\u0019:l\u0015\t\u0001\u0017-\u0003\u0002gO\u0006\u00191/\u001d7\u000b\u0005\r$\u0017BA5k\u0003\u001d\u0001\u0018mY6bO\u0016T!AZ4\n\u00051l'!\u0003#bi\u00064%/Y7f\u0015\tI'\u000eC\u0003p\u0015\u0001\u0007A+\u0001\u0002eM\u0006\u0019!/\u001e8\u0015\u0003I\u00042!\u0013't!\t\tC/\u0003\u0002vE\tI!j\u001c2SKN,H\u000e^\u0001\u0005]\u0006lW-F\u0001y!\tIXP\u0004\u0002{wB\u0011!\fH\u0005\u0003yr\ta\u0001\u0015:fI\u00164\u0017B\u0001@\u0000\u0005\u0019\u0019FO]5oO*\u0011A\u0010\b")
public class KafkaJob
implements SparkJob {
    private final KafkaJobConfig kafkaJobConfig;
    private final Settings settings;
    private final Settings.KafkaTopicConfig topicConfig;
    private SparkEnv sparkEnv;
    private SparkSession session;
    private final Logger logger;
    private volatile byte bitmap$0;

    @Override
    public SparkConf withExtraSparkConf(SparkConf sourceConfig) {
        return SparkJob.withExtraSparkConf$(this, sourceConfig);
    }

    @Override
    public void registerUdf(String udf) {
        SparkJob.registerUdf$(this, udf);
    }

    @Override
    public DataFrameWriter<Row> partitionedDatasetWriter(Dataset<Row> dataset, List<String> partition) {
        return SparkJob.partitionedDatasetWriter$(this, dataset, partition);
    }

    @Override
    public Dataset<Row> partitionDataset(Dataset<Row> dataset, List<String> partition) {
        return SparkJob.partitionDataset$(this, dataset, partition);
    }

    @Override
    public Object analyze(String fullTableName) {
        return SparkJob.analyze$(this, fullTableName);
    }

    @Override
    public void createSparkViews(Views views, Map<String, String> sqlParameters) {
        SparkJob.createSparkViews$(this, views, sqlParameters);
    }

    @Override
    public Tuple3<SinkType, Option<String>, String> parseViewDefinition(String valueWithEnv) {
        return JobBase.parseViewDefinition$(this, valueWithEnv);
    }

    @Override
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> ds) {
        return DatasetLogging.DatasetHelper$(this, ds);
    }

    private SparkEnv sparkEnv$lzycompute() {
        KafkaJob kafkaJob = this;
        synchronized (kafkaJob) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.sparkEnv = SparkJob.sparkEnv$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.sparkEnv;
    }

    @Override
    public SparkEnv sparkEnv() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.sparkEnv$lzycompute() : this.sparkEnv;
    }

    private SparkSession session$lzycompute() {
        KafkaJob kafkaJob = this;
        synchronized (kafkaJob) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.session = SparkJob.session$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.session;
    }

    @Override
    public SparkSession session() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.session$lzycompute() : this.session;
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    public KafkaJobConfig kafkaJobConfig() {
        return this.kafkaJobConfig;
    }

    @Override
    public Settings settings() {
        return this.settings;
    }

    private Settings.KafkaTopicConfig topicConfig() {
        return this.topicConfig;
    }

    public Try<SparkJobResult> offload() {
        return Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> !this.kafkaJobConfig().streaming() ? (SparkJobResult)Utils$.MODULE$.withResources((Function0 & Serializable & scala.Serializable)() -> new KafkaClient(this.settings().comet().kafka(), this.settings()), (Function1 & Serializable & scala.Serializable)kafkaClient -> {
            BoxedUnit boxedUnit;
            Some some;
            int n;
            BoxedUnit boxedUnit2;
            BoxedUnit boxedUnit3;
            Dataset dataset;
            Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> tuple2 = kafkaClient.consumeTopicBatch(this.kafkaJobConfig().topicConfigName(), this.session(), this.topicConfig());
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Dataset df = (Dataset)tuple2._1();
            List offsets = (List)tuple2._2();
            Tuple2 tuple22 = new Tuple2((Object)df, (Object)offsets);
            Tuple2 tuple23 = tuple22;
            Dataset df2 = (Dataset)tuple23._1();
            List offsets2 = (List)tuple23._2();
            Dataset transformedDF = this.transfom((Dataset<Row>)df2);
            Option<Object> option = this.kafkaJobConfig().coalesce();
            if (None$.MODULE$.equals(option)) {
                dataset = transformedDF;
            } else if (option instanceof Some) {
                Some some2 = (Some)option;
                int x = BoxesRunTime.unboxToInt((Object)some2.value());
                dataset = transformedDF.coalesce(x);
            } else {
                throw new MatchError(option);
            }
            Dataset finalDF = dataset;
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Saving to {}", new Object[]{this.kafkaJobConfig()});
                boxedUnit3 = BoxedUnit.UNIT;
            } else {
                boxedUnit3 = BoxedUnit.UNIT;
            }
            finalDF.write().mode(this.kafkaJobConfig().mode()).format(this.kafkaJobConfig().format()).options(this.kafkaJobConfig().writeOptions()).save(this.kafkaJobConfig().path());
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Kafka saved messages to offload -> {}", new Object[]{this.kafkaJobConfig().path()});
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            Option<Object> option2 = this.kafkaJobConfig().coalesce();
            if (option2 instanceof Some && 1 == (n = BoxesRunTime.unboxToInt((Object)(some = (Some)option2).value())) && this.kafkaJobConfig().coalesceMerge()) {
                Object object;
                Path targetPath;
                String string = this.kafkaJobConfig().format();
                Path x$1 = targetPath = new Path(this.kafkaJobConfig().path());
                boolean x$22 = false;
                String x$3 = this.settings().storageHandler().list$default$2();
                LocalDateTime x$4 = this.settings().storageHandler().list$default$3();
                Option<Pattern> x$5 = this.settings().storageHandler().list$default$5();
                Path singleFile = (Path)((IterableLike)this.settings().storageHandler().list(x$1, x$3, x$4, x$22, x$5).filter((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)KafkaJob.$anonfun$offload$4(x$2)))).head();
                Path tmpPath = new Path(new StringBuilder(4).append(targetPath.toString()).append(".tmp").toString());
                if (this.settings().storageHandler().move(singleFile, tmpPath)) {
                    this.settings().storageHandler().delete(targetPath);
                    object = BoxesRunTime.boxToBoolean((boolean)this.settings().storageHandler().move(tmpPath, targetPath));
                } else {
                    object = BoxedUnit.UNIT;
                }
                boxedUnit = object;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            kafkaClient.topicSaveOffsets(this.kafkaJobConfig().topicConfigName(), this.topicConfig().accessOptions(), (List<Tuple2<Object, Object>>)offsets2);
            return new SparkJobResult((Option<Dataset<Row>>)new Some((Object)transformedDF));
        }) : (SparkJobResult)Utils$.MODULE$.withResources((Function0 & Serializable & scala.Serializable)() -> new KafkaClient(this.settings().comet().kafka(), this.settings()), (Function1 & Serializable & scala.Serializable)kafkaClient -> {
            Trigger trigger;
            Dataset<Row> df = kafkaClient.consumeTopicStreaming(this.session(), this.topicConfig());
            Dataset<Row> transformedDF = this.transfom(df);
            String string = this.kafkaJobConfig().streamingTrigger().toLowerCase();
            if ("once".equals(string)) {
                trigger = Trigger.Once();
            } else if ("processingtime".equals(string)) {
                trigger = Trigger.ProcessingTime((String)this.kafkaJobConfig().streamingTriggerOption());
            } else if ("continuous".equals(string)) {
                trigger = Trigger.Continuous((String)this.kafkaJobConfig().streamingTriggerOption());
            } else {
                throw new MatchError((Object)string);
            }
            Trigger trigger2 = trigger;
            DataStreamWriter writer = transformedDF.writeStream().outputMode(this.kafkaJobConfig().streamingWriteMode()).format(this.kafkaJobConfig().streamingWriteFormat()).options(this.kafkaJobConfig().writeOptions()).trigger(trigger2);
            Seq<String> seq = this.kafkaJobConfig().streamingWritePartitionBy();
            DataStreamWriter dataStreamWriter = Nil$.MODULE$.equals(seq) ? writer : writer.partitionBy(seq);
            DataStreamWriter partitionedWriter = dataStreamWriter;
            if (this.kafkaJobConfig().streamingWriteToTable()) {
                throw new Exception("streamingWriteToTable Not Supported");
            }
            StreamingQuery streamingQuery = partitionedWriter.start(this.kafkaJobConfig().path());
            streamingQuery.awaitTermination();
            return new SparkJobResult((Option<Dataset<Row>>)None$.MODULE$);
        }));
    }

    public Try<SparkJobResult> load() {
        return Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> (SparkJobResult)Utils$.MODULE$.withResources((Function0 & Serializable & scala.Serializable)() -> new KafkaClient(this.settings().comet().kafka(), this.settings()), (Function1 & Serializable & scala.Serializable)kafkaClient -> {
            Dataset df = this.session().read().format(this.kafkaJobConfig().format()).load((Seq)Predef$.MODULE$.wrapRefArray((Object[])new StringOps(Predef$.MODULE$.augmentString(this.kafkaJobConfig().path())).split(',')));
            Dataset<Row> transformedDF = this.transfom((Dataset<Row>)df);
            kafkaClient.sinkToTopic(this.topicConfig(), transformedDF);
            return new SparkJobResult((Option<Dataset<Row>>)new Some(transformedDF));
        }));
    }

    /*
     * WARNING - void declaration
     */
    private Dataset<Row> transfom(Dataset<Row> df) {
        void var3_6;
        Dataset<Row> dataset;
        Option<DataFrameTransform> option = this.kafkaJobConfig().transformInstance();
        if (option instanceof Some) {
            Some some = (Some)option;
            DataFrameTransform transformer = (DataFrameTransform)some.value();
            dataset = transformer.transform(df);
        } else if (None$.MODULE$.equals(option)) {
            dataset = df;
        } else {
            throw new MatchError(option);
        }
        Dataset<Row> transformedDF = dataset;
        return var3_6;
    }

    @Override
    public Try<JobResult> run() {
        return this.kafkaJobConfig().offload() ? this.offload() : this.load();
    }

    @Override
    public String name() {
        return String.valueOf(this.kafkaJobConfig().topicConfigName());
    }

    public static final /* synthetic */ boolean $anonfun$offload$4(Path x$2) {
        return x$2.getName().startsWith("part-");
    }

    public KafkaJob(KafkaJobConfig kafkaJobConfig, Settings settings) {
        this.kafkaJobConfig = kafkaJobConfig;
        this.settings = settings;
        StrictLogging.$init$((StrictLogging)this);
        DatasetLogging.$init$(this);
        JobBase.$init$(this);
        SparkJob.$init$(this);
        this.topicConfig = (Settings.KafkaTopicConfig)settings.comet().kafka().topics().apply((Object)kafkaJobConfig.topicConfigName());
    }
}

