/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.raw;

import it.agilelab.bigdata.wasp.consumers.spark.plugins.raw.RawSparkReaderUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.strategies.gdpr.utils.hdfs.HdfsUtils$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.RawModel;
import it.agilelab.bigdata.wasp.models.RawOptions;
import it.agilelab.bigdata.wasp.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.types.DataType$;
import org.apache.spark.sql.types.StructType;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.collection.GenTraversableOnce;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.MapLike;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\r4A\u0001B\u0003\u0001-!A1\u0006\u0001B\u0001B\u0003%A\u0006C\u00033\u0001\u0011\u00051\u0007C\u00038\u0001\u0011\u0005\u0003HA\u0011SC^\u001c\u0006/\u0019:l'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6Lgn\u001a*fC\u0012,'O\u0003\u0002\u0007\u000f\u0005\u0019!/Y<\u000b\u0005!I\u0011a\u00029mk\u001eLgn\u001d\u0006\u0003\u0015-\tQa\u001d9be.T!\u0001D\u0007\u0002\u0013\r|gn];nKJ\u001c(B\u0001\b\u0010\u0003\u00119\u0018m\u001d9\u000b\u0005A\t\u0012a\u00022jO\u0012\fG/\u0019\u0006\u0003%M\t\u0001\"Y4jY\u0016d\u0017M\u0019\u0006\u0002)\u0005\u0011\u0011\u000e^\u0002\u0001'\u0011\u0001q#H\u0012\u0011\u0005aYR\"A\r\u000b\u0003i\tQa]2bY\u0006L!\u0001H\r\u0003\r\u0005s\u0017PU3g!\tq\u0012%D\u0001 \u0015\t\u0001\u0013\"A\u0004sK\u0006$WM]:\n\u0005\tz\"AH*qCJ\\7\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oOJ+\u0017\rZ3s!\t!\u0013&D\u0001&\u0015\t1s%A\u0004m_\u001e<\u0017N\\4\u000b\u0005!j\u0011\u0001B2pe\u0016L!AK\u0013\u0003\u000f1{wmZ5oO\u0006A!/Y<N_\u0012,G\u000e\u0005\u0002.a5\taF\u0003\u00020\u001b\u00051Qn\u001c3fYNL!!\r\u0018\u0003\u0011I\u000bw/T8eK2\fa\u0001P5oSRtDC\u0001\u001b7!\t)\u0004!D\u0001\u0006\u0011\u0015Y#\u00011\u0001-\u0003Y\u0019'/Z1uKN#(/^2ukJ,Gm\u0015;sK\u0006lGcA\u001dZ=R\u0011!h\u0015\t\u0003wAs!\u0001P'\u000f\u0005uReB\u0001 I\u001d\tyTI\u0004\u0002A\u00076\t\u0011I\u0003\u0002C+\u00051AH]8pizJ\u0011\u0001R\u0001\u0004_J<\u0017B\u0001$H\u0003\u0019\t\u0007/Y2iK*\tA)\u0003\u0002\u000b\u0013*\u0011aiR\u0005\u0003\u00172\u000b1a]9m\u0015\tQ\u0011*\u0003\u0002O\u001f\u00069\u0001/Y2lC\u001e,'BA&M\u0013\t\t&KA\u0005ECR\fgI]1nK*\u0011aj\u0014\u0005\u0006)\u000e\u0001\u001d!V\u0001\u0003gN\u0004\"AV,\u000e\u0003=K!\u0001W(\u0003\u0019M\u0003\u0018M]6TKN\u001c\u0018n\u001c8\t\u000bi\u001b\u0001\u0019A.\u0002\u0007\u0015$H\u000e\u0005\u0002.9&\u0011QL\f\u0002\u001c'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ#U\u00196{G-\u001a7\t\u000b}\u001b\u0001\u0019\u00011\u0002)M$(/Z1nS:<'+Z1eKJlu\u000eZ3m!\ti\u0013-\u0003\u0002c]\t!2\u000b\u001e:fC6Lgn\u001a*fC\u0012,'/T8eK2\u0004")
public class RawSparkStructuredStreamingReader
implements SparkStructuredStreamingReader,
Logging {
    private final RawModel rawModel;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        this.logger = x$1;
    }

    public Dataset<Row> createStructuredStream(StructuredStreamingETLModel etl, StreamingReaderModel streamingReaderModel, SparkSession ss) {
        String string = etl.streamingInput().datastoreModelName();
        String string2 = this.rawModel.name();
        Predef$.MODULE$.require(!(string != null ? !string.equals(string2) : string2 != null), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(87).append("etl model input and rawModel do not match: ").append("etl.streamingInput.name=[").append(etl.streamingInput().datastoreModelName()).append("], rawModel.name=[").append($this.rawModel.name()).append("]").toString());
        this.logger().info((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(45).append("Initialize Spark Raw Reader with this model: ").append($this.rawModel).toString());
        None$ schema = this.rawModel.schema().isEmpty() ? None$.MODULE$ : new Some((Object)((StructType)DataType$.MODULE$.fromJson(this.rawModel.schema())));
        RawOptions options = this.rawModel.options();
        scala.collection.immutable.Map extraOptions = ((MapLike)options.extraOptions().getOrElse((Function0 & Serializable & scala.Serializable)() -> (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Nil$.MODULE$))).$plus$plus((GenTraversableOnce)etl.options()).$plus$plus((GenTraversableOnce)streamingReaderModel.options());
        DataStreamReader reader = ((DataStreamReader)schema.fold((Function0 & Serializable & scala.Serializable)() -> ss.readStream(), (Function1 & Serializable & scala.Serializable)x$1 -> ss.readStream().schema(x$1))).format(options.format()).options((Map)extraOptions);
        String path = HdfsUtils$.MODULE$.getRawModelPathToToLoad(this.rawModel, ss.sparkContext());
        this.logger().info((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Load this path: '").append(path).append("'").toString());
        this.logger().info((Function0 & Serializable & scala.Serializable)() -> RawSparkReaderUtils$.MODULE$.printExtraOptions($this.rawModel.name(), (scala.collection.immutable.Map<String, String>)extraOptions));
        return reader.load(path);
    }

    public RawSparkStructuredStreamingReader(RawModel rawModel) {
        this.rawModel = rawModel;
        Logging.$init$((Logging)this);
    }
}

