/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.raw.tools;

import com.typesafe.config.Config;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.raw.RawSparkBatchWriter;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.raw.tools.FolderCompactionUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.raw.tools.WhereCondition;
import it.agilelab.bigdata.wasp.consumers.spark.strategies.gdpr.utils.hdfs.HdfsUtils$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.RawModel;
import it.agilelab.bigdata.wasp.models.RawOptions;
import java.io.Serializable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.DataType$;
import org.apache.spark.sql.types.StructType;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005f\u0001\u0002\u0006\f\u0001yAQ!\f\u0001\u0005\u00029BQ!\r\u0001\u0005\u0002IBQ!\r\u0001\u0005\u00029CQ\u0001\u001f\u0001\u0005\u0002eDq!!\b\u0001\t\u0003\ty\u0002C\u0004\u0002L\u0001!\t!!\u0014\t\u000f\u0005}\u0003\u0001\"\u0002\u0002b!A\u0011q\u0010\u0001\u0005\u0002-\t\t\tC\u0004\u0002\u000e\u0002!\t!a$\u0003!\u0019{G\u000eZ3s\u0007>l\u0007/Y2uS>t'B\u0001\u0007\u000e\u0003\u0015!xn\u001c7t\u0015\tqq\"A\u0002sC^T!\u0001E\t\u0002\u000fAdWoZ5og*\u0011!cE\u0001\u0006gB\f'o\u001b\u0006\u0003)U\t\u0011bY8ogVlWM]:\u000b\u0005Y9\u0012\u0001B<bgBT!\u0001G\r\u0002\u000f\tLw\rZ1uC*\u0011!dG\u0001\tC\u001eLG.\u001a7bE*\tA$\u0001\u0002ji\u000e\u00011c\u0001\u0001 KA\u0011\u0001eI\u0007\u0002C)\t!%A\u0003tG\u0006d\u0017-\u0003\u0002%C\t1\u0011I\\=SK\u001a\u0004\"AJ\u0016\u000e\u0003\u001dR!\u0001K\u0015\u0002\u000f1|wmZ5oO*\u0011!&F\u0001\u0005G>\u0014X-\u0003\u0002-O\t9Aj\\4hS:<\u0017A\u0002\u001fj]&$h\bF\u00010!\t\u0001\u0004!D\u0001\f\u0003\u001d\u0019w.\u001c9bGR$2a\r\u001cC!\t\u0001C'\u0003\u00026C\t!QK\\5u\u0011\u00159$\u00011\u00019\u0003\u0011\u0019wN\u001c4\u0011\u0005e\u0002U\"\u0001\u001e\u000b\u0005mb\u0014AB2p]\u001aLwM\u0003\u0002>}\u0005AA/\u001f9fg\u00064WMC\u0001@\u0003\r\u0019w.\\\u0005\u0003\u0003j\u0012aaQ8oM&<\u0007\"\u0002\n\u0003\u0001\u0004\u0019\u0005C\u0001#M\u001b\u0005)%B\u0001$H\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003%!S!!\u0013&\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0015aA8sO&\u0011Q*\u0012\u0002\r'B\f'o[*fgNLwN\u001c\u000b\u0007g=;\u0016L]<\t\u000bA\u001b\u0001\u0019A)\u0002\u0015%t\u0007/\u001e;N_\u0012,G\u000e\u0005\u0002S+6\t1K\u0003\u0002U+\u00051Qn\u001c3fYNL!AV*\u0003\u0011I\u000bw/T8eK2DQ\u0001W\u0002A\u0002E\u000b1b\\;uaV$Xj\u001c3fY\")!l\u0001a\u00017\u0006Q\u0001/\u0019:uSRLwN\\:\u0011\tq\u001bg-\u001b\b\u0003;\u0006\u0004\"AX\u0011\u000e\u0003}S!\u0001Y\u000f\u0002\rq\u0012xn\u001c;?\u0013\t\u0011\u0017%\u0001\u0004Qe\u0016$WMZ\u0005\u0003I\u0016\u00141!T1q\u0015\t\u0011\u001
7\u0005\u0005\u0002]O&\u0011\u0001.\u001a\u0002\u0007'R\u0014\u0018N\\4\u0011\u0007)|gM\u0004\u0002l[:\u0011a\f\\\u0005\u0002E%\u0011a.I\u0001\ba\u0006\u001c7.Y4f\u0013\t\u0001\u0018O\u0001\u0003MSN$(B\u00018\"\u0011\u0015\u00198\u00011\u0001u\u00035qW/\u001c)beRLG/[8ogB\u0011\u0001%^\u0005\u0003m\u0006\u00121!\u00138u\u0011\u0015\u00112\u00011\u0001D\u0003\u0019!W\r\\3uKR!1G_>~\u0011\u0015\u0011B\u00011\u0001D\u0011\u0015aH\u00011\u0001g\u0003\u001d\u0011xn\u001c;ESJDQA \u0003A\u0002}\f1\u0001\u001a4t!\u0011Qw.!\u0001\u0011\t\u0005\r\u0011q\u0003\b\u0005\u0003\u000b\t)B\u0004\u0003\u0002\b\u0005Ma\u0002BA\u0005\u0003#qA!a\u0003\u0002\u00109\u0019a,!\u0004\n\u0003-K!!\u0013&\n\u0005IA\u0015B\u0001$H\u0013\tqW)\u0003\u0003\u0002\u001a\u0005m!!\u0003#bi\u00064%/Y7f\u0015\tqW)\u0001\u0003sK\u0006$GCCA\u0011\u0003s\tY$!\u0010\u0002@A1\u0001%a\t\u0000\u0003OI1!!\n\"\u0005\u0019!V\u000f\u001d7feA!!n\\A\u0015!\u0011\tY#!\u000e\u000e\u0005\u00055\"\u0002BA\u0018\u0003c\t!AZ:\u000b\u0007\u0005M\u0002*\u0001\u0004iC\u0012|w\u000e]\u0005\u0005\u0003o\tiC\u0001\u0003QCRD\u0007\"\u0002\n\u0006\u0001\u0004\u0019\u0005\"\u0002)\u0006\u0001\u0004\t\u0006\"\u0002.\u0006\u0001\u0004Y\u0006bBA!\u000b\u0001\u0007\u00111I\u0001\u0010o\",'/Z\"p]\u0012LG/[8ogB!!n\\A#!\r\u0001\u0014qI\u0005\u0004\u0003\u0013Z!AD,iKJ,7i\u001c8eSRLwN\\\u0001\u001cI\u0016dW\r^3F[B$\u0018\u0010U1si&$\u0018n\u001c8G_2$WM]:\u0015\u0011\u0005\u001d\u0012qJA,\u00037Bq!a\f\u0007\u0001\u0004\t\t\u0006\u0005\u0003\u0002,\u0005M\u0013\u0002BA+\u0003[\u0011!BR5mKNK8\u000f^3n\u0011\u001d\tIF\u0002a\u0001\u0003S\tQ\u0002Z1uC\u001a\u0014\u0018-\\3S_>$\bbBA/\r\u0001\u0007\u0011qE\u0001\rI\u0016dW\r^3e\r&dWm]\u0001\tSN\u0004\u0016M]3oiR1\u00111MA5\u0003[\u00022\u0001IA3\u0013\r\t9'\t\u0002\b\u0005>|G.Z1o\u0011\u001d\tYg\u0002a\u0001\u0003S\tQa\u00195jY\u0012Dq!a\u001c\b\u0001\u0004\tI#\u0001\u0007qCJ,g\u000e\u001e+p\r&tG\rK\u0002\b\u0003g\u0002B!!\u001e\u0002|5\u0011\u0011q\u000f\u0006\u0004\u0003s\n\u0013AC1o]>$\u0018\r^5p
]&!\u0011QPA<\u0005\u001d!\u0018-\u001b7sK\u000e\f\u0011\u0004Z3mKR,W)\u001c9us\u001a{G\u000eZ3sgV\u0003x/\u0019:egRA\u0011qEAB\u0003\u000b\u000bI\tC\u0004\u00020!\u0001\r!!\u0015\t\u000f\u0005\u001d\u0005\u00021\u0001\u0002*\u0005\t\u0001\u000fC\u0004\u0002\f\"\u0001\r!!\u000b\u0002\u000bUtG/\u001b7\u0002\u000b]\u0014\u0018\u000e^3\u0015\u000bM\n\t*!(\t\u000f\u0005M\u0015\u00021\u0001\u0002\u0016\u00061qO]5uKJ\u0004B!a&\u0002\u001a6\tQ\"C\u0002\u0002\u001c6\u00111CU1x'B\f'o\u001b\"bi\u000eDwK]5uKJDa!a(\n\u0001\u0004y\u0018A\u00033bi\u00064'/Y7fg\u0002")
/*
 * Compacts the files of a raw model: the selected partitions are read, repartitioned,
 * written through a RawSparkBatchWriter, and then the original input files and any
 * partition folders left empty are deleted.
 *
 * NOTE(review): this source is decompiler output of a Scala class. Tokens such as
 * `.colon.colon` (apparently the decompiler's spelling of Scala's `::` non-empty list
 * class), the `$`-suffixed companion accessors and the intersection-cast lambdas are
 * kept exactly as the decompiler emitted them.
 */
