/*
 * Decompiled with CFR 0.152.
 */
package ai.databand.spark;

import ai.databand.DbndWrapper;
import ai.databand.parameters.DatasetOperationPreview;
import ai.databand.schema.ColumnStats;
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
import ai.databand.schema.Pair;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.catalyst.plans.QueryPlan;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.hive.execution.HiveTableScanExec;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.QueryExecutionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class DbndSparkQueryExecutionListener
implements QueryExecutionListener {
    private static final Logger LOG = LoggerFactory.getLogger(DbndSparkQueryExecutionListener.class);
    private final DbndWrapper dbnd;
    private final DatasetOperationPreview operationPreview;
    private final boolean isHiveEnabled;
    private final Pattern DATASET = Pattern.compile(".*\\[(.*)\\].*");

    public DbndSparkQueryExecutionListener(DbndWrapper dbnd) {
        this.dbnd = dbnd;
        this.operationPreview = new DatasetOperationPreview();
        try {
            Class.forName("org.apache.spark.sql.hive.execution.HiveTableScanExec", false, this.getClass().getClassLoader());
            Class.forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable", false, this.getClass().getClassLoader());
        }
        catch (ClassNotFoundException e) {
            this.isHiveEnabled = false;
            return;
        }
        this.isHiveEnabled = true;
    }

    public DbndSparkQueryExecutionListener() {
        this(DbndWrapper.instance());
    }

    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        if (this.dbnd.config().isVerbose()) {
            LOG.info("Processing event from function \"{}\". Executed plan: {}", (Object)funcName, (Object)qe.executedPlan());
        }
        boolean isProcessed = false;
        SparkPlan executedPlan = qe.executedPlan();
        if (this.isAdaptivePlan(executedPlan)) {
            executedPlan = this.extractFinalFromAdaptive(executedPlan).orElse(executedPlan);
        }
        if (executedPlan instanceof DataWritingCommandExec) {
            long rows;
            String path;
            InsertIntoHadoopFsRelationCommand cmd;
            DataWritingCommandExec writePlan = (DataWritingCommandExec)executedPlan;
            if (writePlan.cmd() instanceof InsertIntoHadoopFsRelationCommand) {
                cmd = (InsertIntoHadoopFsRelationCommand)writePlan.cmd();
                path = this.exctractPath(cmd.outputPath().toString());
                rows = ((SQLMetric)cmd.metrics().get((Object)"numOutputRows").get()).value();
                this.log(path, DatasetOperationType.WRITE, cmd.query().schema(), rows);
                isProcessed = true;
            }
            if (this.isHiveEnabled && writePlan.cmd() instanceof InsertIntoHiveTable) {
                try {
                    cmd = (InsertIntoHiveTable)writePlan.cmd();
                    path = cmd.table().identifier().table();
                    rows = ((SQLMetric)cmd.metrics().get((Object)"numOutputRows").get()).value();
                    this.log(path, DatasetOperationType.WRITE, cmd.query().schema(), rows);
                    isProcessed = true;
                }
                catch (Exception e) {
                    LOG.error("Unable to extract dataset information from InsertIntoHiveTable", (Throwable)e);
                }
            }
        }
        if (executedPlan instanceof WholeStageCodegenExec) {
            if (this.isDbndPlan(qe)) {
                LOG.warn("dbnd sdk Execution plan will not be reported");
                return;
            }
            List<SparkPlan> allChildren = this.getAllChildren(executedPlan);
            for (SparkPlan next : allChildren) {
                if (next instanceof FileSourceScanExec) {
                    FileSourceScanExec fileSourceScan = (FileSourceScanExec)next;
                    StructType schema = fileSourceScan.schema();
                    if (schema.isEmpty() && fileSourceScan.relation() != null) {
                        schema = fileSourceScan.relation().schema();
                    }
                    String path = this.exctractPath((String)fileSourceScan.metadata().get((Object)"Location").get());
                    long rows = ((SQLMetric)fileSourceScan.metrics().get((Object)"numOutputRows").get()).value();
                    this.log(path, DatasetOperationType.READ, schema, rows);
                    isProcessed = true;
                }
                if (!this.isHiveEnabled || !(next instanceof HiveTableScanExec)) continue;
                try {
                    HiveTableScanExec hiveTableScan = (HiveTableScanExec)next;
                    String path = ((URI)hiveTableScan.relation().tableMeta().storage().locationUri().get()).toString();
                    long rows = ((SQLMetric)hiveTableScan.metrics().get((Object)"numOutputRows").get()).value();
                    this.log(path, DatasetOperationType.READ, hiveTableScan.schema(), rows);
                    isProcessed = true;
                }
                catch (Exception e) {
                    LOG.error("Unable to extract dataset information from HiveTableScanExec", (Throwable)e);
                }
            }
        }
        if (this.dbnd.config().isVerbose() && !isProcessed) {
            LOG.info("Spark event was not processed because query execution plan is not supported");
        }
    }

    private boolean isDbndPlan(QueryExecution qe) {
        if (qe.analyzed() != null && !qe.analyzed().children().isEmpty()) {
            String dfAlias = this.getPlanVerboseString((LogicalPlan)qe.analyzed().children().apply(0));
            return dfAlias != null && dfAlias.contains("DBND_INTERNAL");
        }
        return false;
    }

    protected boolean isAdaptivePlan(Object plan) {
        return plan.getClass().getName().contains("AdaptiveSparkPlanExec");
    }

    protected String getPlanVerboseString(LogicalPlan plan) {
        try {
            return plan.verboseString();
        }
        catch (NoSuchMethodError e) {
            try {
                Class<QueryPlan> clazz = QueryPlan.class;
                Method method = clazz.getDeclaredMethod("verboseString", Integer.TYPE);
                return method.invoke((Object)plan, 1).toString();
            }
            catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ex) {
                LOG.error("Unable to identify whenever spark query was triggered by log_dataset_op or not", (Throwable)ex);
                return null;
            }
        }
    }

    protected Optional<SparkPlan> extractFinalFromAdaptive(SparkPlan adaptivePlan) {
        try {
            Class<?> clazz = Class.forName("org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec");
            Field field = clazz.getDeclaredField("currentPhysicalPlan");
            field.setAccessible(true);
            SparkPlan value = (SparkPlan)field.get(adaptivePlan);
            return Optional.of(value);
        }
        catch (ClassNotFoundException | IllegalAccessException | NoSuchFieldException e) {
            LOG.error("Unable to extract final plan from the adaptive one using reflection. Dataset operation won't be logged.", (Throwable)e);
            return Optional.empty();
        }
    }

    protected void log(String path, DatasetOperationType operationType, StructType datasetSchema, long rows) {
        Pair<String, List<Long>> schema = this.operationPreview.extractSchema(datasetSchema, rows);
        List<ColumnStats> columnStats = this.operationPreview.extractColumnStats(datasetSchema, rows);
        this.dbnd.logDatasetOperation(path, operationType, DatasetOperationStatus.OK, "", schema.right(), schema.left(), true, columnStats, "spark_query_listener");
    }

    protected String exctractPath(String path) {
        Matcher matcher = this.DATASET.matcher(path);
        if (matcher.matches()) {
            return matcher.group(1);
        }
        return path;
    }

    protected List<SparkPlan> getAllChildren(SparkPlan root) {
        ArrayList<SparkPlan> result = new ArrayList<SparkPlan>();
        LinkedList<SparkPlan> deque = new LinkedList<SparkPlan>();
        deque.add(root);
        while (!deque.isEmpty()) {
            SparkPlan next = (SparkPlan)deque.pop();
            result.add(next);
            if (next.getClass().getName().contains("ShuffleQueryStageExec")) {
                Optional<SparkPlan> shuffleChild = this.extractChildFromShuffle(next);
                shuffleChild.ifPresent(deque::add);
                continue;
            }
            List children = (List)JavaConverters.seqAsJavaListConverter((Seq)next.children()).asJava();
            deque.addAll(children);
        }
        return result;
    }

    protected Optional<SparkPlan> extractChildFromShuffle(SparkPlan shuffleQuery) {
        try {
            Class<?> clazz = Class.forName("org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec");
            Method method = clazz.getDeclaredMethod("plan", new Class[0]);
            SparkPlan value = (SparkPlan)method.invoke((Object)shuffleQuery, new Object[0]);
            return Optional.of(value);
        }
        catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            LOG.error("Unable to extract child plan from the shuffle query using reflection. Dataset operation won't be logged.", (Throwable)e);
            return Optional.empty();
        }
    }

    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    }
}

