/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent.util;

import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ReflectionUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.shaded.org.apache.commons.lang3.reflect.FieldUtils;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.package$;
import org.apache.spark.rdd.HadoopRDD;
import org.apache.spark.rdd.MapPartitionsRDD;
import org.apache.spark.rdd.ParallelCollectionRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.execution.datasources.FileScanRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.immutable.Seq;

public class RddPathUtils {
    private static final Logger log = LoggerFactory.getLogger(RddPathUtils.class);

    /**
     * Finds the input paths backing the given RDD.
     *
     * <p>The extractors are tried in a fixed order (Hadoop, file scan, map partitions,
     * parallel collection); the first one whose {@code isDefinedAt} accepts the RDD is used.
     * When none matches, {@link UnknownRDDExtractor} is used instead.
     *
     * @param rdd the RDD to inspect
     * @return a stream of the extracted paths with {@code null} entries removed
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static Stream<Path> findRDDPaths(RDD rdd) {
        // The explicit raw type witness matters: without it the element type is inferred as
        // RddPathExtractor<? extends RDD>, and extract(rdd) cannot be called with a raw RDD.
        RddPathExtractor extractor = Stream.<RddPathExtractor>of(
                new HadoopRDDExtractor(),
                new FileScanRDDExtractor(),
                new MapPartitionsRDDExtractor(),
                new ParallelCollectionRDDExtractor())
            .filter(candidate -> candidate.isDefinedAt(rdd))
            .findFirst()
            // Only build the fallback when no specific extractor matched.
            .orElseGet(UnknownRDDExtractor::new);
        Stream<Path> paths = extractor.extract(rdd);
        return paths.filter(Objects::nonNull);
    }

    /**
     * Returns the parent of the given path string.
     *
     * <p>This is a best-effort helper: any exception raised while building the {@link Path}
     * or resolving its parent is logged at debug level and turned into {@code null}.
     *
     * @param path textual path to resolve
     * @return the parent path, or {@code null} when it cannot be determined
     */
    private static Path parentOf(String path) {
        try {
            return new Path(path).getParent();
        } catch (Exception e) {
            // Keep the failure visible for troubleshooting instead of discarding it silently.
            log.debug("Cannot determine parent of path {}", path, e);
            return null;
        }
    }

    /**
     * Strategy that knows how to obtain input paths from one particular kind of RDD.
     *
     * @param <T> the RDD type handled by the extractor
     */
    interface RddPathExtractor<T extends RDD> {
        /**
         * Tells whether this extractor can handle the given object.
         *
         * @param rdd candidate object
         * @return {@code true} when {@link #extract} may be called with it
         */
        boolean isDefinedAt(Object rdd);

        /**
         * Extracts the paths associated with the given RDD.
         *
         * @param rdd the RDD to read paths from
         * @return a stream of extracted paths
         */
        Stream<Path> extract(T rdd);
    }

    /**
     * Extracts paths from a {@link ParallelCollectionRDD} by reflectively reading its
     * {@code data} field. Each {@link Tuple2} element contributes the parent of the path
     * spelled by its first component; other elements are reported and skipped.
     */
    static class ParallelCollectionRDDExtractor
    implements RddPathExtractor<ParallelCollectionRDD> {
        ParallelCollectionRDDExtractor() {
        }

        @Override
        public boolean isDefinedAt(Object rdd) {
            return rdd instanceof ParallelCollectionRDD;
        }

        /**
         * Reads the {@code data} field of the RDD and maps its elements to parent paths.
         *
         * @param rdd the parallel collection RDD to inspect
         * @return the non-null parent paths found, or an empty stream when the field cannot be
         *     read or does not hold a Scala {@link Seq}
         */
        @Override
        public Stream<Path> extract(ParallelCollectionRDD rdd) {
            try {
                Object data = FieldUtils.readField(rdd, "data", true);
                // One placeholder for one argument; the previous pattern had a dangling "{}".
                log.debug("ParallelCollectionRDD data: {}", data);
                if (data instanceof Seq) {
                    return ScalaConversionUtils.fromSeq((Seq)data).stream()
                        .map(ParallelCollectionRDDExtractor::parentOfTupleKey)
                        .filter(Objects::nonNull);
                }
                log.warn("Cannot extract path from ParallelCollectionRDD {}", data);
            } catch (IllegalAccessException | IllegalArgumentException e) {
                // Pass the exception along so the reason for the failed read is not lost.
                log.warn("Cannot read data field from ParallelCollectionRDD {}", rdd, e);
            }
            return Stream.empty();
        }

        /**
         * Converts a single collection element into the parent of the path held in its first
         * tuple component.
         *
         * @param el element taken from the RDD data; may be {@code null}
         * @return the parent path, or {@code null} when the element is not a tuple, its first
         *     component is {@code null}, or the parent cannot be resolved
         */
        private static Path parentOfTupleKey(Object el) {
            if (!(el instanceof Tuple2)) {
                // Guard against null elements before asking for the class name.
                log.warn("unable to extract Path from {}", el == null ? null : el.getClass().getCanonicalName());
                return null;
            }
            Object key = ((Tuple2)el)._1;
            if (key == null) {
                log.warn("unable to extract Path from {}", el.getClass().getCanonicalName());
                return null;
            }
            Path path = RddPathUtils.parentOf(key.toString());
            log.debug("Found input {}", path);
            return path;
        }
    }

