/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.interpreter.remote;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterContextRunner;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.resource.DistributedResourcePool;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourceSet;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteInterpreterServer
extends Thread
implements RemoteInterpreterService.Iface,
AngularObjectRegistryListener {
    /** SLF4J logger for this server; also used by the anonymous output listener created in this class. */
    Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class);
    /** Interpreter group hosted by this server; remains null until the first createInterpreter call. */
    InterpreterGroup interpreterGroup;
    /** Angular object registry created together with the interpreter group; this server is its listener. */
    AngularObjectRegistry angularObjectRegistry;
    /** Resource pool created together with the interpreter group and backed by the event client. */
    DistributedResourcePool resourcePool;
    /** Shared Gson instance used for (de)serializing contexts, results and angular objects. */
    Gson gson = new Gson();
    /** Thrift processor that dispatches incoming calls to this instance. */
    RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
    // NOTE(review): never assigned anywhere in the visible class - confirm whether it is still needed.
    RemoteInterpreterServer handler;
    /** TCP port passed to the constructor. */
    private int port;
    /** Thrift thread-pool server created in the constructor and started from run(). */
    private TThreadPoolServer server;
    /** Client through which output, angular-object and resource events are handed off. */
    RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient();

    /**
     * Creates the Thrift thread-pool server for the given port, with this
     * instance registered as the service handler. Serving itself only starts
     * once {@link #run()} is executed.
     *
     * @param port TCP port handed to the Thrift server socket
     * @throws TTransportException propagated from the server socket creation
     */
    public RemoteInterpreterServer(int port) throws TTransportException {
        this.port = port;
        this.processor = new RemoteInterpreterService.Processor<RemoteInterpreterServer>(this);
        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new TServerSocket(port));
        serverArgs.processor(this.processor);
        this.server = new TThreadPoolServer(serverArgs);
    }

    /** Thread body: logs the configured port and then enters the Thrift server's serve loop. */
    @Override
    public void run() {
        logger.info("Starting remote interpreter server on port {}", Integer.valueOf(port));
        server.serve();
    }

    /**
     * Closes and destroys the interpreter group (when one exists), stops the
     * Thrift server and then waits up to two seconds, polling every 300 ms,
     * for it to stop serving. If the server is still serving after the grace
     * period the JVM is terminated with exit status 0.
     *
     * <p>An interrupt received while waiting does not shorten the grace
     * period, but the thread's interrupt status is restored afterwards so the
     * caller can still observe it.
     *
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public void shutdown() throws TException {
        if (this.interpreterGroup != null) {
            this.interpreterGroup.close();
            this.interpreterGroup.destroy();
        }
        this.server.stop();

        final long maxWaitMillis = 2000L;
        final long pollIntervalMillis = 300L;
        final long startTime = System.currentTimeMillis();
        boolean interrupted = false;
        while (System.currentTimeMillis() - startTime < maxWaitMillis && this.server.isServing()) {
            try {
                Thread.sleep(pollIntervalMillis);
            }
            catch (InterruptedException e) {
                this.logger.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e);
                // Re-interrupting right here would make every following sleep fail
                // immediately and turn this loop into a busy spin, so defer it.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (this.server.isServing()) {
            // The server did not stop within the grace period: force the process down.
            System.exit(0);
        }
    }

    /**
     * @return the port number this server was constructed with
     */
    public int getPort() {
        return port;
    }

    /**
     * @return {@code true} when a Thrift server exists and reports that it is serving
     */
    public boolean isRunning() {
        return this.server != null && this.server.isServing();
    }

    /**
     * Process entry point. Starts a server on the port given as the first
     * argument, blocks until the server thread ends and then exits the JVM
     * with status 0.
     *
     * @param args command line; {@code args[0]} must be the port number
     * @throws IllegalArgumentException if no port argument is supplied
     *         (a non-numeric port surfaces as {@link NumberFormatException})
     * @throws TTransportException if the server socket cannot be created
     * @throws InterruptedException if interrupted while waiting for the server thread
     */
    public static void main(String[] args) throws TTransportException, InterruptedException {
        if (args == null || args.length < 1) {
            // Fail with an actionable message instead of an ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException("Usage: RemoteInterpreterServer <port>");
        }
        int port = Integer.parseInt(args[0]);
        RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port);
        remoteInterpreterServer.start();
        remoteInterpreterServer.join();
        System.exit(0);
    }

    /**
     * Instantiates the interpreter class {@code className} via its
     * {@code (Properties)} constructor, wraps it in a {@link LazyOpenInterpreter}
     * and registers it under {@code noteId} in the interpreter group. The group,
     * its angular object registry and its resource pool are created on the first
     * call.
     *
     * @param interpreterGroupId id used when the interpreter group is first created
     * @param noteId key under which the interpreter is registered in the group
     * @param className fully qualified interpreter class name
     * @param properties interpreter properties; also applied as system properties
     *        through {@code setSystemProperty}
     * @throws TException wrapping any reflection failure while loading or
     *         instantiating the class
     */
    @Override
    public void createInterpreter(String interpreterGroupId, String noteId, String className, Map<String, String> properties) throws TException {
        if (this.interpreterGroup == null) {
            this.interpreterGroup = new InterpreterGroup(interpreterGroupId);
            this.angularObjectRegistry = new AngularObjectRegistry(this.interpreterGroup.getId(), this);
            this.resourcePool = new DistributedResourcePool(this.interpreterGroup.getId(), this.eventClient);
            this.interpreterGroup.setAngularObjectRegistry(this.angularObjectRegistry);
            this.interpreterGroup.setResourcePool(this.resourcePool);
        }
        try {
            Class<?> replClass = Class.forName(className);
            Properties p = new Properties();
            p.putAll(properties);
            this.setSystemProperty(p);
            Constructor<?> constructor = replClass.getConstructor(Properties.class);
            Interpreter repl = (Interpreter)constructor.newInstance(p);
            repl.setClassloaderUrls(new URL[0]);
            synchronized (this.interpreterGroup) {
                // Work against the List interface: the stored list is not guaranteed
                // to be a LinkedList, so no downcast of the looked-up value.
                List<Interpreter> interpreters = this.interpreterGroup.get(noteId);
                if (interpreters == null) {
                    interpreters = new LinkedList<Interpreter>();
                    this.interpreterGroup.put(noteId, interpreters);
                }
                interpreters.add(new LazyOpenInterpreter(repl));
            }
            this.logger.info("Instantiate interpreter {}", (Object)className);
            repl.setInterpreterGroup(this.interpreterGroup);
        }
        catch (ClassNotFoundException | IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException | SecurityException | InvocationTargetException e) {
            this.logger.error(e.toString(), e);
            throw new TException(e);
        }
    }

    /**
     * Mirrors the given properties into the JVM system properties. Keys for
     * which {@code RemoteInterpreter.isEnvString} returns true are skipped; a
     * null or empty value clears the system property, any other value sets it.
     *
     * @param properties properties to apply; keys are cast to {@code String}
     */
    private void setSystemProperty(Properties properties) {
        for (Object rawKey : properties.keySet()) {
            String name = (String)rawKey;
            if (!RemoteInterpreter.isEnvString(name)) {
                String value = properties.getProperty(name);
                if (value != null && !value.isEmpty()) {
                    System.setProperty(name, value);
                } else {
                    System.clearProperty(name);
                }
            }
        }
    }

    /**
     * Looks up the interpreter registered for {@code noteId} whose class name
     * equals {@code className}.
     *
     * @param noteId key of the interpreter list inside the interpreter group
     * @param className class name to match against {@code Interpreter.getClassName()}
     * @return the matching interpreter
     * @throws TException wrapping an {@link InterpreterException} when no
     *         interpreter group exists yet, the note has no interpreters, or
     *         none of them matches {@code className}
     */
    private Interpreter getInterpreter(String noteId, String className) throws TException {
        if (this.interpreterGroup == null) {
            throw new TException(new InterpreterException("Interpreter instance " + className + " not created"));
        }
        synchronized (this.interpreterGroup) {
            List<Interpreter> interpreters = this.interpreterGroup.get(noteId);
            if (interpreters == null) {
                throw new TException(new InterpreterException("Interpreter " + className + " not initialized"));
            }
            for (Interpreter inp : interpreters) {
                if (inp.getClassName().equals(className)) {
                    return inp;
                }
            }
        }
        throw new TException(new InterpreterException("Interpreter instance " + className + " not found"));
    }

    /**
     * Opens the interpreter registered for the given note and class name.
     *
     * @throws TException if the interpreter cannot be resolved
     */
    @Override
    public void open(String noteId, String className) throws TException {
        this.getInterpreter(noteId, className).open();
    }

    /**
     * Closes and unregisters the first interpreter of the given note whose
     * class name equals {@code className}. Does nothing when no interpreter
     * group has been created yet, when the note has no interpreters, or when
     * no interpreter matches.
     *
     * @param noteId key of the interpreter list inside the interpreter group
     * @param className class name of the interpreter to close
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public void close(String noteId, String className) throws TException {
        if (this.interpreterGroup == null) {
            // Nothing was ever created; synchronizing on null would throw an NPE.
            return;
        }
        synchronized (this.interpreterGroup) {
            List<Interpreter> interpreters = this.interpreterGroup.get(noteId);
            if (interpreters == null) {
                return;
            }
            Iterator<Interpreter> it = interpreters.iterator();
            while (it.hasNext()) {
                Interpreter inp = it.next();
                if (inp.getClassName().equals(className)) {
                    inp.close();
                    // Remove through the iterator to keep the iteration valid.
                    it.remove();
                    break;
                }
            }
        }
    }

    /**
     * Submits the statement {@code st} as an {@code InterpretJob} to the
     * interpreter's scheduler and blocks until the job is terminated, then
     * converts the outcome into a Thrift result.
     *
     * <p>A job in {@code ERROR} status yields an {@code ERROR} result carrying
     * the stack of the job's exception; a job without a return value yields
     * {@code KEEP_PREVIOUS_RESULT}.
     *
     * <p>An interrupt received while waiting does not abandon the job; the
     * thread's interrupt status is restored once the job has terminated.
     *
     * @param noteId note the interpreter is registered under
     * @param className class name of the interpreter to use
     * @param st statement to interpret
     * @param interpreterContext Thrift-side context; its paragraph id is used as job id
     * @return the converted result together with the context's config and GUI as JSON
     * @throws TException if the interpreter cannot be resolved
     */
    @Override
    public RemoteInterpreterResult interpret(String noteId, String className, String st, RemoteInterpreterContext interpreterContext) throws TException {
        this.logger.debug("st: {}", (Object)st);
        Interpreter intp = this.getInterpreter(noteId, className);
        InterpreterContext context = this.convert(interpreterContext);
        Scheduler scheduler = intp.getScheduler();
        InterpretJobListener jobListener = new InterpretJobListener();
        InterpretJob job = new InterpretJob(interpreterContext.getParagraphId(), "remoteInterpretJob_" + System.currentTimeMillis(), jobListener, 500L, intp, st, context);
        scheduler.submit(job);

        boolean interrupted = false;
        while (!job.isTerminated()) {
            synchronized (jobListener) {
                try {
                    // Woken by the listener on status changes; the timeout bounds a missed wake-up.
                    jobListener.wait(1000L);
                }
                catch (InterruptedException e) {
                    this.logger.info("Exception in RemoteInterpreterServer while interpret, jobListener.wait", e);
                    // Re-interrupting here would make the next wait() fail at once; defer it.
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        InterpreterResult result;
        if (job.getStatus() == Job.Status.ERROR) {
            result = new InterpreterResult(InterpreterResult.Code.ERROR, Job.getStack(job.getException()));
        } else {
            result = (InterpreterResult)job.getReturn();
            if (result == null) {
                result = new InterpreterResult(InterpreterResult.Code.KEEP_PREVIOUS_RESULT);
            }
        }
        return this.convert(result, context.getConfig(), context.getGui());
    }

    /**
     * Cancels the job identified by the context's paragraph id. A job that is
     * still in the scheduler's waiting queue is removed and marked
     * {@code ABORT}; otherwise the cancel request is passed to the interpreter.
     *
     * @throws TException if the interpreter cannot be resolved
     */
    @Override
    public void cancel(String noteId, String className, RemoteInterpreterContext interpreterContext) throws TException {
        this.logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
        Interpreter target = this.getInterpreter(noteId, className);
        Job queued = target.getScheduler().removeFromWaitingQueue(interpreterContext.getParagraphId());
        if (queued == null) {
            target.cancel(this.convert(interpreterContext));
            return;
        }
        queued.setStatus(Job.Status.ABORT);
    }

    /**
     * @return the progress value reported by the interpreter for the converted context
     * @throws TException if the interpreter cannot be resolved
     */
    @Override
    public int getProgress(String noteId, String className, RemoteInterpreterContext interpreterContext) throws TException {
        Interpreter target = this.getInterpreter(noteId, className);
        InterpreterContext context = this.convert(interpreterContext);
        return target.getProgress(context);
    }

    /**
     * @return the string form of the interpreter's form type
     * @throws TException if the interpreter cannot be resolved
     */
    @Override
    public String getFormType(String noteId, String className) throws TException {
        return this.getInterpreter(noteId, className).getFormType().toString();
    }

    /**
     * Asks the interpreter for completions of {@code buf} at position {@code cursor}.
     *
     * @return the completion list exactly as returned by the interpreter
     * @throws TException if the interpreter cannot be resolved
     */
    @Override
    public List<InterpreterCompletion> completion(String noteId, String className, String buf, int cursor) throws TException {
        return this.getInterpreter(noteId, className).completion(buf, cursor);
    }

    /**
     * Builds a local {@link InterpreterContext} from its Thrift representation.
     * The JSON-encoded runners, authentication info, config and GUI are
     * deserialized with Gson; each runner is re-wrapped as a
     * {@code ParagraphRunner} bound to this server. A missing runner list is
     * treated as empty.
     *
     * @param ric Thrift-side context
     * @return context wired to this server's registry, resource pool and a
     *         fresh interpreter output for the paragraph
     */
    private InterpreterContext convert(RemoteInterpreterContext ric) {
        LinkedList<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
        List<InterpreterContextRunner> runners = this.gson.fromJson(ric.getRunners(), new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
        // Gson yields null for a null or empty JSON document; treat that as "no runners".
        if (runners != null) {
            for (InterpreterContextRunner r : runners) {
                contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
            }
        }
        AuthenticationInfo authenticationInfo = this.gson.fromJson(ric.getAuthenticationInfo(), AuthenticationInfo.class);
        Map<String, Object> config = this.gson.fromJson(ric.getConfig(), new TypeToken<Map<String, Object>>(){}.getType());
        GUI gui = this.gson.fromJson(ric.getGui(), GUI.class);
        return new InterpreterContext(
                ric.getNoteId(),
                ric.getParagraphId(),
                ric.getParagraphTitle(),
                ric.getParagraphText(),
                authenticationInfo,
                config,
                gui,
                this.interpreterGroup.getAngularObjectRegistry(),
                this.interpreterGroup.getResourcePool(),
                contextRunners,
                this.createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
    }

    /**
     * Creates an {@link InterpreterOutput} whose append and update
     * notifications are logged at debug level and passed to the event client
     * for the given note and paragraph.
     *
     * <p>Bytes are decoded with the platform default charset.
     *
     * @param noteId note the output belongs to
     * @param paragraphId paragraph the output belongs to
     * @return the listener-backed interpreter output
     */
    private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
        return new InterpreterOutput(new InterpreterOutputListener(){

            @Override
            public void onAppend(InterpreterOutput out, byte[] line) {
                // Decode once and let SLF4J format lazily instead of concatenating eagerly.
                String text = new String(line);
                RemoteInterpreterServer.this.logger.debug("Output Append:{}", text);
                RemoteInterpreterServer.this.eventClient.onInterpreterOutputAppend(noteId, paragraphId, text);
            }

            @Override
            public void onUpdate(InterpreterOutput out, byte[] output) {
                String text = new String(output);
                RemoteInterpreterServer.this.logger.debug("Output Update:{}", text);
                RemoteInterpreterServer.this.eventClient.onInterpreterOutputUpdate(noteId, paragraphId, text);
            }
        });
    }

    /**
     * Converts a local interpreter result into its Thrift representation.
     *
     * @param result result whose code name, type name and message are copied
     * @param config paragraph config, serialized to JSON
     * @param gui paragraph GUI state, serialized to JSON
     * @return the Thrift result object
     */
    private RemoteInterpreterResult convert(InterpreterResult result, Map<String, Object> config, GUI gui) {
        String codeName = result.code().name();
        String typeName = result.type().name();
        String message = result.message();
        String configJson = this.gson.toJson(config);
        String guiJson = this.gson.toJson(gui);
        return new RemoteInterpreterResult(codeName, typeName, message, configJson, guiJson);
    }

    /**
     * Reports the status name of the job with id {@code jobId} among the
     * running and waiting jobs of every interpreter registered for
     * {@code noteId}.
     *
     * @param noteId note whose interpreters are searched
     * @param jobId id of the job to look for
     * @return the job's status name, or {@code "Unknown"} when no interpreter
     *         group exists, the note has no interpreters, or no such job is found
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public String getStatus(String noteId, String jobId) throws TException {
        if (this.interpreterGroup == null) {
            return "Unknown";
        }
        synchronized (this.interpreterGroup) {
            List<Interpreter> interpreters = this.interpreterGroup.get(noteId);
            if (interpreters == null) {
                return "Unknown";
            }
            for (Interpreter intp : interpreters) {
                for (Job job : intp.getScheduler().getJobsRunning()) {
                    if (jobId.equals(job.getId())) {
                        return job.getStatus().name();
                    }
                }
                for (Job job : intp.getScheduler().getJobsWaiting()) {
                    if (jobId.equals(job.getId())) {
                        return job.getStatus().name();
                    }
                }
            }
        }
        return "Unknown";
    }

    /** Registry callback: hands the added angular object to the event client; the group id is unused. */
    @Override
    public void onAdd(String interpreterGroupId, AngularObject object) {
        eventClient.angularObjectAdd(object);
    }

    /** Registry callback: hands the updated angular object to the event client; the group id is unused. */
    @Override
    public void onUpdate(String interpreterGroupId, AngularObject object) {
        eventClient.angularObjectUpdate(object);
    }

    /** Registry callback: reports the removed angular object's name, note and paragraph to the event client. */
    @Override
    public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
        eventClient.angularObjectRemove(name, noteId, paragraphId);
    }

    /**
     * @return whatever the event client's {@code pollEvent()} yields
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public RemoteInterpreterEvent getEvent() throws TException {
        RemoteInterpreterEvent event = eventClient.pollEvent();
        return event;
    }

    /**
     * Applies a JSON-encoded value to an existing angular object without
     * emitting a change event ({@code set(..., false)}).
     *
     * <p>The JSON is decoded by trying, in order: the runtime class of the
     * object's current value, a generic {@code Map<String, Object>}, and
     * finally {@code String}. A missing angular object is ignored; a
     * {@code null} payload sets the value to {@code null}.
     *
     * @param name angular object name
     * @param noteId note scope of the object
     * @param paragraphId paragraph scope of the object
     * @param object JSON representation of the new value, may be {@code null}
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public void angularObjectUpdate(String name, String noteId, String paragraphId, String object) throws TException {
        AngularObjectRegistry registry = this.interpreterGroup.getAngularObjectRegistry();
        AngularObject ao = registry.get(name, noteId, paragraphId);
        if (ao == null) {
            this.logger.debug("Angular object {} not exists", (Object)name);
            return;
        }
        if (object == null) {
            ao.set(null, false);
            return;
        }
        Object oldObject = ao.get();
        // Must be Object: the decoded value can be any type, not only a String.
        Object value = null;
        if (oldObject != null) {
            try {
                // First attempt: decode with the type of the previous value.
                value = this.gson.fromJson(object, oldObject.getClass());
                ao.set(value, false);
                return;
            }
            catch (Exception e) {
                // Not decodable as the previous type; fall through to the generic attempts.
                this.logger.debug(e.getMessage(), e);
            }
        }
        if (value == null) {
            try {
                // Second attempt: generic JSON object.
                value = this.gson.fromJson(object, new TypeToken<Map<String, Object>>(){}.getType());
            }
            catch (Exception e) {
                this.logger.debug(e.getMessage(), e);
            }
        }
        if (value == null) {
            // Last resort: plain string.
            value = this.gson.fromJson(object, String.class);
        }
        ao.set(value, false);
    }

    /**
     * Adds an angular object with a JSON-encoded value to the registry without
     * emitting an event ({@code add(..., false)}). If an object with the same
     * name and scope already exists the call is treated as an update.
     *
     * <p>The JSON is decoded as a generic {@code Map<String, Object>} first and
     * as a {@code String} if that fails or yields {@code null}.
     *
     * @param name angular object name
     * @param noteId note scope of the object
     * @param paragraphId paragraph scope of the object
     * @param object JSON representation of the value
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public void angularObjectAdd(String name, String noteId, String paragraphId, String object) throws TException {
        AngularObjectRegistry registry = this.interpreterGroup.getAngularObjectRegistry();
        AngularObject ao = registry.get(name, noteId, paragraphId);
        if (ao != null) {
            this.angularObjectUpdate(name, noteId, paragraphId, object);
            return;
        }
        // Must be Object: a successfully decoded Map is not a String.
        Object value = null;
        try {
            value = this.gson.fromJson(object, new TypeToken<Map<String, Object>>(){}.getType());
        }
        catch (Exception e) {
            // Not a JSON object; fall back to treating the payload as a string.
            this.logger.debug(e.getMessage(), e);
        }
        if (value == null) {
            value = this.gson.fromJson(object, String.class);
        }
        registry.add(name, value, noteId, paragraphId, false);
    }

    /**
     * Removes the named angular object from the group's registry, passing
     * {@code false} as the registry's last argument.
     *
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public void angularObjectRemove(String name, String noteId, String paragraphId) throws TException {
        this.interpreterGroup.getAngularObjectRegistry().remove(name, noteId, paragraphId, false);
    }

    /**
     * Passes the received resource list on to the event client.
     *
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public void resourcePoolResponseGetAll(List<String> resources) throws TException {
        eventClient.putResponseGetAllResources(resources);
    }

    /**
     * Passes the received resource payload for {@code resourceId} on to the event client.
     *
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public void resourceResponseGet(String resourceId, ByteBuffer object) throws TException {
        eventClient.putResponseGetResource(resourceId, object);
    }

    /**
     * Lists the resources returned by {@code resourcePool.getAll(false)},
     * each serialized to JSON.
     *
     * @return one JSON string per resource
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public List<String> resourcePoolGetAll() throws TException {
        this.logger.debug("Request getAll from ZeppelinServer");
        ResourceSet resourceSet = this.resourcePool.getAll(false);
        LinkedList<String> result = new LinkedList<String>();
        // Reuse the shared Gson instance rather than allocating one per call.
        for (Resource r : resourceSet) {
            result.add(this.gson.toJson(r));
        }
        return result;
    }

    /**
     * Removes a resource from the resource pool.
     *
     * @return {@code true} when the pool returned a non-null resource for the removal
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public boolean resourceRemove(String noteId, String paragraphId, String resourceName) throws TException {
        return this.resourcePool.remove(noteId, paragraphId, resourceName) != null;
    }

    /**
     * Looks up a resource in the pool and returns its serialized value.
     *
     * @return the serialized object, or an empty buffer when the resource is
     *         missing, holds {@code null}, is not serializable, or
     *         serialization fails with an {@link IOException}
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public ByteBuffer resourceGet(String noteId, String paragraphId, String resourceName) throws TException {
        this.logger.debug("Request resourceGet {} from ZeppelinServer", resourceName);
        Resource found = this.resourcePool.get(noteId, paragraphId, resourceName, false);
        boolean transferable = found != null && found.get() != null && found.isSerializable();
        if (transferable) {
            try {
                return Resource.serializeObject(found.get());
            }
            catch (IOException e) {
                this.logger.error(e.getMessage(), e);
            }
        }
        return ByteBuffer.allocate(0);
    }

    /**
     * Replaces the content of the group's angular object registry with the
     * registry deserialized from {@code registryAsString}. Any failure is
     * logged at info level and otherwise ignored.
     *
     * @param registryAsString JSON of a map of maps of angular objects
     * @throws TException declared by the Thrift service interface
     */
    @Override
    public void angularRegistryPush(String registryAsString) throws TException {
        try {
            Map<String, Map<String, AngularObject>> deserializedRegistry = this.gson.fromJson(registryAsString, new TypeToken<Map<String, Map<String, AngularObject>>>(){}.getType());
            this.interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry);
        }
        catch (Exception e) {
            // Best effort: a failed push is only logged.
            this.logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e);
        }
    }

    /**
     * Context runner for a paragraph of a note; running it hands the runner
     * itself to the owning server's event client.
     */
    static class ParagraphRunner
    extends InterpreterContextRunner {
        /** Owning server; transient so it is excluded from serialization. */
        private transient RemoteInterpreterServer server;

        /**
         * @param server server whose event client receives the run request
         * @param noteId note the paragraph belongs to
         * @param paragraphId paragraph to run
         */
        public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) {
            super(noteId, paragraphId);
            this.server = server;
        }

        @Override
        public void run() {
            RemoteInterpreterEventClient client = this.server.eventClient;
            client.run(this);
        }
    }

    /**
     * Scheduler job that runs one script through an interpreter and merges
     * the text written to the context's output with the interpreter's own
     * result message.
     */
    class InterpretJob
    extends Job {
        private Interpreter interpreter;
        private String script;
        private InterpreterContext context;
        /** Lazily created by {@link #info()}. */
        private Map<String, Object> infos;

        public InterpretJob(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMsec, Interpreter interpreter, String script, InterpreterContext context) {
            super(jobId, jobName, listener, progressUpdateIntervalMsec);
            this.context = context;
            this.script = script;
            this.interpreter = interpreter;
        }

        /** Always reports 0. */
        @Override
        public int progress() {
            return 0;
        }

        /** Returns the job's info map, creating an empty one on first access. */
        @Override
        public Map<String, Object> info() {
            Map<String, Object> current = this.infos;
            if (current == null) {
                current = new HashMap<String, Object>();
                this.infos = current;
            }
            return current;
        }

        /**
         * Interprets the script with the context installed via
         * {@code InterpreterContext.set}, then drains the context output.
         * When the interpreter's result message is non-empty it is appended to
         * the captured output and the result's own type is kept; otherwise the
         * captured output is used with the output's type. The combined result
         * is stored in the resource pool under the paragraph-result name and
         * returned. The context is always removed afterwards.
         */
        @Override
        protected Object jobRun() throws Throwable {
            try {
                InterpreterContext.set(this.context);
                InterpreterResult result = this.interpreter.interpret(this.script, this.context);

                // Drain everything the interpreter wrote to the context output.
                this.context.out.flush();
                InterpreterResult.Type outputType = this.context.out.getType();
                byte[] captured = this.context.out.toByteArray();
                this.context.out.clear();

                String message = (captured != null && captured.length > 0) ? new String(captured) : "";
                String resultMessage = result.message();
                boolean hasResultMessage = resultMessage != null && !resultMessage.isEmpty();
                InterpreterResult combined = hasResultMessage
                        ? new InterpreterResult(result.code(), result.type(), message + resultMessage)
                        : new InterpreterResult(result.code(), outputType, message);

                this.context.getResourcePool().put(this.context.getNoteId(), this.context.getParagraphId(), WellKnownResourceName.ParagraphResult.toString(), combined);
                return combined;
            }
            finally {
                InterpreterContext.remove();
            }
        }

        /** Always returns false. */
        @Override
        protected boolean jobAbort() {
            return false;
        }
    }

    /**
     * Job listener used as a monitor: every status change wakes all threads
     * waiting on this listener. Progress updates and pre-change notifications
     * are ignored.
     */
    class InterpretJobListener
    implements JobListener {
        InterpretJobListener() {
        }

        @Override
        public void onProgressUpdate(Job job, int progress) {
            // intentionally empty
        }

        @Override
        public void beforeStatusChange(Job job, Job.Status before, Job.Status after) {
            // intentionally empty
        }

        /** Wakes every thread currently waiting on this listener. */
        @Override
        public void afterStatusChange(Job job, Job.Status before, Job.Status after) {
            synchronized (this) {
                notifyAll();
            }
        }
    }
}