public class FolderCompaction
implements Logging {
    // Deliberately not final: the field is assigned by the trait setter below, which is
    // not a constructor (presumably it is invoked from Logging.$init$ - confirm).
    private WaspLogger logger;

    /**
     * @return the logger used by this instance
     */
    public WaspLogger logger() {
        return this.logger;
    }

    /**
     * Trait-generated setter that stores the logger for this instance.
     *
     * @param x$1 the logger to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        this.logger = x$1;
    }

    /**
     * Runs a compaction whose parameters are all taken from a configuration object.
     * Partitions, input model, output model and number of partitions are parsed through
     * {@code FolderCompactionUtils} and handed to the five-argument overload.
     *
     * @param conf  configuration holding the compaction parameters
     * @param spark session used to read, write and reach the file system
     */
    public void compact(Config conf, SparkSession spark) {
        scala.collection.immutable.Map<String, List<String>> partitions = FolderCompactionUtils$.MODULE$.parsePartitions(conf);
        RawModel inputModel = FolderCompactionUtils$.MODULE$.parseModel(conf, FolderCompactionUtils$.MODULE$.INPUT_MODEL_CONF_KEY());
        RawModel outputModel = FolderCompactionUtils$.MODULE$.parseModel(conf, FolderCompactionUtils$.MODULE$.OUTPUT_MODEL_CONF_KEY());
        int numPartitions = conf.getInt(FolderCompactionUtils$.MODULE$.NUM_PARTITIONS_CONF_KEY());
        this.compact(inputModel, outputModel, partitions, numPartitions, spark);
    }

    /**
     * Reads the requested partitions of {@code inputModel}, repartitions every resulting
     * DataFrame to {@code numPartitions}, writes them with a {@link RawSparkBatchWriter}
     * built on {@code outputModel}, and finally deletes the input files that backed the
     * DataFrames (see {@link #delete}).
     *
     * @param inputModel    model describing the data to compact
     * @param outputModel   model describing where the compacted data is written
     * @param partitions    partition column name to the values to compact
     * @param numPartitions number of partitions each DataFrame is repartitioned to
     * @param spark         session used to read, write and reach the file system
     */
    public void compact(RawModel inputModel, RawModel outputModel, scala.collection.immutable.Map<String, List<String>> partitions, int numPartitions, SparkSession spark) {
        List<WhereCondition> whereConditions = FolderCompactionUtils$.MODULE$.generateWhereConditions(partitions, inputModel, outputModel);
        Tuple2<List<Dataset<Row>>, List<Path>> readResult = this.read(spark, inputModel, partitions, whereConditions);
        if (readResult == null) {
            throw new MatchError(readResult);
        }
        // Only the DataFrames are needed here; the discovered file list (_2) is unused.
        List dataframes = (List)readResult._1();
        List repartitionedDataFrames = (List)dataframes.map((Function1 & Serializable & scala.Serializable)df -> df.repartition(numPartitions), List$.MODULE$.canBuildFrom());
        this.write(new RawSparkBatchWriter(outputModel, spark.sparkContext()), (List<Dataset<Row>>)repartitionedDataFrames);
        // Input files are removed only after the write call has returned.
        this.delete(spark, inputModel.uri(), (List<Dataset<Row>>)dataframes);
    }

    /**
     * Deletes every file that backs the given DataFrames, then removes the partition
     * folders that became empty below {@code rootDir}.
     * <p>
     * The files are found by unioning all DataFrames and collecting the distinct values
     * of Spark's {@code input_file_name()}. When the list of DataFrames or the list of
     * files is empty, only "Nothing to delete" is logged.
     *
     * @param spark   session providing the Hadoop configuration
     * @param rootDir root folder of the data; empty folders are removed up to, but not
     *                including, this folder
     * @param dfs     DataFrames whose backing files must be deleted
     */
    public void delete(SparkSession spark, String rootDir, List<Dataset<Row>> dfs) {
        List<Dataset<Row>> list = dfs;
        if (Nil$.MODULE$.equals(list)) {
            this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Nothing to delete");
        } else if (list instanceof .colon.colon) {
            .colon.colon colon2 = (.colon.colon)list;
            Dataset hDf = (Dataset)colon2.head();
            List tailDfs = colon2.tl$access$1();
            Dataset unionDF = (Dataset)tailDfs.foldLeft((Object)hDf, (Function2 & Serializable & scala.Serializable)(acc, df) -> acc.union(df));
            List files = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])unionDF.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.input_file_name()})).distinct().as(Encoders$.MODULE$.STRING()).collect())).toList();
            List list2 = files;
            if (Nil$.MODULE$.equals(list2)) {
                this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Nothing to delete");
            } else if (list2 instanceof .colon.colon) {
                .colon.colon colon3 = (.colon.colon)list2;
                String head = (String)colon3.head();
                // The file system is resolved from the first file and reused for all of them.
                FileSystem fs = new Path(head).getFileSystem(spark.sparkContext().hadoopConfiguration());
                List pathsToDelete = (List)files.map((Function1 & Serializable & scala.Serializable)s -> fs.makeQualified(new Path(s)), List$.MODULE$.canBuildFrom());
                this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Deleting files:\n" + pathsToDelete.mkString("\t", "\n\t", ""));
                files.foreach((Function1 & Serializable & scala.Serializable)p -> BoxesRunTime.boxToBoolean((boolean)fs.delete(fs.makeQualified(new Path(p)), false)));
                List<Path> deletedEmptyFolders = this.deleteEmptyPartitionFolders(fs, fs.makeQualified(new Path(rootDir)), (List<Path>)pathsToDelete);
                this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Deleted empty folders: " + ((TraversableOnce)deletedEmptyFolders.sortBy((Function1 & Serializable & scala.Serializable)folder -> folder.toString(), (Ordering)Ordering.String$.MODULE$)).mkString(", "));
            } else {
                throw new MatchError((Object)list2);
            }
        } else {
            throw new MatchError(list);
        }
    }

    /**
     * Builds one DataFrame per applicable where condition on top of the input model data.
     * <p>
     * The reader uses the model's format and extra options (an empty map when none are
     * set). The schema parsed from {@code inputModel.schema()} is applied only when the
     * JSON parses successfully; on failure the reader is used without an explicit schema.
     * Where conditions are kept only when {@code FolderCompactionUtils.filterWhereCondition}
     * accepts them against the discovered partition files.
     *
     * @param spark           session used to build the reader
     * @param inputModel      model describing the data to load
     * @param partitions      partition column name to the values of interest
     * @param whereConditions candidate conditions, one per partition combination
     * @return a pair of (DataFrames, one per retained where condition; partition files
     *         discovered under the model path)
     */
    public Tuple2<List<Dataset<Row>>, List<Path>> read(SparkSession spark, RawModel inputModel, scala.collection.immutable.Map<String, List<String>> partitions, List<WhereCondition> whereConditions) {
        this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Initialize Spark HDFSReader with this model: " + inputModel);
        Try trySchema = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> (StructType)DataType$.MODULE$.fromJson(inputModel.schema()));
        RawOptions options = inputModel.options();
        DataFrameReader reader = spark.read().format(options.format()).options((Map)options.extraOptions().getOrElse((Function0 & Serializable & scala.Serializable)() -> (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Nil$.MODULE$)));
        DataFrameReader readerWithSchema;
        if (trySchema instanceof Failure) {
            // Unparseable schema: let the reader work without an explicit one.
            readerWithSchema = reader;
        } else if (trySchema instanceof Success) {
            StructType schema = (StructType)((Success)trySchema).value();
            readerWithSchema = reader.schema(schema);
        } else {
            throw new MatchError((Object)trySchema);
        }
        String path = HdfsUtils$.MODULE$.getRawModelPathToToLoad(inputModel, spark.sparkContext());
        Path hdfsPath = new Path(path);
        FileSystem hdfs = hdfsPath.getFileSystem(spark.sparkContext().hadoopConfiguration());
        // Keep only the requested partitions accepted by the partitionBy check below.
        scala.collection.immutable.Map filteredPartitions = (scala.collection.immutable.Map)partitions.filter((Function1 & Serializable & scala.Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)FolderCompaction.$anonfun$read$4(inputModel, x0$1)));
        List<Path> inputModelFiles = FolderCompactionUtils$.MODULE$.discoverPartitionFiles(hdfs, hdfsPath, (scala.collection.immutable.Map<String, List<String>>)filteredPartitions);
        this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Loading from this path: '" + path + "'");
        Dataset unfilteredDF = readerWithSchema.load(path).unpersist(true);
        List filteredWhereConditions = (List)((List)whereConditions.filter((Function1 & Serializable & scala.Serializable)whereCondition -> BoxesRunTime.boxToBoolean((boolean)FolderCompactionUtils$.MODULE$.filterWhereCondition((List<Path>)inputModelFiles, whereCondition)))).map((Function1 & Serializable & scala.Serializable)condition -> condition.toSparkColumn(), List$.MODULE$.canBuildFrom());
        this.logger().info((Function0 & Serializable & scala.Serializable)() -> "Where conditions: " + filteredWhereConditions);
        List loadedDataFrames = (List)filteredWhereConditions.map((Function1 & Serializable & scala.Serializable)query -> unfilteredDF.where(query), List$.MODULE$.canBuildFrom());
        return new Tuple2((Object)loadedDataFrames, inputModelFiles);
    }

    /**
     * Removes the folders that contained the deleted files when they are now empty, and
     * keeps walking towards {@code dataframeRoot} removing empty ancestors.
     * <p>
     * A file that sat directly in {@code dataframeRoot} contributes the root itself as
     * its parent folder. The root is never a deletion candidate, so it is skipped instead
     * of being passed to {@link #deleteEmptyFoldersUpwards}, whose precondition it would
     * violate.
     *
     * @param fs            file system holding the folders
     * @param dataframeRoot root folder of the data; never deleted
     * @param deletedFiles  files that have just been deleted
     * @return the folders for which a deletion was recorded
     */
    public List<Path> deleteEmptyPartitionFolders(FileSystem fs, Path dataframeRoot, List<Path> deletedFiles) {
        // Distinct parent folders of the deleted files.
        Set foldersToDelete = (Set)deletedFiles.foldLeft((Object)Predef$.MODULE$.Set().empty(), (Function2 & Serializable & scala.Serializable)(set, path) -> (Set)set.$plus((Object)path.getParent()));
        return (List)foldersToDelete.toList().flatMap((Function1 & Serializable & scala.Serializable)p -> {
            if (dataframeRoot.equals(p)) {
                // Nothing to clean between the root and itself.
                return List$.MODULE$.empty();
            }
            return this.deleteEmptyFoldersUpwards(fs, (Path)p, dataframeRoot);
        }, List$.MODULE$.canBuildFrom());
    }

    /**
     * Tells whether {@code parentToFind} is a strict ancestor of {@code child}.
     *
     * @param child        path whose ancestors are inspected
     * @param parentToFind ancestor being looked for
     * @return {@code true} if some parent, grandparent, ... of {@code child} equals
     *         {@code parentToFind}; {@code false} once the top of the path is reached
     */
    public final boolean isParent(Path child, Path parentToFind) {
        Path ancestor = child.getParent();
        while (ancestor != null) {
            if (ancestor.equals(parentToFind)) {
                return true;
            }
            ancestor = ancestor.getParent();
        }
        return false;
    }

    /**
     * Deletes {@code p} if it is empty and repeats on its parents, stopping at the first
     * non-empty folder or on reaching {@code until}, which is never deleted.
     *
     * @param fs    file system holding the folders
     * @param p     folder to start from; must be a strict descendant of {@code until}
     * @param until ancestor folder at which the walk stops
     * @return the folders that were deleted
     * @throws IllegalArgumentException if {@code until} is not an ancestor of {@code p}
     */
    public List<Path> deleteEmptyFoldersUpwards(FileSystem fs, Path p, Path until) {
        Predef$.MODULE$.require(this.isParent(p, until), (Function0 & Serializable & scala.Serializable)() -> p + " is not child of " + until);
        return this.rDeleteEmptyFoldersUpwards$1(fs, p, until, List$.MODULE$.empty());
    }

    /**
     * Writes every DataFrame, in list order, through the given writer.
     *
     * @param writer     writer receiving the DataFrames
     * @param dataframes DataFrames to write
     */
    public void write(RawSparkBatchWriter writer, List<Dataset<Row>> dataframes) {
        dataframes.foreach((Function1 & Serializable & scala.Serializable)df -> {
            writer.write((Dataset<Row>)df);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Synthetic predicate used by {@link #read} to filter the requested partitions: an
     * entry is kept when the input model's {@code partitionBy} option exists and contains
     * the entry's key (the partition column name).
     */
    public static final /* synthetic */ boolean $anonfun$read$4(RawModel inputModel$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        String colName = (String)x0$1._1();
        return inputModel$1.options().partitionBy().exists((Function1 & Serializable & scala.Serializable)columns -> BoxesRunTime.boxToBoolean((boolean)columns.contains((Object)colName)));
    }

    /**
     * Loop behind {@link #deleteEmptyFoldersUpwards}: walks from {@code p} towards
     * {@code until}, deleting each folder whose listing is empty and prepending it to
     * {@code deletedSoFar}. A folder is recorded only when the file system reports that
     * the deletion actually happened.
     */
    private final List rDeleteEmptyFoldersUpwards$1(FileSystem fs, Path p, Path until, List deletedSoFar) {
        // Null-safe "p != until", as in the original comparison.
        while (p != null ? !p.equals(until) : until != null) {
            if (!new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])fs.listStatus(p))).isEmpty()) {
                // First non-empty folder: nothing above it can be empty either.
                break;
            }
            if (fs.delete(p, false)) {
                deletedSoFar = deletedSoFar.$colon$colon((Object)p);
            }
            p = p.getParent();
        }
        return deletedSoFar;
    }

    /**
     * Creates the compaction tool and runs the {@code Logging} trait initialiser.
     */
    public FolderCompaction() {
        Logging.$init$((Logging)this);
    }
}