    /**
     * Extracts the parent directories of the files referenced by a {@link FileScanRDD}.
     *
     * <p>From Spark 3.4 on, the file path is obtained reflectively by calling {@code filePath}
     * and then {@code toPath} on the result; on older versions {@code filePath()} is used
     * directly as a string.
     */
    static class FileScanRDDExtractor
    implements RddPathExtractor<FileScanRDD> {
        FileScanRDDExtractor() {
        }

        @Override
        public boolean isDefinedAt(Object rdd) {
            return rdd instanceof FileScanRDD;
        }

        /**
         * Walks every file of every partition and resolves its parent directory.
         *
         * @param rdd the file scan RDD to inspect
         * @return the non-null parent paths of the scanned files
         */
        @Override
        public Stream<Path> extract(FileScanRDD rdd) {
            // The running Spark version is fixed, so decide once instead of once per file.
            boolean reflective = FileScanRDDExtractor.isAtLeastSpark34(package$.MODULE$.SPARK_VERSION());
            return ScalaConversionUtils.fromSeq(rdd.filePartitions()).stream()
                .flatMap(fp -> Arrays.stream(fp.files()))
                .map(f -> reflective
                    ? FileScanRDDExtractor.reflectiveParentOf(f)
                    : RddPathUtils.parentOf(f.filePath()))
                .filter(Objects::nonNull);
        }

        /**
         * Resolves the parent directory of a partitioned file through reflection.
         *
         * @param file the partitioned file object
         * @return the parent path, or {@code null} when a reflective call yields nothing, the
         *     result is not a {@link Path}, or the path has no parent
         */
        private static Path reflectiveParentOf(Object file) {
            Path parent = ReflectionUtils.tryExecuteMethod(file, "filePath", new Object[0])
                .flatMap(sparkPath -> ReflectionUtils.tryExecuteMethod(sparkPath, "toPath", new Object[0]))
                .filter(Path.class::isInstance)
                .map(Path.class::cast)
                .map(Path::getParent)
                .orElse(null);
            if (parent == null) {
                // A failed lookup skips this file instead of aborting the whole extraction.
                log.debug("Cannot resolve parent directory of {}", file);
            }
            return parent;
        }

        /**
         * Checks whether a Spark version string denotes version 3.4 or newer.
         *
         * <p>Major and minor components are compared numerically so that, for example,
         * {@code 3.10.0} is recognised as newer than {@code 3.4}. When the components are not
         * plain integers the method falls back to comparing the strings lexicographically.
         *
         * @param version Spark version string such as {@code 3.4.1}
         * @return {@code true} when the version is at least 3.4
         */
        private static boolean isAtLeastSpark34(String version) {
            String[] parts = version.split("\\.");
            try {
                int major = Integer.parseInt(parts[0]);
                int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
                return major > 3 || (major == 3 && minor >= 4);
            } catch (NumberFormatException e) {
                return "3.4".compareTo(version) <= 0;
            }
        }
    }

    /**
     * Handles {@link MapPartitionsRDD} instances by delegating to the RDD they were derived
     * from.
     */
    static class MapPartitionsRDDExtractor
    implements RddPathExtractor<MapPartitionsRDD> {
        /** Accepts only {@link MapPartitionsRDD} instances. */
        @Override
        public boolean isDefinedAt(Object rdd) {
            return MapPartitionsRDD.class.isInstance(rdd);
        }

        /**
         * Looks up the paths of the previous RDD in the chain.
         *
         * @param rdd the map-partitions RDD to inspect
         * @return whatever {@code findRDDPaths} yields for {@code rdd.prev()}
         */
        @Override
        public Stream<Path> extract(MapPartitionsRDD rdd) {
            RDD upstream = rdd.prev();
            return RddPathUtils.findRDDPaths(upstream);
        }
    }

    /**
     * Reads the input paths configured on the job configuration of a {@link HadoopRDD}.
     */
    static class HadoopRDDExtractor
    implements RddPathExtractor<HadoopRDD> {
        /** Accepts only {@link HadoopRDD} instances. */
        @Override
        public boolean isDefinedAt(Object rdd) {
            return HadoopRDD.class.isInstance(rdd);
        }

        /**
         * Maps every input path of the RDD's job configuration through
         * {@code PlanUtils.getDirectoryPath}.
         *
         * @param rdd the Hadoop RDD to inspect
         * @return one directory path per configured input path
         */
        @Override
        public Stream<Path> extract(HadoopRDD rdd) {
            JobConf jobConf = (JobConf)rdd.getJobConf();
            Path[] configuredInputs = FileInputFormat.getInputPaths(jobConf);
            Configuration conf = rdd.getConf();
            Stream<Path> inputs = Arrays.stream(configuredInputs);
            return inputs.map(input -> PlanUtils.getDirectoryPath(input, conf));
        }
    }

    /**
     * Catch-all extractor: accepts anything, logs a warning and yields no paths.
     */
    static class UnknownRDDExtractor
    implements RddPathExtractor<RDD> {
        /** Always answers {@code true}. */
        @Override
        public boolean isDefinedAt(Object rdd) {
            return true;
        }

        /**
         * Logs the unsupported RDD and produces nothing.
         *
         * @param rdd the RDD no other extractor handled
         * @return an empty stream
         */
        @Override
        public Stream<Path> extract(RDD rdd) {
            Object unsupported = rdd;
            log.warn("Unknown RDD class {}", unsupported);
            return Stream.<Path>empty();
        }
    }
}

