/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.facets.ErrorFacet;
import io.openlineage.spark.agent.facets.SparkVersionFacet;
import io.openlineage.spark.agent.facets.builder.SparkProcessingEngineRunFacetBuilderDelegate;
import io.openlineage.spark.agent.facets.builder.SparkPropertyFacetBuilder;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.lifecycle.Rdds;
import io.openlineage.spark.agent.util.PathUtils;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.agent.util.TimeUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.Strings;
import org.apache.spark.Dependency;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil;
import org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil;
import org.apache.spark.rdd.HadoopRDD;
import org.apache.spark.rdd.MapPartitionsRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.JobResult;
import org.apache.spark.scheduler.ResultStage;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.apache.spark.util.SerializableJobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.immutable.Seq;

class RddExecutionContext
implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(RddExecutionContext.class);
    private final EventEmitter eventEmitter;
    private final Optional<SparkContext> sparkContextOption;
    private final UUID runId = UUID.randomUUID();
    private List<URI> inputs = Collections.emptyList();
    private List<URI> outputs = Collections.emptyList();
    private String jobSuffix;

    public RddExecutionContext(EventEmitter eventEmitter) {
        this.eventEmitter = eventEmitter;
        Option activeSessionOption = SparkContext$.MODULE$.getActive();
        this.sparkContextOption = activeSessionOption.isDefined() ? Optional.of(activeSessionOption.get()) : Optional.empty();
    }

    @Override
    public void start(SparkListenerStageSubmitted stageSubmitted) {
    }

    @Override
    public void end(SparkListenerStageCompleted stageCompleted) {
    }

    @Override
    public void setActiveJob(ActiveJob activeJob) {
        log.debug("setActiveJob within RddExecutionContext {}", (Object)activeJob);
        RDD finalRDD = activeJob.finalStage().rdd();
        this.jobSuffix = RddExecutionContext.nameRDD(finalRDD);
        Set<RDD<?>> rdds = Rdds.flattenRDDs(finalRDD);
        log.debug("flattenRDDs {}", rdds);
        this.inputs = this.findInputs(rdds);
        JobConf jc = new JobConf();
        if (activeJob.finalStage() instanceof ResultStage) {
            ResultStage resultStage = (ResultStage)activeJob.finalStage();
            try {
                Field f = this.getConfigField(resultStage);
                f.setAccessible(true);
                Object conf = f.get(resultStage.func());
                if (conf instanceof HadoopMapRedWriteConfigUtil) {
                    Field confField = HadoopMapRedWriteConfigUtil.class.getDeclaredField("conf");
                    confField.setAccessible(true);
                    SerializableJobConf serializableJobConf = (SerializableJobConf)confField.get(conf);
                    jc = serializableJobConf.value();
                } else if (conf instanceof HadoopMapReduceWriteConfigUtil) {
                    Field confField = HadoopMapReduceWriteConfigUtil.class.getDeclaredField("conf");
                    confField.setAccessible(true);
                    SerializableJobConf serializableJobConf = (SerializableJobConf)confField.get(conf);
                    jc = serializableJobConf.value();
                } else {
                    log.info("Config field is not HadoopMapRedWriteConfigUtil or HadoopMapReduceWriteConfigUtil, it's {}", (Object)conf.getClass().getCanonicalName());
                }
            }
            catch (IllegalAccessException | NoSuchFieldException nfe) {
                log.warn("Unable to access job conf from RDD", (Throwable)nfe);
            }
            log.info("Found job conf from RDD {}", (Object)jc);
        } else {
            jc = OpenLineageSparkListener.getConfigForRDD(finalRDD);
        }
        this.outputs = this.findOutputs(finalRDD, (Configuration)jc);
    }

    private Field getConfigField(ResultStage resultStage) throws NoSuchFieldException {
        try {
            return resultStage.func().getClass().getDeclaredField("config$1");
        }
        catch (NoSuchFieldException e) {
            return resultStage.func().getClass().getDeclaredField("arg$1");
        }
    }

    static String nameRDD(RDD<?> rdd) {
        Seq deps;
        List<Dependency> dependencies;
        String rddName = rdd.name();
        if (rddName == null || rdd instanceof HadoopRDD && Arrays.stream(FileInputFormat.getInputPaths((JobConf)((HadoopRDD)rdd).getJobConf())).anyMatch(p -> p.toString().contains(rdd.name())) || rdd instanceof MapPartitionsRDD && rdd.name().equals(((MapPartitionsRDD)rdd).prev().name())) {
            rddName = rdd.getClass().getSimpleName().replaceAll("RDD\\d*$", "").replaceAll("[\\s\\-_]?((?<=.)[A-Z](?=[a-z\\s\\-_])|(?<=[^A-Z])[A-Z]|((?<=[\\s\\-_])[a-z\\d]))", "_$1").toLowerCase(Locale.ROOT);
        }
        if ((dependencies = ScalaConversionUtils.fromSeq(deps = rdd.dependencies())).isEmpty()) {
            return rddName;
        }
        ArrayList<String> dependencyNames = new ArrayList<String>();
        for (Dependency d : dependencies) {
            dependencyNames.add(RddExecutionContext.nameRDD(d.rdd()));
        }
        String dependencyName = Strings.join(dependencyNames, (String)"_");
        if (!dependencyName.startsWith(rddName)) {
            return rddName + "_" + dependencyName;
        }
        return dependencyName;
    }

    @Override
    public void start(SparkListenerSQLExecutionStart sqlStart) {
        log.debug("start SparkListenerSQLExecutionStart {}", (Object)sqlStart);
    }

    @Override
    public void end(SparkListenerSQLExecutionEnd sqlEnd) {
        log.debug("start SparkListenerSQLExecutionEnd {}", (Object)sqlEnd);
    }

    @Override
    public void start(SparkListenerJobStart jobStart) {
        log.debug("start SparkListenerJobStart {}", (Object)jobStart);
        if (this.outputs.isEmpty()) {
            log.info("Output RDDs are empty: skipping sending OpenLineage event");
            return;
        }
        OpenLineage ol = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        OpenLineage.RunEvent event = ol.newRunEventBuilder().eventTime(TimeUtils.toZonedTime(jobStart.time())).eventType(OpenLineage.RunEvent.EventType.START).inputs(this.buildInputs(this.inputs)).outputs(this.buildOutputs(this.outputs)).run(ol.newRunBuilder().runId(this.runId).facets(this.buildRunFacets(null, ol, (SparkListenerEvent)jobStart)).build()).job(this.buildJob(jobStart.jobId())).build();
        log.debug("Posting event for start {}: {}", (Object)jobStart, (Object)event);
        this.eventEmitter.emit(event);
    }

    @Override
    public void end(SparkListenerJobEnd jobEnd) {
        log.debug("end SparkListenerJobEnd {}", (Object)jobEnd);
        if (this.outputs.isEmpty() && !(jobEnd.jobResult() instanceof JobFailed)) {
            log.info("Output RDDs are empty: skipping sending OpenLineage event");
            return;
        }
        OpenLineage ol = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        OpenLineage.RunEvent event = ol.newRunEventBuilder().eventTime(TimeUtils.toZonedTime(jobEnd.time())).eventType(this.getEventType(jobEnd.jobResult())).inputs(this.buildInputs(this.inputs)).outputs(this.buildOutputs(this.outputs)).run(ol.newRunBuilder().runId(this.runId).facets(this.buildRunFacets(this.buildJobErrorFacet(jobEnd.jobResult()), ol, (SparkListenerEvent)jobEnd)).build()).job(this.buildJob(jobEnd.jobId())).build();
        log.debug("Posting event for end {}: {}", (Object)jobEnd, (Object)event);
        this.eventEmitter.emit(event);
    }

    protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, OpenLineage ol, SparkListenerEvent event) {
        OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
        runFacetsBuilder.parent(this.buildApplicationParentFacet());
        if (jobError != null) {
            runFacetsBuilder.put("spark.exception", jobError);
        }
        this.addSparkVersionFacet(runFacetsBuilder);
        this.addProcessingEventFacet(runFacetsBuilder, ol);
        this.addSparkPropertyFacet(runFacetsBuilder, event);
        return runFacetsBuilder.build();
    }

    private void addSparkVersionFacet(OpenLineage.RunFacetsBuilder b0) {
        this.sparkContextOption.ifPresent(context -> b0.put("spark_version", new SparkVersionFacet((SparkContext)context)));
    }

    private void addProcessingEventFacet(OpenLineage.RunFacetsBuilder b0, OpenLineage ol) {
        this.sparkContextOption.ifPresent(context -> {
            OpenLineage.ProcessingEngineRunFacet facet = new SparkProcessingEngineRunFacetBuilderDelegate(ol, (SparkContext)context).buildFacet();
            b0.processing_engine(facet);
        });
    }

    private void addSparkPropertyFacet(OpenLineage.RunFacetsBuilder b0, SparkListenerEvent event) {
        b0.put("spark_properties", new SparkPropertyFacetBuilder().buildFacet(event));
    }

    private OpenLineage.ParentRunFacet buildApplicationParentFacet() {
        return PlanUtils.parentRunFacet(this.eventEmitter.getApplicationRunId(), this.eventEmitter.getApplicationJobName(), this.eventEmitter.getJobNamespace());
    }

    protected ErrorFacet buildJobErrorFacet(JobResult jobResult) {
        if (jobResult instanceof JobFailed && ((JobFailed)jobResult).exception() != null) {
            return ErrorFacet.builder().exception(((JobFailed)jobResult).exception()).build();
        }
        return null;
    }

    protected OpenLineage.Job buildJob(int jobId) {
        String suffix = this.jobSuffix;
        if (this.jobSuffix == null) {
            suffix = String.valueOf(jobId);
        }
        String name = this.eventEmitter.getOverriddenAppName().orElse(this.sparkContextOption.map(SparkContext::appName).orElse("unknown"));
        String jobName = name + "." + suffix;
        return new OpenLineage.JobBuilder().namespace(this.eventEmitter.getJobNamespace()).name(jobName.replaceAll("[\\s\\-_]?((?<=.)[A-Z](?=[a-z\\s\\-_])|(?<=[^A-Z])[A-Z]|((?<=[\\s\\-_])[a-z\\d]))", "_$1").toLowerCase(Locale.ROOT)).build();
    }

    protected List<OpenLineage.OutputDataset> buildOutputs(List<URI> outputs) {
        return outputs.stream().map(this::buildOutputDataset).collect(Collectors.toList());
    }

    protected OpenLineage.InputDataset buildInputDataset(URI uri) {
        DatasetIdentifier di = PathUtils.fromURI(uri);
        return new OpenLineage.InputDatasetBuilder().name(di.getName()).namespace(di.getNamespace()).build();
    }

    protected OpenLineage.OutputDataset buildOutputDataset(URI uri) {
        DatasetIdentifier di = PathUtils.fromURI(uri);
        return new OpenLineage.OutputDatasetBuilder().name(di.getName()).namespace(di.getNamespace()).build();
    }

    protected List<OpenLineage.InputDataset> buildInputs(List<URI> inputs) {
        return inputs.stream().map(this::buildInputDataset).collect(Collectors.toList());
    }

    protected List<URI> findOutputs(RDD<?> rdd, Configuration config) {
        Path outputPath = RddExecutionContext.getOutputPath(rdd, config);
        log.info("Found output path {} from RDD {}", (Object)outputPath, rdd);
        if (outputPath != null) {
            return Collections.singletonList(outputPath.toUri());
        }
        log.debug("Output path is null");
        return Collections.emptyList();
    }

    protected List<URI> findInputs(Set<RDD<?>> rdds) {
        log.debug("find Inputs within RddExecutionContext {}", rdds);
        return PlanUtils.findRDDPaths(rdds.stream().collect(Collectors.toList())).stream().map(path -> path.toUri()).collect(Collectors.toList());
    }

    protected void printRDDs(String prefix, RDD<?> rdd) {
        List<Dependency> deps = ScalaConversionUtils.fromSeq(rdd.dependencies());
        for (Dependency dep : deps) {
            this.printRDDs(prefix + "  ", dep.rdd());
        }
    }

    protected static Path getOutputPath(RDD<?> rdd, Configuration config) {
        Path path = null;
        if (config != null) {
            JobConf jc = config instanceof JobConf ? (JobConf)config : new JobConf(config);
            log.debug("JobConf {}", (Object)jc);
            path = org.apache.hadoop.mapred.FileOutputFormat.getOutputPath((JobConf)jc);
            if (path == null) {
                try {
                    log.debug("Path is null, trying to use old fashioned mapreduce api");
                    path = FileOutputFormat.getOutputPath((JobContext)new Job((Configuration)jc));
                }
                catch (IOException exception) {
                    exception.printStackTrace(System.out);
                }
            }
        }
        if (path == null) {
            path = PlanUtils.findRDDPaths(Collections.singletonList(rdd)).stream().findFirst().orElse(null);
        }
        return path;
    }

    protected OpenLineage.RunEvent.EventType getEventType(JobResult jobResult) {
        if (jobResult.getClass().getSimpleName().startsWith("JobSucceeded")) {
            return OpenLineage.RunEvent.EventType.COMPLETE;
        }
        return OpenLineage.RunEvent.EventType.FAIL;
    }
}

