/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.openlineage.client.Environment;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageConfig;
import io.openlineage.client.circuitBreaker.CircuitBreaker;
import io.openlineage.client.circuitBreaker.CircuitBreakerFactory;
import io.openlineage.client.circuitBreaker.NoOpCircuitBreaker;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.client.utils.RuntimeUtils;
import io.openlineage.spark.agent.ArgumentParser;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.JobMetricsHolder;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.agent.util.TimeUtils;
import io.openlineage.spark.api.SparkOpenLineageConfig;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.stream.Collectors;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.package$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.Option;

public class OpenLineageSparkListener
extends SparkListener {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageSparkListener.class);
    private static final Map<Long, ExecutionContext> sparkSqlExecutionRegistry = Collections.synchronizedMap(new HashMap());
    private static final Map<Integer, ExecutionContext> rddExecutionRegistry = Collections.synchronizedMap(new HashMap());
    private static WeakHashMap<RDD<?>, Configuration> outputs = new WeakHashMap();
    private static ContextFactory contextFactory;
    private static JobMetricsHolder jobMetrics;
    private final Function1<SparkSession, SparkContext> sparkContextFromSession = ScalaConversionUtils.toScalaFn(SparkSession::sparkContext);
    private final Function0<Option<SparkContext>> activeSparkContext = ScalaConversionUtils.toScalaFn(() -> ((SparkContext$)SparkContext$.MODULE$).getActive());
    private static CircuitBreaker circuitBreaker;
    private static MeterRegistry meterRegistry;
    private static String sparkVersion;
    private static final boolean isDisabled;

    public static void init(ContextFactory contextFactory) {
        OpenLineageSparkListener.contextFactory = contextFactory;
        meterRegistry = contextFactory.getMeterRegistry();
        OpenLineageSparkListener.clear();
    }

    public void onOtherEvent(SparkListenerEvent event) {
        if (isDisabled) {
            return;
        }
        this.initializeContextFactoryIfNotInitialized();
        if (event instanceof SparkListenerSQLExecutionStart) {
            OpenLineageSparkListener.sparkSQLExecStart((SparkListenerSQLExecutionStart)event);
        } else if (event instanceof SparkListenerSQLExecutionEnd) {
            OpenLineageSparkListener.sparkSQLExecEnd((SparkListenerSQLExecutionEnd)event);
        }
    }

    private static void sparkSQLExecStart(SparkListenerSQLExecutionStart startEvent) {
        OpenLineageSparkListener.getSparkSQLExecutionContext(startEvent.executionId()).ifPresent(context -> {
            meterRegistry.counter("openlineage.spark.event.sql.start", new String[0]).increment();
            circuitBreaker.run(() -> {
                context.start(startEvent);
                return null;
            });
        });
    }

    private static void sparkSQLExecEnd(SparkListenerSQLExecutionEnd endEvent) {
        ExecutionContext context = sparkSqlExecutionRegistry.remove(endEvent.executionId());
        meterRegistry.counter("openlineage.spark.event.sql.end", new String[0]).increment();
        if (context != null) {
            circuitBreaker.run(() -> {
                context.end(endEvent);
                return null;
            });
        } else {
            contextFactory.createSparkSQLExecutionContext(endEvent).ifPresent(c -> circuitBreaker.run(() -> {
                c.end(endEvent);
                return null;
            }));
        }
    }

    public void onJobStart(SparkListenerJobStart jobStart) {
        if (isDisabled) {
            return;
        }
        this.initializeContextFactoryIfNotInitialized();
        meterRegistry.counter("openlineage.spark.event.job.start", new String[0]).increment();
        Optional activeJob = ScalaConversionUtils.asJavaOptional(SparkSession.getDefaultSession().map(this.sparkContextFromSession).orElse(this.activeSparkContext)).flatMap(ctx -> Optional.ofNullable(ctx.dagScheduler()).map(ds -> ds.jobIdToActiveJob().get((Object)jobStart.jobId()))).flatMap(ScalaConversionUtils::asJavaOptional);
        Set<Integer> stages = ScalaConversionUtils.fromSeq(jobStart.stageIds()).stream().map(Integer.class::cast).collect(Collectors.toSet());
        if (sparkVersion.startsWith("3")) {
            jobMetrics.addJobStages(jobStart.jobId(), stages);
        }
        Optional.ofNullable(this.getSqlExecutionId(jobStart.properties())).map(Optional::of).orElseGet(() -> ScalaConversionUtils.asJavaOptional(SparkSession.getDefaultSession().map(this.sparkContextFromSession).orElse(this.activeSparkContext)).flatMap(ctx -> Optional.ofNullable(ctx.dagScheduler()).map(ds -> ds.jobIdToActiveJob().get((Object)jobStart.jobId())).flatMap(ScalaConversionUtils::asJavaOptional)).map(job -> this.getSqlExecutionId(job.properties()))).map(Long::parseLong).map(id -> OpenLineageSparkListener.getExecutionContext(jobStart.jobId(), id)).orElseGet(() -> OpenLineageSparkListener.getExecutionContext(jobStart.jobId())).ifPresent(context -> {
            activeJob.ifPresent(context::setActiveJob);
            circuitBreaker.run(() -> {
                context.start(jobStart);
                return null;
            });
        });
    }

    private String getSqlExecutionId(Properties properties) {
        return properties.getProperty("spark.sql.execution.id");
    }

    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        if (isDisabled) {
            return;
        }
        ExecutionContext context = rddExecutionRegistry.remove(jobEnd.jobId());
        meterRegistry.counter("openlineage.spark.event.job.end", new String[0]).increment();
        circuitBreaker.run(() -> {
            if (context != null) {
                context.end(jobEnd);
            }
            return null;
        });
        if (sparkVersion.startsWith("3")) {
            jobMetrics.cleanUp(jobEnd.jobId());
        }
    }

    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        if (isDisabled || sparkVersion.startsWith("2")) {
            return;
        }
        jobMetrics.addMetrics(taskEnd.stageId(), taskEnd.taskMetrics());
    }

    public static Optional<ExecutionContext> getSparkSQLExecutionContext(long executionId) {
        return Optional.ofNullable(sparkSqlExecutionRegistry.computeIfAbsent(executionId, e -> contextFactory.createSparkSQLExecutionContext(executionId).orElse(null)));
    }

    public static Optional<ExecutionContext> getExecutionContext(int jobId) {
        return Optional.ofNullable(rddExecutionRegistry.computeIfAbsent(jobId, e -> contextFactory.createRddExecutionContext(jobId)));
    }

    public static Optional<ExecutionContext> getExecutionContext(int jobId, long executionId) {
        Optional<ExecutionContext> executionContext = OpenLineageSparkListener.getSparkSQLExecutionContext(executionId);
        executionContext.ifPresent(context -> rddExecutionRegistry.put(jobId, (ExecutionContext)context));
        return executionContext;
    }

    public static Configuration getConfigForRDD(RDD<?> rdd) {
        return outputs.get(rdd);
    }

    private static OpenLineage.RunFacets errorRunFacet(Exception e, OpenLineage ol) {
        OpenLineage.RunFacet errorFacet = ol.newRunFacet();
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        e.printStackTrace(new PrintWriter((OutputStream)buffer, true));
        errorFacet.getAdditionalProperties().put("exception", buffer.toString());
        OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
        runFacetsBuilder.put("lineage.error", errorFacet);
        return runFacetsBuilder.build();
    }

    private static void clear() {
        sparkSqlExecutionRegistry.clear();
        rddExecutionRegistry.clear();
        outputs.clear();
    }

    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
        meterRegistry.counter("openlineage.spark.event.app.end", new String[0]).increment();
        meterRegistry.counter("openlineage.spark.event.app.end.memoryusage", new String[0]).increment(RuntimeUtils.getMemoryFractionUsage());
        circuitBreaker.run(() -> {
            this.emitApplicationEndEvent(applicationEnd.time());
            return null;
        });
        OpenLineageSparkListener.close();
        super.onApplicationEnd(applicationEnd);
    }

    public static void close() {
        OpenLineageSparkListener.clear();
    }

    public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
        this.initializeContextFactoryIfNotInitialized(applicationStart.appName());
        meterRegistry.counter("openlineage.spark.event.app.start", new String[0]).increment();
        meterRegistry.counter("openlineage.spark.event.app.start.memoryusage", new String[0]).increment(RuntimeUtils.getMemoryFractionUsage());
        circuitBreaker.run(() -> {
            this.emitApplicationStartEvent(applicationStart.time());
            return null;
        });
    }

    private void initializeContextFactoryIfNotInitialized() {
        if (contextFactory != null || isDisabled) {
            return;
        }
        ScalaConversionUtils.asJavaOptional((Option)this.activeSparkContext.apply()).ifPresent(context -> this.initializeContextFactoryIfNotInitialized(context.appName()));
    }

    private void initializeContextFactoryIfNotInitialized(String appName) {
        if (contextFactory != null || isDisabled) {
            return;
        }
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        if (sparkEnv == null) {
            log.warn("OpenLineage listener instantiated, but no configuration could be found. Lineage events will not be collected");
            return;
        }
        this.initializeContextFactoryIfNotInitialized(sparkEnv.conf(), appName);
    }

    private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, String appName) {
        if (contextFactory != null || isDisabled) {
            return;
        }
        try {
            SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf);
            OpenLineageSparkListener.initializeMetrics(config);
            contextFactory = new ContextFactory(new EventEmitter(config, appName), meterRegistry, config);
            circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build();
        }
        catch (URISyntaxException e) {
            log.error("Unable to parse OpenLineage endpoint. Lineage events will not be collected", (Throwable)e);
        }
    }

    private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
        meterRegistry = MicrometerProvider.addMeterRegistryFromConfig(openLineageConfig.getMetricsConfig());
        String disabledFacets = openLineageConfig.getFacetsConfig() != null && openLineageConfig.getFacetsConfig().getDisabledFacets() != null ? String.join((CharSequence)";", openLineageConfig.getFacetsConfig().getDisabledFacets()) : "";
        meterRegistry.config().commonTags(Tags.of(Tag.of("openlineage.spark.integration.version", Versions.getVersion()), Tag.of("openlineage.spark.version", sparkVersion), Tag.of("openlineage.spark.disabled.facets", disabledFacets)));
        ((CompositeMeterRegistry)meterRegistry).getRegistries().forEach(r -> r.config().commonTags(Tags.of(Tag.of("openlineage.spark.integration.version", Versions.getVersion()), Tag.of("openlineage.spark.version", sparkVersion), Tag.of("openlineage.spark.disabled.facets", disabledFacets))));
    }

    private void emitApplicationEvent(Long time, OpenLineage.RunEvent.EventType eventType) {
        OpenLineage openLineage = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        EventEmitter emitter = OpenLineageSparkListener.contextFactory.openLineageEventEmitter;
        OpenLineage.RunBuilder runBuilder = openLineage.newRunBuilder().runId(emitter.getApplicationRunId());
        if (emitter.getParentRunId().isPresent() && emitter.getParentJobName().isPresent() && emitter.getParentJobNamespace().isPresent()) {
            runBuilder.facets(openLineage.newRunFacetsBuilder().parent(openLineage.newParentRunFacet(openLineage.newParentRunFacetRun(emitter.getParentRunId().get()), openLineage.newParentRunFacetJob(emitter.getParentJobNamespace().get(), emitter.getParentJobName().get()))).build());
        }
        OpenLineage.RunEvent applicationEvent = openLineage.newRunEventBuilder().eventType(eventType).eventTime(TimeUtils.toZonedTime(time)).job(openLineage.newJobBuilder().namespace(emitter.getJobNamespace()).name(emitter.getApplicationJobName().replaceAll("[\\s\\-_]?((?<=.)[A-Z](?=[a-z\\s\\-_])|(?<=[^A-Z])[A-Z]|((?<=[\\s\\-_])[a-z\\d]))", "_$1").toLowerCase(Locale.ROOT)).build()).run(runBuilder.build()).inputs(Collections.emptyList()).outputs(Collections.emptyList()).build();
        emitter.emit(applicationEvent);
    }

    private void emitApplicationStartEvent(Long time) {
        this.emitApplicationEvent(time, OpenLineage.RunEvent.EventType.START);
    }

    private void emitApplicationEndEvent(Long time) {
        this.emitApplicationEvent(time, OpenLineage.RunEvent.EventType.COMPLETE);
    }

    private static boolean checkIfDisabled() {
        String isDisabled = Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED");
        return Boolean.parseBoolean(isDisabled);
    }

    static {
        jobMetrics = JobMetricsHolder.getInstance();
        circuitBreaker = new NoOpCircuitBreaker();
        sparkVersion = package$.MODULE$.SPARK_VERSION();
        isDisabled = OpenLineageSparkListener.checkIfDisabled();
    }
}

