/*
 * Decompiled with CFR 0.152.
 */
package com.iaja.ldavis.examples;

import com.iaja.ldavis.examples.DataPreparation$;
import com.iaja.ldavis.examples.Logger;
import com.iaja.ldavis.examples.Logger$class;
import com.iaja.ldavis.examples.NewsData;
import org.apache.spark.SparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.WordLengthFilter;
import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.ml.feature.StopWordsRemover$;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.linalg.DenseVector;
import org.apache.spark.ml.linalg.Matrix;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import scala.Array$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.GenIterable;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.WrappedArray;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.Symbols;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

public final class DataPreparation$
implements Logger {
    /** Singleton instance of this object; assigned by the private constructor. */
    public static final DataPreparation$ MODULE$;
    /** Function object returning the size of a wrapped string array; wrapped by {@code sizeUdf}. */
    private final Function1<WrappedArray<String>, Object> getSize;
    /** Spark SQL user-defined function created from {@code getSize} in the constructor. */
    private final UserDefinedFunction sizeUdf;
    /** SLF4J logger returned by {@code log()}; written through the Logger setter method below. */
    private final org.slf4j.Logger log;

    // Creating one instance runs the private constructor, which stores itself in MODULE$.
    static {
        new DataPreparation$();
    }

    /**
     * Returns the SLF4J logger held by this singleton.
     *
     * @return the value of the {@code log} field
     */
    @Override
    public org.slf4j.Logger log() {
        return log;
    }

    /**
     * Stores the given logger in the {@code log} field.
     *
     * NOTE(review): the field is declared {@code final} in this decompiled output;
     * presumably this setter is only called once, from {@code Logger$class.$init$}
     * during construction - confirm against the Logger trait.
     *
     * @param x$1 logger instance to store
     */
    @Override
    public void com$iaja$ldavis$examples$Logger$_setter_$log_$eq(org.slf4j.Logger x$1) {
        log = x$1;
    }

    /**
     * Returns the function object that reports the size of a wrapped string array.
     *
     * @return the value of the {@code getSize} field
     */
    public Function1<WrappedArray<String>, Object> getSize() {
        return getSize;
    }

    /**
     * Returns the Spark SQL UDF that was built from {@code getSize} in the constructor.
     *
     * @return the value of the {@code sizeUdf} field
     */
    public UserDefinedFunction sizeUdf() {
        return sizeUdf;
    }

    /**
     * Entry point: trains an LDA topic model on the 20news training files and writes
     * the model plus the LDAvis input data under {@code /tmp/scalaLDAvis}.
     *
     * Steps, in order:
     * 1. Build a local Spark session (master {@code local[4]}) and read the training
     *    files as whole-text files, wrapping each file's content in a NewsData.
     * 2. Fit a pipeline of Tokenizer, StopWordsRemover, WordLengthFilter,
     *    CountVectorizer and LDA, then transform the input with the fitted model.
     * 3. Save the CountVectorizer model, the LDA model and the transformed data set.
     * 4. Call generateLDAVisData and save the four returned RDDs as text files.
     * 5. Close the Spark session and call {@code System.exit(0)}.
     *
     * NOTE(review): a failure inside step 4 is logged and swallowed, and the process
     * still exits with status 0. Failures in steps 1-3 are not caught here.
     *
     * @param args command-line arguments; not read by this method
     */
    public void main(String[] args) {
        // Vocabulary cap handed to the CountVectorizer and forwarded to generateLDAVisData.
        int vocabSize = 50000;
        SparkSession spark = SparkSession$.MODULE$.builder().appName("DataPreparation").master("local[4]").config("spark.sql.parquet.enableVectorizedReader", "false").getOrCreate();
        SparkContext sc = spark.sparkContext();
        sc.setLogLevel("ERROR");
        sc.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
        // Reflection universe and mirror used to build the TypeTag for the NewsData encoder below.
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        // Decompiler-generated TypeCreator that resolves the type com.iaja.ldavis.examples.NewsData.
        public final class Com_iaja_ldavis_examples_DataPreparation$$typecreator3$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("com.iaja.ldavis.examples.NewsData").asType().toTypeConstructor();
            }

            public Com_iaja_ldavis_examples_DataPreparation$$typecreator3$1() {
            }
        }
        // Read the training files as pairs, keep only the second element of each pair as a
        // NewsData, and convert the resulting RDD to a Dataset.
        Dataset df = spark.implicits().rddToDatasetHolder(spark.sparkContext().wholeTextFiles("resources/dataset/20news-bydate/20news-bydate-train/*", spark.sparkContext().wholeTextFiles$default$2()).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final NewsData apply(Tuple2<String, String> x) {
                return new NewsData((String)x._2());
            }
        }, ClassTag$.MODULE$.apply(NewsData.class)), spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Com_iaja_ldavis_examples_DataPreparation$$typecreator3$1()))).toDS();
        df.show();
        // Pipeline stages; the column names chain content -> words -> words_filtered -> filteredWords -> features.
        Tokenizer tokenizer = (Tokenizer)new Tokenizer().setInputCol("content").setOutputCol("words");
        StopWordsRemover remover = new StopWordsRemover().setStopWords(StopWordsRemover$.MODULE$.loadDefaultStopWords("english")).setInputCol(tokenizer.getOutputCol()).setOutputCol("words_filtered");
        WordLengthFilter filterOnlyText = new WordLengthFilter().setInputCol("words_filtered").setOutputCol("filteredWords").setWordLength(3);
        CountVectorizer cv = new CountVectorizer().setInputCol("filteredWords").setOutputCol("features").setVocabSize(vocabSize);
        LDA lda = new LDA().setOptimizer("online").setK(10).setMaxIter(3);
        Pipeline pipeline = new Pipeline().setStages((PipelineStage[])((Object[])new PipelineStage[]{tokenizer, remover, filterOnlyText, cv, lda}));
        this.log().info("Starting pipeline fit...");
        PipelineModel pipeLineModel = pipeline.fit(df);
        Dataset transformedDf = pipeLineModel.transform(df);
        // Stage indices follow the setStages order above: 3 is the CountVectorizer, 4 is the LDA.
        LDAModel ldaModel = (LDAModel)pipeLineModel.stages()[4];
        CountVectorizerModel cvModel = (CountVectorizerModel)pipeLineModel.stages()[3];
        String[] vocab = ((CountVectorizerModel)pipeLineModel.stages()[3]).vocabulary();
        // Reflection universe and mirror for the (String, Int) tuple encoder used by vocabDf.
        JavaUniverse $u2 = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m2 = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        // Decompiler-generated TypeCreator describing scala.Tuple2 of Predef.String and scala.Int.
        public final class Com_iaja_ldavis_examples_DataPreparation$$typecreator5$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Tuple2"), List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Types.TypeApi[]{$u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$), $m.staticClass("scala.Int").asType().toTypeConstructor()})));
            }

            public Com_iaja_ldavis_examples_DataPreparation$$typecreator5$1() {
            }
        }
        // Vocabulary terms zipped with their index as a (term, termIndex) DataFrame.
        // NOTE(review): vocabDf is not referenced again in this method.
        Dataset vocabDf = spark.implicits().rddToDatasetHolder(sc.parallelize((Seq)Predef$.MODULE$.wrapRefArray((Object[])Predef$.MODULE$.refArrayOps((Object[])vocab).zipWithIndex(Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)))), sc.parallelize$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)), spark.implicits().newProductEncoder(((TypeTags)$u2).TypeTag().apply((Mirror)$m2, (TypeCreator)new Com_iaja_ldavis_examples_DataPreparation$$typecreator5$1()))).toDF((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"term", "termIndex"}));
        // Persist the fitted models and the transformed data set.
        cvModel.save("/tmp/scalaLDAvis/model/cv-model");
        ldaModel.save("/tmp/scalaLDAvis/model/spark-lda");
        transformedDf.write().json("/tmp/scalaLDAvis/model/transformedDF");
        // Print a preview of doc_size and topicDistribution for documents with doc_size > 0.
        transformedDf.withColumn("doc_size", this.sizeUdf().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{transformedDf.apply("filteredWords")}))).select("doc_size", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"topicDistribution"})).filter(spark.implicits().StringToColumn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"doc_size"}))).$((Seq)Nil$.MODULE$).$greater((Object)BoxesRunTime.boxToInteger((int)0))).show();
        try {
            Tuple4 tuple4;
            Tuple4<RDD<String>, RDD<String>, RDD<Object>, RDD<String>> tuple42 = this.generateLDAVisData(spark, vocabSize, ldaModel, (Dataset<Row>)transformedDf, vocab);
            // A null result is reported as a MatchError.
            if (tuple42 == null) {
                throw new MatchError(tuple42);
            }
            RDD phi = (RDD)tuple42._1();
            RDD thetaWithSize = (RDD)tuple42._2();
            RDD termFreq = (RDD)tuple42._3();
            RDD vocabRdd = (RDD)tuple42._4();
            // The tuple is re-packed and unpacked again; the second set of locals holds the same four RDDs.
            Tuple4 tuple43 = tuple4 = new Tuple4((Object)phi, (Object)thetaWithSize, (Object)termFreq, (Object)vocabRdd);
            RDD phi2 = (RDD)tuple43._1();
            RDD thetaWithSize2 = (RDD)tuple43._2();
            RDD termFreq2 = (RDD)tuple43._3();
            RDD vocabRdd2 = (RDD)tuple43._4();
            phi2.saveAsTextFile("/tmp/scalaLDAvis/phi/");
            thetaWithSize2.saveAsTextFile("/tmp/scalaLDAvis/theta");
            termFreq2.saveAsTextFile("/tmp/scalaLDAvis/termFreq");
            vocabRdd2.saveAsTextFile("/tmp/scalaLDAvis/vocab");
        }
        catch (Exception exception) {
            // Logged only; execution continues to the shutdown below.
            this.log().error("Exception occured while writing the LDAViz data", (Throwable)exception);
        }
        spark.close();
        // Exit status is 0 on every path that reaches this line, including after a logged failure.
        System.exit(0);
    }

    /**
     * Builds the four RDDs that are written out as LDAvis input data.
     *
     * The returned tuple holds, in order:
     * 1. phi: one comma-separated line per row of the transposed topics matrix
     *    (presumably one row per topic - confirm against LDAModel.topicsMatrix),
     *    coalesced to 1 partition.
     * 2. theta with document sizes: for every row whose {@code doc_size} is greater
     *    than 0, the document size (as a double) followed by the values of
     *    {@code topicDistribution}, comma-separated, coalesced to 100 partitions.
     * 3. term frequencies: the element-wise sum of all {@code features} vectors,
     *    each sum truncated to int, coalesced to 1 partition.
     * 4. vocabulary: the entries of {@code vocab}, coalesced to 1 partition.
     *
     * Side effects: marks {@code transformedDf} as cached, runs a reduce over the
     * {@code features} column, and prints the first 10 theta lines to standard output.
     *
     * @param spark         session supplying the SparkContext and the column implicits
     * @param vocabSize     not read by this method; kept for interface compatibility
     * @param ldaModel      fitted LDA model whose topics matrix is exported
     * @param transformedDf data set containing the {@code filteredWords},
     *                      {@code topicDistribution} and {@code features} columns
     * @param vocab         vocabulary terms to export
     * @return tuple of (phi, theta with doc sizes, term frequencies, vocabulary)
     */
    public Tuple4<RDD<String>, RDD<String>, RDD<Object>, RDD<String>> generateLDAVisData(SparkSession spark, int vocabSize, LDAModel ldaModel, Dataset<Row> transformedDf, String[] vocab) {
        SparkContext sc = spark.sparkContext();
        // transformedDf is read by several independent computations below.
        transformedDf.cache();
        Matrix topicsMatrix = ldaModel.topicsMatrix();
        // Rows of the transposed topics matrix as dense double arrays.
        double[][] phiMatrix = (double[][])topicsMatrix.transpose().rowIter().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final double[] apply(Vector x$3) {
                return x$3.toDense().toArray();
            }
        }).toArray(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Double.TYPE)));
        // phi: each row rendered as one comma-separated line.
        RDD qual$1 = sc.parallelize((Seq)Predef$.MODULE$.wrapRefArray((Object[])Predef$.MODULE$.refArrayOps((Object[])phiMatrix).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(double[] x$4) {
                return Predef$.MODULE$.doubleArrayOps(x$4).mkString(",");
            }
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)))), sc.parallelize$default$2(), ClassTag$.MODULE$.apply(String.class));
        int x$8 = 1;
        boolean x$9 = qual$1.coalesce$default$2();
        Option x$10 = qual$1.coalesce$default$3();
        RDD phiMatRdd = qual$1.coalesce(x$8, x$9, x$10, (Ordering)Ordering.String$.MODULE$);
        // theta: add doc_size via sizeUdf, keep rows with doc_size > 0, then render
        // "doc_size,topic_0,topic_1,..." per document.
        RDD qual$2 = transformedDf.withColumn("doc_size", this.sizeUdf().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{transformedDf.apply("filteredWords")}))).select("doc_size", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"topicDistribution"})).filter(spark.implicits().StringToColumn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"doc_size"}))).$((Seq)Nil$.MODULE$).$greater((Object)BoxesRunTime.boxToInteger((int)0))).rdd().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final List<Object> apply(Row r) {
                // The document size is widened to double and prepended to the topic values.
                double d = r.getInt(0);
                return Predef$.MODULE$.doubleArrayOps(((Vector)r.get(1)).toArray()).toList().$colon$colon((Object)BoxesRunTime.boxToDouble((double)d));
            }
        }, ClassTag$.MODULE$.apply(List.class)).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(List<Object> x$6) {
                return x$6.mkString(",");
            }
        }, ClassTag$.MODULE$.apply(String.class));
        int x$11 = 100;
        boolean x$12 = qual$2.coalesce$default$2();
        Option x$13 = qual$2.coalesce$default$3();
        RDD thetaMatWithDocSizesRdd = qual$2.coalesce(x$11, x$12, x$13, (Ordering)Ordering.String$.MODULE$);
        // Term frequencies: densify every "features" vector, sum them element-wise with
        // reduce, then truncate each sum to int.
        int[] freqArray = (int[])Predef$.MODULE$.doubleArrayOps(((DenseVector)transformedDf.select("features", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).rdd().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final DenseVector apply(Row x) {
                return ((Vector)x.get(0)).toDense();
            }
        }, ClassTag$.MODULE$.apply(DenseVector.class)).reduce((Function2)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final DenseVector apply(DenseVector a, DenseVector b) {
                return new DenseVector((double[])Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.doubleArrayOps(a.toArray()).zip((GenIterable)Predef$.MODULE$.wrapDoubleArray(b.toArray()), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)))).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final double apply(Tuple2<Object, Object> x) {
                        return x._1$mcD$sp() + x._2$mcD$sp();
                    }
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Double())));
            }
        })).toArray()).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final int apply(double x$7) {
                return this.apply$mcID$sp(x$7);
            }

            public int apply$mcID$sp(double x$7) {
                return (int)x$7;
            }
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Int()));
        RDD qual$3 = sc.parallelize((Seq)Predef$.MODULE$.wrapIntArray(freqArray), sc.parallelize$default$2(), ClassTag$.MODULE$.Int());
        int x$14 = 1;
        boolean x$15 = qual$3.coalesce$default$2();
        Option x$16 = qual$3.coalesce$default$3();
        RDD termFreqRdd = qual$3.coalesce(x$14, x$15, x$16, (Ordering)Ordering.Int$.MODULE$);
        // Vocabulary terms as a single-partition RDD.
        RDD qual$4 = sc.parallelize((Seq)Predef$.MODULE$.wrapRefArray((Object[])vocab), sc.parallelize$default$2(), ClassTag$.MODULE$.apply(String.class));
        int x$17 = 1;
        boolean x$18 = qual$4.coalesce$default$2();
        Option x$19 = qual$4.coalesce$default$3();
        RDD vocabRdd = qual$4.coalesce(x$17, x$18, x$19, (Ordering)Ordering.String$.MODULE$);
        // Debug preview: print the first 10 theta lines to standard output.
        Predef$.MODULE$.refArrayOps((Object[])thetaMatWithDocSizesRdd.take(10)).foreach((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final void apply(Object x) {
                Predef$.MODULE$.println(x);
            }
        });
        return new Tuple4((Object)phiMatRdd, (Object)thetaMatWithDocSizesRdd, (Object)termFreqRdd, (Object)vocabRdd);
    }

    /**
     * Private constructor, invoked once from the static initializer.
     *
     * Publishes the singleton, runs the Logger initialisation, then creates the
     * {@code getSize} function object and wraps it in the {@code sizeUdf} Spark UDF.
     */
    private DataPreparation$() {
        // Publish the singleton before any other initialisation runs.
        MODULE$ = this;
        // NOTE(review): presumably assigns the log field through the Logger setter - confirm in Logger$class.
        Logger$class.$init$(this);
        // Function object that returns the number of elements in the wrapped array.
        this.getSize = new Serializable(){
            public static final long serialVersionUID = 0L;

            public final int apply(WrappedArray<String> x$1) {
                return x$1.size();
            }
        };
        // Reflection universe and mirror used to build the TypeTag for the UDF's input type.
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        // Decompiler-generated TypeCreator describing scala.collection.mutable.WrappedArray of Predef.String.
        public final class Com_iaja_ldavis_examples_DataPreparation$$typecreator1$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala.collection.mutable").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.collection.mutable.WrappedArray"), List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Types.TypeApi[]{$u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$)})));
            }

            public Com_iaja_ldavis_examples_DataPreparation$$typecreator1$1() {
            }
        }
        // Build the UDF from getSize, passing an Int type tag and the WrappedArray type tag created above.
        this.sizeUdf = functions$.MODULE$.udf(this.getSize(), ((TypeTags)package$.MODULE$.universe()).TypeTag().Int(), ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Com_iaja_ldavis_examples_DataPreparation$$typecreator1$1()));
    }
}

