/*
 * Decompiled with CFR 0.152.
 */
package com.coxautodata;

import com.coxautodata.Config;
import com.coxautodata.OptionsParsing$;
import com.coxautodata.SparkDistCP;
import com.coxautodata.SparkDistCPOptions;
import com.coxautodata.objects.Accumulators;
import com.coxautodata.objects.ConfigSerDeser;
import com.coxautodata.objects.CopyDefinitionWithDependencies;
import com.coxautodata.objects.CopyPartitioner$;
import com.coxautodata.objects.DeleteResult;
import com.coxautodata.objects.DistCPResult;
import com.coxautodata.objects.FileSystemObjectCacher;
import com.coxautodata.objects.Logging;
import com.coxautodata.objects.SerializableFileStatus;
import com.coxautodata.objects.SingleCopyDefinition;
import com.coxautodata.utils.CopyUtils$;
import com.coxautodata.utils.FileListUtils$;
import com.coxautodata.utils.PathUtils$;
import java.io.Serializable;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.TaskContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.RDD$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.math.Ordering;
import scala.math.Ordering$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

public final class SparkDistCP$
implements Logging {
    /** Singleton instance of this module; assigned by the private constructor. */
    public static SparkDistCP$ MODULE$;
    /** Backing field for the logger exposed through the {@code Logging} accessor and setter below. */
    private final Logger com$coxautodata$objects$Logging$$log;

    // Instantiating the class once is enough: the constructor stores the new instance in MODULE$.
    static {
        new SparkDistCP$();
    }

    /**
     * Returns whatever {@code Logging.logName$} yields for this instance.
     *
     * @return the value produced by the {@code Logging} static forwarder
     */
    @Override
    public String logName() {
        return Logging.logName$(this);
    }

    /**
     * Forwards the requested level to {@code Logging.setLogLevel$} for this instance.
     *
     * @param level the log4j level to hand to the {@code Logging} forwarder
     */
    @Override
    public void setLogLevel(Level level) {
        Logging.setLogLevel$(this, level);
    }

    /**
     * Forwards a message supplier to {@code Logging.logInfo$}.
     * The supplier is passed through untouched; this method never evaluates it itself.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /**
     * Forwards a message supplier to {@code Logging.logDebug$}.
     * The supplier is passed through untouched; this method never evaluates it itself.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /**
     * Forwards a message supplier to {@code Logging.logTrace$}.
     * The supplier is passed through untouched; this method never evaluates it itself.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /**
     * Forwards a message supplier to {@code Logging.logWarning$}.
     * The supplier is passed through untouched; this method never evaluates it itself.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /**
     * Forwards a message supplier to {@code Logging.logError$}.
     * The supplier is passed through untouched; this method never evaluates it itself.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /**
     * Forwards a message supplier and an associated throwable to {@code Logging.logInfo$}.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable handed to the {@code Logging} forwarder alongside the message
     */
    @Override
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /**
     * Forwards a message supplier and an associated throwable to {@code Logging.logDebug$}.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable handed to the {@code Logging} forwarder alongside the message
     */
    @Override
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /**
     * Forwards a message supplier and an associated throwable to {@code Logging.logTrace$}.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable handed to the {@code Logging} forwarder alongside the message
     */
    @Override
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /**
     * Forwards a message supplier and an associated throwable to {@code Logging.logWarning$}.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable handed to the {@code Logging} forwarder alongside the message
     */
    @Override
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /**
     * Forwards a message supplier and an associated throwable to {@code Logging.logError$}.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable handed to the {@code Logging} forwarder alongside the message
     */
    @Override
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /**
     * Returns the result of {@code Logging.isTraceEnabled$} for this instance.
     *
     * @return the boolean reported by the {@code Logging} forwarder
     */
    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Accessor for the logger field required by the {@code Logging} interface.
     *
     * @return the current value of the logger backing field
     */
    @Override
    public Logger com$coxautodata$objects$Logging$$log() {
        return this.com$coxautodata$objects$Logging$$log;
    }

    /**
     * Setter for the logger field required by the {@code Logging} interface.
     *
     * NOTE(review): the backing field is declared {@code final} in this decompiled output, so this
     * assignment would be rejected by javac as written; it reflects how the original bytecode
     * initialises the field rather than valid Java source.
     *
     * @param x$1 the logger instance to store
     */
    @Override
    public final void com$coxautodata$objects$Logging$_setter_$com$coxautodata$objects$Logging$$log_$eq(Logger x$1) {
        this.com$coxautodata$objects$Logging$$log = x$1;
    }

    /**
     * Command-line entry point: obtains a Spark session, parses the arguments against the
     * session's Hadoop configuration, and hands the resulting paths and options to {@link #run}.
     *
     * @param args raw command-line arguments passed to the options parser
     * @throws MatchError if the parsed configuration yields a {@code null} source/destination pair
     */
    public void main(String[] args) {
        SparkSession spark = SparkSession$.MODULE$.builder().getOrCreate();
        Config parsed = OptionsParsing$.MODULE$.parse(args, spark.sparkContext().hadoopConfiguration());

        // The (sources, destination) pair is destructured once; a null pair cannot be matched.
        Tuple2<Seq<Path>, Path> paths = parsed.sourceAndDestPaths();
        if (paths == null) {
            throw new MatchError(paths);
        }
        Seq<Path> sources = paths._1();
        Path destination = paths._2();

        this.run(spark, sources, destination, parsed.options());
    }

    /**
     * Runs one copy job: lists source and destination files, joins the two listings by URI,
     * copies every entry that has a source-side definition, optionally deletes destination entries
     * that have none, then either forces evaluation of the results or writes their messages as CSV,
     * and finally logs the accumulated statistics.
     *
     * @param sparkSession    session whose Spark context and Hadoop configuration are used throughout
     * @param sourcePaths     source paths; asserted to be non-empty
     * @param destinationPath destination path, qualified against the Hadoop configuration before use
     * @param options         job options; {@code validateOptions()} is invoked on them before any work
     * @throws MatchError if {@code options.log()} is neither {@code None} nor a {@code Some}
     */
    public void run(SparkSession sparkSession, Seq<Path> sourcePaths, Path destinationPath, SparkDistCPOptions options) {
        RDD<DistCPResult> rDD;
        Predef$.MODULE$.assert(sourcePaths.nonEmpty(), (Function0 & Serializable & scala.Serializable)() -> "At least one source path must be given");
        options.validateOptions();
        // Verbose mode raises both the Spark context's log level and this object's own logger to DEBUG.
        if (options.verbose()) {
            sparkSession.sparkContext().setLogLevel("DEBUG");
            this.setLogLevel(Level.DEBUG);
        }
        // Qualify every source path and the destination path against the session's Hadoop configuration.
        Seq qualifiedSourcePaths = (Seq)sourcePaths.map((Function1 & Serializable & scala.Serializable)x$2 -> PathUtils$.MODULE$.pathToQualifiedPath(sparkSession.sparkContext().hadoopConfiguration(), (Path)x$2), Seq$.MODULE$.canBuildFrom());
        Path qualifiedDestinationPath = PathUtils$.MODULE$.pathToQualifiedPath(sparkSession.sparkContext().hadoopConfiguration(), destinationPath);
        // Both listings are keyed by URI so they can be joined below.
        RDD<Tuple2<URI, CopyDefinitionWithDependencies>> sourceRDD = FileListUtils$.MODULE$.getSourceFiles(sparkSession.sparkContext(), (Seq<URI>)((Seq)qualifiedSourcePaths.map((Function1 & Serializable & scala.Serializable)x$3 -> x$3.toUri(), Seq$.MODULE$.canBuildFrom())), qualifiedDestinationPath.toUri(), options.updateOverwritePathBehaviour(), options.numListstatusThreads(), options.filterNot());
        RDD<Tuple2<URI, SerializableFileStatus>> destinationRDD = FileListUtils$.MODULE$.getDestinationFiles(sparkSession.sparkContext(), qualifiedDestinationPath, options);
        // Full outer join: each URI carries an optional source definition and an optional destination status.
        // NOTE(review): joined feeds both the copy and the delete pipelines and no persist/cache call
        // is visible in this method — confirm whether recomputing the listings is acceptable.
        RDD joined = RDD$.MODULE$.rddToPairRDDFunctions(sourceRDD, ClassTag$.MODULE$.apply(URI.class), ClassTag$.MODULE$.apply(CopyDefinitionWithDependencies.class), Ordering$.MODULE$.ordered((Function1)Predef$.MODULE$.$conforms())).fullOuterJoin(destinationRDD);
        // Partial function: keep only entries whose source side is Some, yielding the copy definition.
        RDD toCopy = joined.collect((PartialFunction)new scala.Serializable(){
            public static final long serialVersionUID = 0L;

            public final <A1 extends Tuple2<URI, Tuple2<Option<CopyDefinitionWithDependencies>, Option<SerializableFileStatus>>>, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                Option option;
                Tuple2 tuple2;
                A1 A1 = x1;
                if (A1 != null && (tuple2 = (Tuple2)A1._2()) != null && (option = (Option)tuple2._1()) instanceof Some) {
                    Some some = (Some)option;
                    CopyDefinitionWithDependencies s = (CopyDefinitionWithDependencies)some.value();
                    object = s;
                } else {
                    // Not matched: defer to the caller-supplied fallback.
                    object = function1.apply(x1);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(Tuple2<URI, Tuple2<Option<CopyDefinitionWithDependencies>, Option<SerializableFileStatus>>> x1) {
                Option option;
                Tuple2 tuple2;
                Tuple2<URI, Tuple2<Option<CopyDefinitionWithDependencies>, Option<SerializableFileStatus>>> tuple22 = x1;
                boolean bl = tuple22 != null && (tuple2 = (Tuple2)tuple22._2()) != null && (option = (Option)tuple2._1()) instanceof Some;
                return bl;
            }
        }, ClassTag$.MODULE$.apply(CopyDefinitionWithDependencies.class));
        // The same Accumulators instance is shared by the copy and delete stages.
        Accumulators accumulators = new Accumulators(sparkSession);
        RDD<DistCPResult> copyResult = this.doCopy((RDD<CopyDefinitionWithDependencies>)toCopy, accumulators, options);
        if (options.delete()) {
            // Partial function: keep only entries whose source side is None, yielding the key URI.
            RDD toDelete = joined.collect((PartialFunction)new scala.Serializable(){
                public static final long serialVersionUID = 0L;

                /*
                 * Enabled aggressive block sorting
                 */
                public final <A1 extends Tuple2<URI, Tuple2<Option<CopyDefinitionWithDependencies>, Option<SerializableFileStatus>>>, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                    Object object;
                    A1 A1 = x2;
                    if (A1 != null) {
                        Option option;
                        URI d = (URI)A1._1();
                        Tuple2 tuple2 = (Tuple2)A1._2();
                        if (tuple2 != null && None$.MODULE$.equals(option = (Option)tuple2._1())) {
                            object = d;
                            return (B1)object;
                        }
                    }
                    // Not matched: defer to the caller-supplied fallback.
                    object = function1.apply(x2);
                    return (B1)object;
                }

                public final boolean isDefinedAt(Tuple2<URI, Tuple2<Option<CopyDefinitionWithDependencies>, Option<SerializableFileStatus>>> x2) {
                    Option option;
                    Tuple2 tuple2;
                    Tuple2<URI, Tuple2<Option<CopyDefinitionWithDependencies>, Option<SerializableFileStatus>>> tuple22 = x2;
                    boolean bl = tuple22 != null && (tuple2 = (Tuple2)tuple22._2()) != null && None$.MODULE$.equals(option = (Option)tuple2._1());
                    return bl;
                }
            }, ClassTag$.MODULE$.apply(URI.class));
            rDD = this.doDelete((RDD<URI>)toDelete, accumulators, options);
        } else {
            // Deletion disabled: contribute an empty result set so the union below is uniform.
            rDD = sparkSession.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(DistCPResult.class));
        }
        RDD<DistCPResult> deleteResult = rDD;
        RDD allResults = copyResult.union((RDD)deleteResult);
        Option<URI> option = options.log();
        if (None$.MODULE$.equals(option)) {
            // No log target: iterate the results with a no-op body (the callee is empty),
            // presumably just to trigger evaluation of the copy/delete work.
            allResults.foreach((Function1 & Serializable & scala.Serializable)x$4 -> {
                SparkDistCP$.$anonfun$run$4(x$4);
                return BoxedUnit.UNIT;
            });
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (option instanceof Some) {
            Some some = (Some)option;
            URI f = (URI)some.value();
            // Log target given: collapse to one partition and append each result's message as CSV at that URI.
            int x$1 = 1;
            Ordering x$22 = allResults.repartition$default$2(x$1);
            sparkSession.implicits().rddToDatasetHolder(allResults.repartition(x$1, x$22).map((Function1 & Serializable & scala.Serializable)x$5 -> x$5.getMessage(), ClassTag$.MODULE$.apply(String.class)), sparkSession.implicits().newStringEncoder()).toDS().write().mode(SaveMode.Append).csv(f.toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            throw new MatchError(option);
        }
        // Emit the accumulator summary; the message is built lazily inside the supplier.
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(27).append("SparkDistCP Run Statistics\n").append(accumulators.getOutputText()).toString());
    }

    public RDD<DistCPResult> doCopy(RDD<CopyDefinitionWithDependencies> sourceRDD, Accumulators accumulators, SparkDistCPOptions options) {
        ConfigSerDeser serConfig = new ConfigSerDeser(sourceRDD.sparkContext().hadoopConfiguration());
        RDD<Tuple2<Tuple2<Object, Object>, CopyDefinitionWithDependencies>> qual$1 = this.batchAndPartitionFiles(sourceRDD, options.maxFilesPerTask(), options.maxBytesPerTask());
        Function1 & Serializable & scala.Serializable x$1 = (Function1 & Serializable & scala.Serializable)iterator -> {
            Configuration hadoopConfiguration = serConfig.get();
            long attemptID = TaskContext$.MODULE$.get().taskAttemptId();
            FileSystemObjectCacher fsCache = new FileSystemObjectCacher(hadoopConfiguration);
            return MODULE$.DistCPIteratorImplicit(iterator.flatMap((Function1 & Serializable & scala.Serializable)x$6 -> ((CopyDefinitionWithDependencies)x$6._2()).getAllCopyDefinitions())).collectMapWithEmptyCollection((Function2 & Serializable & scala.Serializable)(d, z) -> BoxesRunTime.boxToBoolean((boolean)z.contains((Object)d)), (Function1 & Serializable & scala.Serializable)d -> {
                DistCPResult r = CopyUtils$.MODULE$.handleCopy(fsCache.getOrCreate(d.source().uri()), fsCache.getOrCreate(d.destination()), (SingleCopyDefinition)d, options, attemptID);
                accumulators.handleResult(r);
                return r;
            });
        };
        boolean x$2 = qual$1.mapPartitions$default$2();
        return qual$1.mapPartitions((Function1)x$1, x$2, ClassTag$.MODULE$.apply(DistCPResult.class));
    }

    public RDD<DistCPResult> doDelete(RDD<URI> destRDD, Accumulators accumulators, SparkDistCPOptions options) {
        ConfigSerDeser serConfig = new ConfigSerDeser(destRDD.sparkContext().hadoopConfiguration());
        long count = destRDD.count();
        RDD qual$1 = destRDD.repartition(RichInt$.MODULE$.max$extension(Predef$.MODULE$.intWrapper((int)(count / (long)options.maxFilesPerTask())), 1), Ordering$.MODULE$.ordered((Function1)Predef$.MODULE$.$conforms()));
        Function1 & Serializable & scala.Serializable x$1 = (Function1 & Serializable & scala.Serializable)iterator -> {
            Configuration hadoopConfiguration = serConfig.get();
            FileSystemObjectCacher fsCache = new FileSystemObjectCacher(hadoopConfiguration);
            return MODULE$.DistCPIteratorImplicit((Iterator)iterator).collectMapWithEmptyCollection((Function2 & Serializable & scala.Serializable)(d, z) -> BoxesRunTime.boxToBoolean((boolean)z.exists((Function1 & Serializable & scala.Serializable)p -> BoxesRunTime.boxToBoolean((boolean)PathUtils$.MODULE$.uriIsChild(p, d)))), (Function1 & Serializable & scala.Serializable)d -> {
                DeleteResult r = CopyUtils$.MODULE$.handleDelete(fsCache.getOrCreate((URI)d), (URI)d, options);
                accumulators.handleResult(r);
                return r;
            });
        };
        boolean x$2 = qual$1.mapPartitions$default$2();
        return qual$1.mapPartitions((Function1)x$1, x$2, ClassTag$.MODULE$.apply(DistCPResult.class));
    }

    /**
     * Wraps an iterator in a new {@code SparkDistCP.DistCPIteratorImplicit}.
     *
     * @param iterator the iterator to wrap
     * @param <B>      element type of the iterator
     * @return a freshly constructed wrapper around {@code iterator}
     */
    public <B> SparkDistCP.DistCPIteratorImplicit<B> DistCPIteratorImplicit(Iterator<B> iterator) {
        return new SparkDistCP.DistCPIteratorImplicit<B>(iterator);
    }

    /**
     * Sorts the copy definitions by source URI string within partitions, tags each one with a
     * (partition, batch) key produced by {@link #generateBatchedFileKeys}, and repartitions the
     * keyed RDD with a {@code CopyPartitioner} built from it.
     *
     * @param rdd             copy definitions to batch
     * @param maxFilesPerTask file-count limit handed to the batch-key generator
     * @param maxBytesPerTask byte limit handed to the batch-key generator
     * @return the definitions keyed by (partition, batch) and partitioned by {@code CopyPartitioner}
     */
    public RDD<Tuple2<Tuple2<Object, Object>, CopyDefinitionWithDependencies>> batchAndPartitionFiles(RDD<CopyDefinitionWithDependencies> rdd, int maxFilesPerTask, long maxBytesPerTask) {
        // Reuse the input's partitioner when present; otherwise hash into as many partitions as the input has.
        final Partitioner sortPartitioner = (Partitioner)rdd.partitioner().getOrElse((Function0 & Serializable & scala.Serializable)() -> new HashPartitioner(rdd.partitions().length));

        // Key by the source URI's string form so the sort below orders on it.
        final RDD keyedBySourceUri = rdd.map((Function1 & Serializable & scala.Serializable)definition -> new Tuple2((Object)definition.source().uri().toString(), definition), ClassTag$.MODULE$.apply(Tuple2.class));
        // Sort within partitions, then drop the key again.
        final RDD ordered = RDD$.MODULE$.rddToOrderedRDDFunctions(keyedBySourceUri, (Ordering)Ordering.String$.MODULE$, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(CopyDefinitionWithDependencies.class))
                .repartitionAndSortWithinPartitions(sortPartitioner)
                .map((Function1 & Serializable & scala.Serializable)pair -> (CopyDefinitionWithDependencies)pair._2(), ClassTag$.MODULE$.apply(CopyDefinitionWithDependencies.class));

        // Attach a (partition index, batch index) key to every definition.
        final RDD keyedByBatch = ordered.mapPartitionsWithIndex(this.generateBatchedFileKeys(maxFilesPerTask, maxBytesPerTask), ordered.mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.apply(Tuple2.class));

        return RDD$.MODULE$.rddToPairRDDFunctions(keyedByBatch, ClassTag$.MODULE$.apply(Tuple2.class), ClassTag$.MODULE$.apply(CopyDefinitionWithDependencies.class), Ordering$.MODULE$.Tuple2((Ordering)Ordering.Int$.MODULE$, (Ordering)Ordering.Int$.MODULE$))
                .partitionBy((Partitioner)CopyPartitioner$.MODULE$.apply((RDD<Tuple2<Tuple2<Object, Object>, CopyDefinitionWithDependencies>>)keyedByBatch));
    }

    /**
     * Builds the per-partition function that assigns (partition, batch) keys to copy definitions.
     * The returned function unboxes the partition index and defers to
     * {@code $anonfun$generateBatchedFileKeys$1} with the two captured limits.
     *
     * @param maxFilesPerTask file-count limit captured by the returned function
     * @param maxBytesPerTask byte limit captured by the returned function
     * @return a serializable function from (partition index, definitions) to keyed definitions
     */
    public Function2<Object, Iterator<CopyDefinitionWithDependencies>, Iterator<Tuple2<Tuple2<Object, Object>, CopyDefinitionWithDependencies>>> generateBatchedFileKeys(int maxFilesPerTask, long maxBytesPerTask) {
        return (Function2 & Serializable & scala.Serializable)(partitionIndex, definitions) -> {
            int partitionNumber = BoxesRunTime.unboxToInt((Object)partitionIndex);
            return SparkDistCP$.$anonfun$generateBatchedFileKeys$1(maxFilesPerTask, maxBytesPerTask, partitionNumber, definitions);
        };
    }

    /**
     * Intentionally empty body of the {@code foreach} that {@code run} applies to the result RDD
     * when no log target is configured; it ignores its argument.
     *
     * @param x$4 the result element, unused
     */
    public static final /* synthetic */ void $anonfun$run$4(DistCPResult x$4) {
    }

    /**
     * Assigns each copy definition in one partition a (partition, batch index) key.
     * A running state of (batch index, files in batch, bytes in batch, definition) is threaded
     * through the iterator; a new batch is started whenever adding the next definition would
     * exceed either limit.
     *
     * @param maxFilesPerTask$1 maximum number of definitions per batch
     * @param maxBytesPerTask$1 maximum summed source length per batch
     * @param partition         index of the partition being processed; first half of every key
     * @param iterator          the partition's copy definitions
     * @return an iterator of ((partition, batch index), definition) pairs
     */
    public static final /* synthetic */ Iterator $anonfun$generateBatchedFileKeys$1(int maxFilesPerTask$1, long maxBytesPerTask$1, int partition, Iterator iterator) {
        // Seed state: batch 0, zero files, zero bytes, no definition. It is dropped after the scan.
        final Tuple4 seed = new Tuple4((Object)BoxesRunTime.boxToInteger((int)0), (Object)BoxesRunTime.boxToInteger((int)0), (Object)BoxesRunTime.boxToLong((long)0L), null);

        final Function2 assignBatch = (Function2 & Serializable & scala.Serializable)(state, next) -> {
            Tuple4 previous = (Tuple4)state;
            CopyDefinitionWithDependencies definition = (CopyDefinitionWithDependencies)next;
            if (previous == null) {
                throw new MatchError((Object)new Tuple2(state, next));
            }
            int batchIndex = BoxesRunTime.unboxToInt((Object)previous._1());
            int filesSoFar = BoxesRunTime.unboxToInt((Object)previous._2());
            long bytesSoFar = BoxesRunTime.unboxToLong((Object)previous._3());
            int filesWithThis = filesSoFar + 1;
            long bytesWithThis = bytesSoFar + definition.source().getLen();
            if (filesWithThis > maxFilesPerTask$1 || bytesWithThis > maxBytesPerTask$1) {
                // Limit exceeded: this definition opens the next batch on its own.
                return new Tuple4((Object)BoxesRunTime.boxToInteger((int)(batchIndex + 1)), (Object)BoxesRunTime.boxToInteger((int)1), (Object)BoxesRunTime.boxToLong((long)definition.source().getLen()), (Object)definition);
            }
            // Still within limits: stay in the current batch with updated totals.
            return new Tuple4((Object)BoxesRunTime.boxToInteger((int)batchIndex), (Object)BoxesRunTime.boxToInteger((int)filesWithThis), (Object)BoxesRunTime.boxToLong((long)bytesWithThis), (Object)definition);
        };

        final Function1 toKeyedPair = (Function1 & Serializable & scala.Serializable)scanned -> {
            Tuple4 entry = scanned;
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            int batchIndex = BoxesRunTime.unboxToInt((Object)entry._1());
            CopyDefinitionWithDependencies file = (CopyDefinitionWithDependencies)entry._4();
            return new Tuple2((Object)new Tuple2.mcII.sp(partition, batchIndex), (Object)file);
        };

        // drop(1) discards the seed element that scanLeft emits first.
        return iterator.scanLeft((Object)seed, assignBatch).drop(1).map(toKeyedPair);
    }

    /**
     * Private constructor, invoked once from the static initializer.
     * Publishes the new instance as {@code MODULE$} before running the {@code Logging} initializer
     * (which presumably populates the logger field through the {@code _setter_} method — confirm in
     * {@code Logging}).
     */
    private SparkDistCP$() {
        MODULE$ = this;
        Logging.$init$(this);
    }
}

