/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.engine;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import org.apache.apex.api.EmbeddedAppLauncher;
import org.apache.apex.api.Launcher;
import org.apache.hadoop.conf.Configuration;

/**
 * Launcher that runs a streaming application in-process on a {@link StramLocalCluster}.
 *
 * <p>The launcher owns a single {@link LogicalPlan} instance. {@link #prepareDAG} populates it
 * and {@link #getController()} hands it to a newly created local cluster.
 */
public class EmbeddedAppLauncherImpl
extends LocalMode<EmbeddedAppLauncherImpl.EmbeddedAppHandleImpl> {
    // The nested handle type is qualified in the extends clause above because a member class
    // is only in scope inside the class body, not in the class header.

    /** The logical plan shared by every method of this launcher. */
    private final LogicalPlan lp = new LogicalPlan();

    /**
     * Returns the logical plan owned by this launcher.
     *
     * @return the launcher's own plan instance (not a copy)
     */
    public DAG getDAG() {
        return this.lp;
    }

    /**
     * Produces a copy of the logical plan via {@link StramLocalCluster#cloneLogicalPlan}.
     *
     * @return the cloned plan
     * @throws Exception if cloning fails
     */
    public DAG cloneDAG() throws Exception {
        return StramLocalCluster.cloneLogicalPlan(this.lp);
    }

    /**
     * Prepares the DAG, creates a local cluster for it and runs the application.
     *
     * <p>When {@code launchParameters} is non-null the following attributes are consulted:
     * <ul>
     *   <li>{@code SERIALIZE_DAG} - when true the plan is cloned once and the copy is discarded;
     *       a cloning failure aborts the launch (presumably a serializability check).</li>
     *   <li>{@code HEARTBEAT_MONITORING} - when true heartbeat monitoring is enabled on the
     *       cluster.</li>
     *   <li>{@code RUN_ASYNC} - when true the cluster is started with {@code runAsync()}.</li>
     *   <li>{@code RUN_MILLIS} - otherwise, when a value is present, the cluster is run with
     *       {@code run(runMillis)}.</li>
     * </ul>
     * If none of the above started the cluster, {@code run()} is called.
     *
     * @param application the application used to populate the DAG; may be null if
     *        {@code configuration} is not
     * @param configuration the configuration used to populate the DAG; may be null if
     *        {@code application} is not
     * @param launchParameters optional launch attributes; may be null
     * @return a handle wrapping the cluster that was started
     * @throws Launcher.LauncherException if preparing or cloning the DAG fails
     */
    public EmbeddedAppHandleImpl launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap launchParameters) throws Launcher.LauncherException {
        try {
            this.prepareDAG(application, configuration);
        } catch (Exception e) {
            throw new Launcher.LauncherException(e);
        }
        StramLocalCluster lc = this.getController();
        boolean launched = false;
        if (launchParameters != null) {
            if (((Boolean)StramUtils.getValueWithDefault(launchParameters, SERIALIZE_DAG)).booleanValue()) {
                try {
                    // The clone itself is not used; only a failure matters here.
                    this.cloneDAG();
                } catch (Exception e) {
                    throw new Launcher.LauncherException(e);
                }
            }
            if (((Boolean)StramUtils.getValueWithDefault(launchParameters, HEARTBEAT_MONITORING)).booleanValue()) {
                lc.setHeartbeatMonitoringEnabled(true);
            }
            if (((Boolean)StramUtils.getValueWithDefault(launchParameters, RUN_ASYNC)).booleanValue()) {
                lc.runAsync();
                launched = true;
            } else {
                Long runMillis = (Long)StramUtils.getValueWithDefault(launchParameters, RUN_MILLIS);
                if (runMillis != null) {
                    lc.run(runMillis);
                    launched = true;
                }
            }
        }
        if (!launched) {
            lc.run();
        }
        return new EmbeddedAppHandleImpl(lc);
    }

    /**
     * Populates this launcher's logical plan from the given application and configuration.
     *
     * <p>A null {@code conf} is replaced by {@code new Configuration(false)}. The application
     * name passed to the plan configuration is the application's class name, or
     * {@code "unknown"} when {@code app} is null.
     *
     * @param app the application; may be null if {@code conf} is not
     * @param conf the configuration; may be null if {@code app} is not
     * @return the launcher's logical plan (same instance as {@link #getDAG()})
     * @throws IllegalArgumentException if both {@code app} and {@code conf} are null
     * @throws Exception if the plan configuration fails to prepare the DAG
     */
    public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception {
        if (app == null && conf == null) {
            throw new IllegalArgumentException("Require app or configuration to populate logical plan.");
        }
        if (conf == null) {
            conf = new Configuration(false);
        }
        LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
        String appName = app != null ? app.getClass().getName() : "unknown";
        lpc.prepareDAG(this.lp, app, appName);
        return this.lp;
    }

    /**
     * Creates a new local cluster for the launcher's logical plan.
     *
     * <p>Before the cluster is created, the jars listed in the plan's {@code LIBRARY_JARS}
     * attribute are made available through the calling thread's context class loader.
     *
     * @return a new {@link StramLocalCluster} for the plan
     * @throws RuntimeException wrapping any exception raised while setting up the class path
     *         or constructing the cluster
     */
    public StramLocalCluster getController() {
        try {
            this.addLibraryJarsToClasspath(this.lp);
            return new StramLocalCluster(this.lp);
        } catch (Exception e) {
            throw new RuntimeException("Error creating local cluster", e);
        }
    }

    /**
     * Makes the jars named in the plan's {@code LIBRARY_JARS} attribute loadable from the
     * current thread.
     *
     * <p>The attribute is read as a comma separated list of file paths. Entries are trimmed and
     * blank entries are skipped, so values such as {@code "a.jar, b.jar"} or
     * {@code "a.jar,,b.jar"} do not yield paths with stray whitespace or an empty path (which
     * would resolve to the current working directory). When at least one entry remains, a
     * {@link URLClassLoader} over those files, parented by the current context class loader,
     * is installed as the thread's context class loader. The previous loader is not restored.
     *
     * @param lp the plan whose attributes are consulted
     * @throws MalformedURLException if a path cannot be converted to a URL
     */
    private void addLibraryJarsToClasspath(LogicalPlan lp) throws MalformedURLException {
        String libJarsCsv = (String)lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
        if (libJarsCsv == null || libJarsCsv.length() == 0) {
            return;
        }
        String[] entries = libJarsCsv.split(",");
        URL[] urls = new URL[entries.length];
        int count = 0;
        for (String entry : entries) {
            String path = entry.trim();
            if (path.isEmpty()) {
                // An empty path would map to the working directory; ignore it.
                continue;
            }
            urls[count++] = new File(path).toURI().toURL();
        }
        if (count == 0) {
            return;
        }
        if (count < urls.length) {
            // Drop the unused tail so the class loader never sees null URLs.
            URL[] compact = new URL[count];
            System.arraycopy(urls, 0, compact, 0, count);
            urls = compact;
        }
        ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
        URLClassLoader cl = URLClassLoader.newInstance(urls, prevCl);
        Thread.currentThread().setContextClassLoader(cl);
    }

    /**
     * Application handle that delegates to the {@link StramLocalCluster} running the app.
     */
    public static class EmbeddedAppHandleImpl
    implements EmbeddedAppLauncher.EmbeddedAppHandle {
        /** The cluster this handle controls. */
        final StramLocalCluster controller;

        /**
         * Creates a handle for the given cluster.
         *
         * @param controller the cluster to delegate to
         */
        public EmbeddedAppHandleImpl(StramLocalCluster controller) {
            this.controller = controller;
        }

        /**
         * Reports whether the application has finished, as reported by the cluster.
         *
         * @return the cluster's {@code isFinished()} result
         */
        public boolean isFinished() {
            return this.controller.isFinished();
        }

        /**
         * Shuts the cluster down.
         *
         * @param shutdownMode the requested mode; it is not consulted, the cluster's
         *        {@code shutdown()} is called regardless of its value
         * @throws Launcher.LauncherException declared by the interface contract
         */
        public void shutdown(Launcher.ShutdownMode shutdownMode) throws Launcher.LauncherException {
            this.controller.shutdown();
        }
    }
}

