/*
 * Decompiled with CFR 0.152.
 */
package ai.databand.spark;

import ai.databand.DbndAppLog;
import ai.databand.DbndWrapper;
import ai.databand.parameters.DatasetOperationPreview;
import ai.databand.schema.ColumnStats;
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
import ai.databand.schema.Pair;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.catalyst.plans.QueryPlan;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.hive.execution.HiveTableScanExec;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.QueryExecutionListener;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class DbndSparkQueryExecutionListener
implements QueryExecutionListener {
    private static final DbndAppLog LOG = new DbndAppLog(LoggerFactory.getLogger(DbndSparkQueryExecutionListener.class));
    private final DbndWrapper dbnd;
    private final DatasetOperationPreview operationPreview;
    private final boolean isHiveEnabled;
    private final Pattern DATASET = Pattern.compile(".*\\[(.*)\\].*");

    public DbndSparkQueryExecutionListener(DbndWrapper dbnd) {
        this.dbnd = dbnd;
        this.operationPreview = new DatasetOperationPreview();
        try {
            Class.forName("org.apache.spark.sql.hive.execution.HiveTableScanExec", false, this.getClass().getClassLoader());
            Class.forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable", false, this.getClass().getClassLoader());
        }
        catch (ClassNotFoundException e) {
            this.isHiveEnabled = false;
            return;
        }
        this.isHiveEnabled = true;
        LOG.jvmInfo("Succesfully constructed Databand QueryExecutionListener instance. Selected Spark events with dataset operations will be submitted to the Databand service.", new Object[0]);
    }

    public DbndSparkQueryExecutionListener() {
        this(DbndWrapper.instance());
    }

    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        boolean isProcessed = false;
        SparkPlan executedPlan = qe.executedPlan();
        LOG.verbose("[{}] Processing event from function \"{}()\" and execution plan class: {}. Executed plan: {}", executedPlan.hashCode(), funcName, executedPlan.getClass().getName(), executedPlan);
        if (this.isAdaptivePlan(executedPlan)) {
            executedPlan = this.extractFinalFromAdaptive(executedPlan).orElse(executedPlan);
            LOG.verbose("[{}] Extracted final plan from adaptive plan. Final executed plan: {}", executedPlan.hashCode(), executedPlan);
        }
        if (executedPlan instanceof DataWritingCommandExec) {
            long rows;
            String path;
            InsertIntoHadoopFsRelationCommand cmd;
            LOG.verbose("[{}] ExecutedPlan is instanceof DataWritingCommandExec", executedPlan.hashCode());
            isProcessed = this.submitReadOps(executedPlan);
            DataWritingCommandExec writePlan = (DataWritingCommandExec)executedPlan;
            if (writePlan.cmd() instanceof InsertIntoHadoopFsRelationCommand) {
                LOG.verbose("[{}] write is combined with reads", writePlan.hashCode());
                cmd = (InsertIntoHadoopFsRelationCommand)writePlan.cmd();
                path = this.extractPath(cmd.outputPath().toString());
                rows = ((SQLMetric)cmd.metrics().get((Object)"numOutputRows").get()).value();
                this.log(path, DatasetOperationType.WRITE, cmd.query().schema(), rows, false);
                isProcessed = true;
            }
            if (this.isHiveEnabled && writePlan.cmd() instanceof InsertIntoHiveTable) {
                LOG.verbose("[{}] ExecutePlan is instanceof InsertIntoHiveTable", writePlan.hashCode());
                try {
                    cmd = (InsertIntoHiveTable)writePlan.cmd();
                    path = cmd.table().identifier().table();
                    rows = ((SQLMetric)cmd.metrics().get((Object)"numOutputRows").get()).value();
                    this.log(path, DatasetOperationType.WRITE, cmd.query().schema(), rows, true);
                    isProcessed = true;
                }
                catch (Exception e) {
                    LOG.error("[{}] Unable to extract dataset information from InsertIntoHiveTable - {}", writePlan.hashCode(), e);
                }
            }
        }
        if (executedPlan instanceof WholeStageCodegenExec) {
            LOG.verbose("[{}] ExecutePlan is instanceof WholeStageCodegenExec", executedPlan.hashCode());
            if (this.isDbndPlan(qe)) {
                LOG.warn("[{}] Explicit Databand SDK DataFrame tracking will not be reported by JVM", executedPlan.hashCode());
                return;
            }
            isProcessed = this.submitReadOps(executedPlan);
        }
        if (!isProcessed) {
            LOG.verbose("[{}] Spark event was not processed because execution plan class {} and all its child plans are not supported", executedPlan.hashCode(), executedPlan.getClass().getName());
        } else {
            LOG.verbose("[{}] Spark event was processed succesfully, execution plan class: {}", executedPlan.hashCode(), executedPlan.getClass().getName());
        }
    }

    protected boolean submitReadOps(SparkPlan executedPlan) {
        boolean isProcessed = false;
        List<SparkPlan> allChildren = this.getAllChildren(executedPlan);
        LOG.verbose("[{}] {} children plans detected.", executedPlan.hashCode(), allChildren.size());
        for (SparkPlan next : allChildren) {
            if (next instanceof FileSourceScanExec) {
                LOG.verbose("[{}][{}] Supported FileSourceScanExec child plan type detected", executedPlan.hashCode(), next.hashCode());
                FileSourceScanExec fileSourceScan = (FileSourceScanExec)next;
                StructType schema = fileSourceScan.schema();
                if (schema.isEmpty() && fileSourceScan.relation() != null) {
                    schema = fileSourceScan.relation().schema();
                }
                String path = this.extractPath((String)fileSourceScan.metadata().get((Object)"Location").get());
                long rows = ((SQLMetric)fileSourceScan.metrics().get((Object)"numOutputRows").get()).value();
                this.log(path, DatasetOperationType.READ, schema, rows, false);
                isProcessed = true;
                continue;
            }
            if (this.isHiveEnabled && next instanceof HiveTableScanExec) {
                LOG.verbose("[{}][{}] Supported HiveTableScanExec child plan type detected", executedPlan.hashCode(), next.hashCode());
                try {
                    HiveTableScanExec hiveTableScan = (HiveTableScanExec)next;
                    String path = ((URI)hiveTableScan.relation().tableMeta().storage().locationUri().get()).toString();
                    long rows = ((SQLMetric)hiveTableScan.metrics().get((Object)"numOutputRows").get()).value();
                    this.log(path, DatasetOperationType.READ, hiveTableScan.relation().schema(), rows, true);
                    isProcessed = true;
                }
                catch (Exception e) {
                    LOG.error("[{}][{}] Unable to extract dataset information from HiveTableScanExec - {}", executedPlan.hashCode(), next.hashCode(), e);
                }
                continue;
            }
            LOG.verbose("[{}][{}] Unsupported children plan: {}", executedPlan.hashCode(), next.hashCode(), next.getClass().getName());
        }
        return isProcessed;
    }

    private boolean isDbndPlan(QueryExecution qe) {
        if (qe.analyzed() != null && !qe.analyzed().children().isEmpty()) {
            String dfAlias = this.getPlanVerboseString((LogicalPlan)qe.analyzed().children().apply(0));
            return dfAlias != null && dfAlias.contains("DBND_INTERNAL");
        }
        return false;
    }

    protected boolean isAdaptivePlan(Object plan) {
        return plan.getClass().getName().contains("AdaptiveSparkPlanExec");
    }

    protected String getPlanVerboseString(LogicalPlan plan) {
        try {
            return plan.verboseString();
        }
        catch (NoSuchMethodError e) {
            try {
                Class<QueryPlan> clazz = QueryPlan.class;
                Method method = clazz.getDeclaredMethod("verboseString", Integer.TYPE);
                return method.invoke((Object)plan, 1).toString();
            }
            catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ex) {
                LOG.error("Unable to identify whenever spark query was triggered by log_dataset_op or not", ex);
                return null;
            }
        }
    }

    protected Optional<SparkPlan> extractFinalFromAdaptive(SparkPlan adaptivePlan) {
        try {
            Class<?> clazz = Class.forName("org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec");
            Field field = clazz.getDeclaredField("currentPhysicalPlan");
            field.setAccessible(true);
            SparkPlan value = (SparkPlan)field.get(adaptivePlan);
            return Optional.of(value);
        }
        catch (ClassNotFoundException | IllegalAccessException | NoSuchFieldException e) {
            LOG.error("[{}] Unable to extract final plan from the adaptive one using reflection. Dataset operation won't be logged - {}.", adaptivePlan.hashCode(), e);
            return Optional.empty();
        }
    }

    protected void log(String path, DatasetOperationType operationType, StructType datasetSchema, long rows, boolean isPartitioned) {
        Pair<String, List<Long>> schema = this.operationPreview.extractSchema(datasetSchema, rows);
        List<ColumnStats> columnStats = this.operationPreview.extractColumnStats(datasetSchema, rows);
        this.dbnd.logDatasetOperation(path, operationType, DatasetOperationStatus.OK, "", schema.right(), schema.left(), isPartitioned, columnStats, "spark_query_listener");
    }

    protected String extractPath(String path) {
        Matcher matcher = this.DATASET.matcher(path);
        if (matcher.matches()) {
            return matcher.group(1);
        }
        return path;
    }

    protected List<SparkPlan> getAllChildren(SparkPlan root) {
        ArrayList<SparkPlan> result = new ArrayList<SparkPlan>();
        LinkedList<SparkPlan> deque = new LinkedList<SparkPlan>();
        deque.add(root);
        List<String> spark3classes = Arrays.asList("org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec", "org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec");
        while (!deque.isEmpty()) {
            SparkPlan next = (SparkPlan)deque.pop();
            result.add(next);
            if (spark3classes.contains(next.getClass().getName())) {
                Optional<SparkPlan> spark3Child = this.extractChildFromSpark3(next);
                spark3Child.ifPresent(deque::add);
                if (spark3Child.isPresent()) continue;
                LOG.verbose("[{}][{}] No child node for Spark3 node class '{}'", root.hashCode(), next.hashCode(), next.getClass().getName());
                continue;
            }
            if (next.getClass().getName().contains("AdaptiveSparkPlanExec")) {
                Optional<SparkPlan> finalPlanFromAdaptive = this.extractFinalFromAdaptive(next);
                finalPlanFromAdaptive.ifPresent(deque::add);
                continue;
            }
            List children = (List)JavaConverters.seqAsJavaListConverter((Seq)next.children()).asJava();
            deque.addAll(children);
            if (children.size() != 0) continue;
            LOG.verbose("[{}][{}] No children nodes for node '{}'", root.hashCode(), next.hashCode(), next.getClass().getName());
        }
        return result;
    }

    protected Optional<SparkPlan> extractChildFromSpark3(SparkPlan spark3Query) {
        try {
            Class<?> clazz = Class.forName(spark3Query.getClass().getName());
            Method method = clazz.getDeclaredMethod("plan", new Class[0]);
            SparkPlan value = (SparkPlan)method.invoke((Object)spark3Query, new Object[0]);
            return Optional.of(value);
        }
        catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            LOG.error("[{}] Unable to extract child plan from the spark3 query '{}' using reflection. Dataset operation won't be logged - {}.", spark3Query.hashCode(), spark3Query.getClass().getName(), e);
            return Optional.empty();
        }
    }

    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    }
}

