/*
 * Decompiled with CFR 0.152.
 */
package ai.databand;

import ai.databand.DbndClient;
import ai.databand.DbndLogAppender;
import ai.databand.DbndRun;
import ai.databand.DefaultDbndRun;
import ai.databand.NoopDbndRun;
import ai.databand.config.DbndConfig;
import ai.databand.log.HistogramRequest;
import ai.databand.log.LogDatasetRequest;
import ai.databand.schema.ColumnStats;
import ai.databand.schema.DatabandTaskContext;
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
import ai.databand.schema.TaskRun;
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import javassist.ClassPool;
import javassist.Loader;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.Priority;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Glue between instrumented pipeline/task callbacks, explicit logging calls and
 * a single {@link DbndRun}.
 *
 * <p>The wrapper keeps a stack of method names for the tasks currently being
 * executed, a cache of reflected {@link Method}s for the classes it has seen,
 * and the run that receives all tracking calls. When a logging method is called
 * while no run exists, an "agentless" run is created on demand.
 *
 * <p>On construction a {@link DbndLogAppender} is attached to the
 * {@code org.apache.spark}, {@code org.spark_project} and {@code ai.databand}
 * log4j loggers, so messages logged through {@code LOG} in this class are routed
 * back into {@link #logTask(LoggingEvent, String)}.
 *
 * <p>Only {@code getOrCreateRun} and shutdown-hook registration are
 * synchronized; the remaining state is accessed without locking.
 */
public class DbndWrapper {
    private static final Logger LOG = LoggerFactory.getLogger(DbndWrapper.class);
    private DbndClient dbnd;
    private final DbndConfig config = new DbndConfig();
    private final Set<String> loadedClasses;
    private final Map<String, Method> methodsCache;
    private DbndRun run;
    private boolean pipelineInitialized;
    private final Deque<String> stack;
    // Guards against registering the stop() shutdown hook more than once.
    private boolean shutdownHookRegistered;
    // Must stay below LOG: the constructor logs, and static initializers run in textual order.
    private static final DbndWrapper INSTANCE = new DbndWrapper();

    /**
     * @return the shared wrapper instance
     */
    public static DbndWrapper instance() {
        return INSTANCE;
    }

    /**
     * Creates the client and attaches the Databand log appender.
     *
     * <p>If the client cannot be constructed, tracking is switched off in the
     * config and the wrapper keeps working with a {@code null} client.
     */
    public DbndWrapper() {
        try {
            this.dbnd = new DbndClient(this.config);
        }
        catch (Exception e) {
            this.dbnd = null;
            LOG.error("Unable to initialize DbndClient, tracking will be disabled. Reason: {}", e.getMessage());
            this.config.setTrackingEnabled(false);
        }
        this.methodsCache = new HashMap<>(1);
        this.stack = new ArrayDeque<>(1);
        this.loadedClasses = new HashSet<>(1);

        DbndLogAppender dbndAppender = new DbndLogAppender(this);
        dbndAppender.setLayout(new PatternLayout("[%d] {%c{2}} %p - %m%n"));
        dbndAppender.setThreshold(Level.INFO);
        dbndAppender.activateOptions();
        org.apache.log4j.Logger.getLogger("org.apache.spark").addAppender(dbndAppender);
        org.apache.log4j.Logger.getLogger("org.spark_project").addAppender(dbndAppender);
        org.apache.log4j.Logger.getLogger("ai.databand").addAppender(dbndAppender);
    }

    /**
     * Loads a class by name, first with {@link Class#forName(String)} and, if
     * that fails, through a javassist {@link Loader} over the default
     * {@link ClassPool}.
     *
     * @param className fully qualified class name
     * @return the loaded class, or an empty optional when both attempts fail
     */
    public Optional<Class<?>> loadClass(String className) {
        try {
            return Optional.of(Class.forName(className));
        }
        catch (ClassNotFoundException notFoundByDefaultLoader) {
            try {
                return Optional.of(new Loader(ClassPool.getDefault()).loadClass(className));
            }
            catch (ClassNotFoundException notFoundByJavassist) {
                return Optional.empty();
            }
        }
    }

    /**
     * Starts pipeline tracking for the given method.
     *
     * <p>When the method cannot be resolved the pipeline is marked as not
     * initialized and no run is created.
     *
     * @param className  class declaring the pipeline method
     * @param methodName name (optionally with an argument list) of the pipeline method
     * @param args       arguments the pipeline method was invoked with
     */
    public void beforePipeline(String className, String methodName, Object[] args) {
        Method method = this.findMethodByName(methodName, className);
        if (method == null) {
            this.pipelineInitialized = false;
            return;
        }
        System.out.println("Running Databand!");
        System.out.printf("TRACKER URL: %s%n", this.config.databandUrl());
        System.out.printf("CMD: %s%n", this.config.cmd());
        System.out.println("Parsed Databand properties: " + this.config);
        this.getOrCreateRun(method, args);
        this.pipelineInitialized = true;
    }

    /**
     * Looks up a cached method whose generic signature contains the given name.
     *
     * <p>Methods of {@code classname} are loaded into the cache first if that
     * class has not been processed yet. The first match in the cache's
     * iteration order wins.
     *
     * @param methodName method name, optionally followed by an argument list
     * @param classname  class to load into the cache beforehand; may be {@code null}
     *                   to search only what is already cached
     * @return the matching method, or {@code null} when nothing matches
     */
    protected Method findMethodByName(String methodName, String classname) {
        if (classname != null && !this.loadedClasses.contains(classname)) {
            this.loadMethods(classname);
        }
        String truncated = this.removeArgsFromMethodName(methodName);
        for (Map.Entry<String, Method> cached : this.methodsCache.entrySet()) {
            if (cached.getKey().contains(truncated)) {
                return cached.getValue();
            }
        }
        return null;
    }

    /**
     * Cuts the argument list off a method signature while keeping the opening
     * parenthesis, e.g. {@code foo(int)} becomes {@code foo(}.
     *
     * @param methodName method name, optionally followed by an argument list
     * @return the name up to and including the first {@code (}; the input itself
     *         when it has no {@code (} or starts with one
     */
    protected String removeArgsFromMethodName(String methodName) {
        int parenIndex = methodName.indexOf("(");
        return parenIndex > 0 ? methodName.substring(0, parenIndex + 1) : methodName;
    }

    /**
     * Caches every declared method of the given class, keyed by
     * {@link Method#toGenericString()}.
     *
     * <p>If the class cannot be loaded an error is logged, the pipeline is
     * marked as not initialized and the cache is left untouched.
     *
     * @param classname fully qualified name of the class to reflect on
     */
    protected void loadMethods(String classname) {
        Optional<Class<?>> pipelineClass = this.loadClass(classname);
        if (!pipelineClass.isPresent()) {
            LOG.error("Unable to build method cache for class {} because it can not be loaded", classname);
            this.pipelineInitialized = false;
            return;
        }
        for (Method declared : pipelineClass.get().getDeclaredMethods()) {
            this.methodsCache.put(declared.toGenericString(), declared);
        }
        this.loadedClasses.add(classname);
    }

    /**
     * Stops the current run, if there is one, and resets the wrapper state.
     */
    public void afterPipeline() {
        DbndRun activeRun = this.currentRun();
        // The run is absent when beforePipeline() could not resolve the pipeline method.
        if (activeRun != null) {
            activeRun.stop();
        }
        this.cleanup();
    }

    /**
     * Reports a pipeline failure to the current run, if there is one, and resets
     * the wrapper state.
     *
     * @param error the failure raised by the pipeline
     */
    public void errorPipeline(Throwable error) {
        DbndRun activeRun = this.currentRun();
        // The run is absent when beforePipeline() could not resolve the pipeline method.
        if (activeRun != null) {
            activeRun.error(error);
        }
        this.cleanup();
    }

    /**
     * Drops the current run, the method cache and the set of loaded classes, and
     * marks the pipeline as not initialized. The task stack is not touched.
     */
    protected void cleanup() {
        this.run = null;
        this.methodsCache.clear();
        this.pipelineInitialized = false;
        this.loadedClasses.clear();
    }

    /**
     * Callback invoked before an instrumented method runs.
     *
     * <p>While the pipeline is not initialized, the call is used to (re)try
     * pipeline initialization instead of starting a task.
     *
     * @param className  class declaring the method
     * @param methodName name of the method about to run
     * @param args       arguments of the invocation
     */
    public void beforeTask(String className, String methodName, Object[] args) {
        if (!this.pipelineInitialized) {
            if (this.stack.isEmpty()) {
                this.beforePipeline(className, methodName, args);
                this.stack.push(methodName);
            } else {
                // Retry initialization for the root method (top of the stack) using the
                // class of the current call.
                // NOTE(review): nothing is pushed here although afterTask()/errorTask()
                // will pop for this call - confirm whether that imbalance is intended.
                this.beforePipeline(className, this.stack.peek(), args);
            }
            return;
        }
        DbndRun run = this.currentRun();
        Method method = this.findMethodByName(methodName, className);
        LOG.info("Running task {}", (Object)run.getTaskName(method));
        run.startTask(method, args);
        this.stack.push(methodName);
    }

    /**
     * Callback invoked after an instrumented method returned normally.
     *
     * <p>Pops one entry from the task stack. When the stack becomes empty the
     * pipeline is finished; otherwise the task is completed on the current run.
     * A call made while the stack is already empty is logged and ignored.
     *
     * @param methodName name of the method that returned
     * @param result     value returned by the method
     */
    public void afterTask(String methodName, Object result) {
        // poll() instead of pop(): unbalanced callbacks must not throw into instrumented code.
        if (this.stack.poll() == null) {
            LOG.warn("afterTask called for {} while no task is tracked, ignoring", methodName);
            return;
        }
        if (this.stack.isEmpty()) {
            this.afterPipeline();
            return;
        }
        DbndRun run = this.currentRun();
        Method method = this.findMethodByName(methodName, null);
        run.completeTask(method, result);
        LOG.info("Task {} has been completed!", (Object)run.getTaskName(method));
    }

    /**
     * Callback invoked after an instrumented method threw.
     *
     * <p>Pops one entry from the task stack. When the stack becomes empty the
     * error is reported for the whole pipeline; otherwise it is reported for
     * the task on the current run. A call made while the stack is already empty
     * is logged and ignored.
     *
     * @param methodName name of the method that failed
     * @param error      the failure raised by the method
     */
    public void errorTask(String methodName, Throwable error) {
        // poll() instead of pop(): unbalanced callbacks must not throw into instrumented code.
        String failedTask = this.stack.poll();
        if (failedTask == null) {
            LOG.warn("errorTask called for {} while no task is tracked, ignoring", methodName);
            return;
        }
        LOG.info("Task {} returned error!", failedTask);
        if (this.stack.isEmpty()) {
            this.errorPipeline(error);
            return;
        }
        DbndRun run = this.currentRun();
        Method method = this.findMethodByName(methodName, null);
        run.errorTask(method, error);
    }

    /**
     * Forwards a log event to the current run. Does nothing when no run exists.
     *
     * @param event    the log4j event
     * @param eventStr the formatted event text
     */
    public void logTask(LoggingEvent event, String eventStr) {
        DbndRun run = this.currentRun();
        if (run == null) {
            return;
        }
        run.saveLog(event, eventStr);
    }

    /**
     * Logs a single metric, creating an agentless run when none exists.
     *
     * @param key   metric name
     * @param value metric value
     */
    public void logMetric(String key, Object value) {
        this.currentOrAgentlessRun().logMetric(key, value);
        LOG.info("Metric logged: [{}: {}]", key, value);
    }

    /**
     * Logs a dataset operation described by a Spark dataset, creating an
     * agentless run when none exists.
     *
     * @param path   dataset path
     * @param type   kind of operation
     * @param status outcome of the operation
     * @param data   the dataset involved
     * @param error  failure associated with the operation, if any
     * @param params additional request parameters
     */
    public void logDatasetOperation(String path, DatasetOperationType type, DatasetOperationStatus status, Dataset<?> data, Throwable error, LogDatasetRequest params) {
        this.currentOrAgentlessRun().logDatasetOperation(path, type, status, data, error, params);
        LOG.info("Dataset Operation [path: {}], [type: {}], [status: {}] logged", path, type, status);
    }

    /**
     * Logs a dataset operation described by pre-computed metadata, creating an
     * agentless run when none exists. The run is always called with a
     * {@code null} value in the position following {@code valuePreview}.
     *
     * @param path           dataset path
     * @param type           kind of operation
     * @param status         outcome of the operation
     * @param valuePreview   preview of the data
     * @param dataDimensions dimensions of the data
     * @param dataSchema     schema of the data
     * @param withPartition  partition flag passed through to the run
     * @param columnStats    per-column statistics
     */
    public void logDatasetOperation(String path, DatasetOperationType type, DatasetOperationStatus status, String valuePreview, List<Long> dataDimensions, String dataSchema, Boolean withPartition, List<ColumnStats> columnStats) {
        this.currentOrAgentlessRun().logDatasetOperation(path, type, status, valuePreview, null, dataDimensions, dataSchema, withPartition, columnStats);
        LOG.info("Dataset Operation [path: {}], [type: {}], [status: {}] logged", path, type, status);
    }

    /**
     * Logs several metrics without a source.
     *
     * @param metrics metric names mapped to values
     */
    public void logMetrics(Map<String, Object> metrics) {
        this.logMetrics(metrics, null);
    }

    /**
     * Logs several metrics, creating an agentless run when none exists.
     *
     * @param metrics metric names mapped to values
     * @param source  source label passed through to the run; may be {@code null}
     */
    public void logMetrics(Map<String, Object> metrics, String source) {
        this.currentOrAgentlessRun().logMetrics(metrics, source);
    }

    /**
     * Logs a dataframe with an explicit histogram request, creating an
     * agentless run when none exists.
     *
     * @param key              name of the dataframe
     * @param value            the dataframe
     * @param histogramRequest histogram settings passed through to the run
     */
    public void logDataframe(String key, Dataset<?> value, HistogramRequest histogramRequest) {
        this.currentOrAgentlessRun().logDataframe(key, value, histogramRequest);
    }

    /**
     * Logs a histogram, creating an agentless run when none exists.
     *
     * @param histogram histogram data passed through to the run
     */
    public void logHistogram(Map<String, Object> histogram) {
        this.currentOrAgentlessRun().logHistogram(histogram);
    }

    /**
     * Logs a dataframe, building the histogram request from a flag, and creates
     * an agentless run when none exists.
     *
     * @param key            name of the dataframe
     * @param value          the dataframe
     * @param withHistograms flag used to construct the {@link HistogramRequest}
     */
    public void logDataframe(String key, Dataset<?> value, boolean withHistograms) {
        this.currentOrAgentlessRun().logDataframe(key, value, new HistogramRequest(withHistograms));
        LOG.info("Dataframe {} logged", key);
    }

    /**
     * Handles a Spark listener event. An agentless run is created (and stored)
     * for any event when no run exists; only
     * {@link SparkListenerStageCompleted} events are saved.
     *
     * @param event the Spark listener event
     */
    public void logSpark(SparkListenerEvent event) {
        if (this.run == null) {
            this.run = this.createAgentlessRun();
        }
        if (event instanceof SparkListenerStageCompleted) {
            this.run.saveSparkMetrics((SparkListenerStageCompleted)event);
            LOG.info("Spark metrics received from SparkListener saved");
        }
    }

    /**
     * @return the configuration used by this wrapper
     */
    public DbndConfig config() {
        return this.config;
    }

    /**
     * Returns the current run, or an agentless run when none exists yet.
     */
    private DbndRun currentOrAgentlessRun() {
        DbndRun active = this.currentRun();
        return active != null ? active : this.createAgentlessRun();
    }

    /**
     * Returns the current run, initializing one for the given method first when
     * none exists.
     */
    private synchronized DbndRun getOrCreateRun(Method method, Object[] args) {
        if (this.currentRun() == null) {
            this.initRun(method, args);
        }
        return this.currentRun();
    }

    /**
     * Creates a run for code that logs without having gone through the task
     * callbacks.
     *
     * <p>With tracking disabled a fresh {@link NoopDbndRun} is returned and not
     * stored. Otherwise the run is attached to the externally supplied task
     * context when the config provides one, or started from the entry point
     * found on the current thread's stack trace.
     *
     * @return the run to log to
     */
    private DbndRun createAgentlessRun() {
        if (!this.config.isTrackingEnabled()) {
            return new NoopDbndRun();
        }
        this.registerShutdownHookOnce();
        if (this.config.databandTaskContext().isPresent()) {
            // Tracking is known to be enabled here, so the run is always a real one.
            this.run = new DefaultDbndRun(this.dbnd, this.config);
            System.out.println("Reusing existing task");
            DatabandTaskContext dbndCtx = this.config.databandTaskContext().get();
            TaskRun driverTask = new TaskRun();
            driverTask.setRunUid(dbndCtx.getRootRunUid());
            driverTask.setTaskRunUid(dbndCtx.getTaskRunUid());
            driverTask.setTaskRunAttemptUid(dbndCtx.getTaskRunAttemptUid());
            this.config.airflowContext().ifPresent(ctx -> driverTask.setName(ctx.getTaskId()));
            this.run.setDriverTask(driverTask);
        } else {
            this.startPipelineFromEntryPoint();
        }
        if (Objects.isNull(this.run)) {
            // Entry point discovery did not produce a run; fall back to a run without a method.
            this.getOrCreateRun(null, null);
        }
        return this.run;
    }

    /**
     * Registers a shutdown hook that stops the current run, at most once per
     * wrapper. The hook reads the run at shutdown time, so a single hook covers
     * every run created later.
     */
    private synchronized void registerShutdownHookOnce() {
        if (this.shutdownHookRegistered) {
            return;
        }
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
        this.shutdownHookRegistered = true;
    }

    /**
     * Starts a pipeline for the entry point of the current thread: the first
     * stack frame whose method is named {@code main}, or the last frame of the
     * stack trace when there is none. The pipeline is started for the first
     * public method of that class whose name contains the frame's method name,
     * with all arguments {@code null}.
     */
    private void startPipelineFromEntryPoint() {
        try {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            StackTraceElement main = null;
            for (StackTraceElement frame : stackTrace) {
                if (frame.getMethodName().equals("main")) {
                    main = frame;
                    break;
                }
            }
            if (main == null) {
                main = stackTrace[stackTrace.length - 1];
            }
            Class<?> entryPoint = Class.forName(main.getClassName(), true, Thread.currentThread().getContextClassLoader());
            for (Method candidate : entryPoint.getMethods()) {
                if (candidate.getName().contains(main.getMethodName())) {
                    // A freshly allocated Object[] already holds only nulls.
                    Object[] args = new Object[candidate.getParameterCount()];
                    this.beforePipeline(main.getClassName(), candidate.getName(), args);
                    break;
                }
            }
        }
        catch (ClassNotFoundException e) {
            System.out.printf("Class not found: %s%n", e.getMessage());
        }
    }

    /**
     * Stops the current run, if there is one. Used as the shutdown hook body.
     */
    protected void stop() {
        if (this.run != null) {
            this.run.stop();
        }
    }

    /**
     * @return the run currently receiving tracking calls, or {@code null}
     */
    protected DbndRun currentRun() {
        return this.run;
    }

    /**
     * Creates and initializes the run for the given method.
     *
     * <p>With tracking disabled a {@link NoopDbndRun} is installed. If
     * initialization of a real run fails, the run is replaced by a
     * {@link NoopDbndRun} and the failure is printed.
     */
    private void initRun(Method method, Object[] args) {
        boolean trackingEnabled = this.config.isTrackingEnabled();
        this.run = trackingEnabled ? new DefaultDbndRun(this.dbnd, this.config) : new NoopDbndRun();
        if (!trackingEnabled) {
            System.out.println("Tracking is not enabled. Set DBND__TRACKING variable to True if you want to enable it.");
            return;
        }
        try {
            this.run.init(method, args);
            System.out.printf("Running pipeline %s%n", this.run.getTaskName(method));
        }
        catch (Exception e) {
            this.run = new NoopDbndRun();
            // NOTE(review): stdout/stderr is used here rather than LOG, presumably because
            // ai.databand loggers feed back into the run through DbndLogAppender - confirm.
            System.out.println("Unable to init run:");
            e.printStackTrace();
        }
    }

    /**
     * Logs the task stack, top first, as {@code [ a  b ]}.
     */
    protected void printStack() {
        StringBuilder buffer = new StringBuilder();
        buffer.append('[');
        for (String taskName : this.stack) {
            buffer.append(' ').append(taskName).append(' ');
        }
        buffer.append(']');
        LOG.info(buffer.toString());
    }

    /**
     * Points the run at a task that is tracked elsewhere.
     *
     * <p>With tracking disabled a {@link NoopDbndRun} is installed and nothing
     * else happens. Otherwise a run is created if needed (together with a
     * shutdown hook calling its {@code stopExternal}), {@code stopExternal} is
     * invoked on the run, and a driver task built from the given identifiers is
     * set on it.
     *
     * @param runUid            uid of the run
     * @param taskRunUid        uid of the task run
     * @param taskRunAttemptUid uid of the task run attempt
     * @param taskName          name of the task
     */
    public void setExternalTaskContext(String runUid, String taskRunUid, String taskRunAttemptUid, String taskName) {
        if (!this.config.isTrackingEnabled()) {
            this.run = new NoopDbndRun();
            LOG.info("Attempt to set external task context failed: tracking is not enabled");
            return;
        }
        if (this.run == null) {
            this.run = new DefaultDbndRun(this.dbnd, this.config);
            Runtime.getRuntime().addShutdownHook(new Thread(this.run::stopExternal));
        }
        this.run.stopExternal();
        TaskRun task = new TaskRun();
        task.setRunUid(runUid);
        task.setTaskRunUid(taskRunUid);
        task.setTaskRunAttemptUid(taskRunAttemptUid);
        task.setName(taskName);
        this.run.setDriverTask(task);
        LOG.info("External task context was set. run_uid: {}, task_run_uid: {}, task_run_attempt_uid: {}, task_name: {}", runUid, taskRunUid, taskRunAttemptUid, taskName);
    }
}

