/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFuture;
import org.gridgain.grid.hadoop.GridHadoopJob;
import org.gridgain.grid.hadoop.GridHadoopJobId;
import org.gridgain.grid.hadoop.GridHadoopJobPhase;
import org.gridgain.grid.hadoop.GridHadoopMapReducePlan;
import org.gridgain.grid.hadoop.GridHadoopTaskInfo;
import org.gridgain.grid.hadoop.GridHadoopTaskType;
import org.gridgain.grid.kernal.GridKernalContext;
import org.gridgain.grid.kernal.processors.hadoop.GridHadoopContext;
import org.gridgain.grid.kernal.processors.hadoop.jobtracker.GridHadoopJobMetadata;
import org.gridgain.grid.kernal.processors.hadoop.jobtracker.GridHadoopJobTracker;
import org.gridgain.grid.kernal.processors.hadoop.message.GridHadoopMessage;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.GridHadoopTaskExecutorAdapter;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.GridHadoopTaskState;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.GridHadoopTaskStatus;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopExternalTaskMetadata;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopJobInfoUpdateRequest;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopPrepareForJobRequest;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopProcessDescriptor;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopProcessStartedAck;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopTaskExecutionRequest;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopTaskFinishedMessage;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.child.GridHadoopExternalProcessStarter;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopExternalCommunication;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopMessageListener;
import org.gridgain.grid.lang.GridBiTuple;
import org.gridgain.grid.lang.GridInClosure;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.spi.GridPortProtocol;
import org.gridgain.grid.util.GridSpinReadWriteLock;
import org.gridgain.grid.util.future.GridFutureAdapter;
import org.gridgain.grid.util.typedef.CI1;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.internal.S;
import org.gridgain.grid.util.typedef.internal.U;
import org.jdk8.backport.ConcurrentHashMap8;
import org.jdk8.backport.ConcurrentLinkedDeque8;
import org.jetbrains.annotations.Nullable;

public class GridHadoopExternalTaskExecutor
extends GridHadoopTaskExecutorAdapter {
    private GridHadoopContext ctx;
    private String javaCmd;
    private GridLogger log;
    private GridHadoopProcessDescriptor nodeDesc;
    private File outputBase;
    private String pathSep;
    private GridHadoopExternalCommunication comm;
    private ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8();
    private ConcurrentMap<GridHadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8();
    private GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
    private GridHadoopJobTracker jobTracker;

    /**
     * Starts the executor: resolves the output directory and the java command, starts the
     * communication layer used to exchange messages with child processes and registers
     * its ports with the kernal port processor.
     *
     * @param ctx Hadoop context.
     * @throws GridException If any initialization step fails.
     */
    @Override
    public void start(GridHadoopContext ctx) throws GridException {
        this.ctx = ctx;

        GridKernalContext kernalCtx = ctx.kernalContext();

        log = kernalCtx.log(GridHadoopExternalTaskExecutor.class);
        outputBase = U.resolveWorkDirectory("hadoop", false);

        // Fall back to the conventional platform separator if the system property is absent.
        String dfltSep = U.isWindows() ? ";" : ":";
        pathSep = System.getProperty("path.separator", dfltSep);

        initJavaCommand();

        comm = new GridHadoopExternalCommunication(
            ctx.localNodeId(),
            UUID.randomUUID(),
            kernalCtx.config().getMarshaller(),
            log,
            kernalCtx.config().getSystemExecutorService(),
            kernalCtx.gridName());

        comm.setListener(new MessageListener());
        comm.start();

        // The local descriptor is read only after the communication layer has been started.
        nodeDesc = comm.localProcessDescriptor();

        kernalCtx.ports().registerPort(nodeDesc.tcpPort(), GridPortProtocol.TCP, GridHadoopExternalTaskExecutor.class);

        // -1 is treated as "no shared memory port".
        if (nodeDesc.sharedMemoryPort() != -1) {
            kernalCtx.ports().registerPort(nodeDesc.sharedMemoryPort(), GridPortProtocol.TCP, GridHadoopExternalTaskExecutor.class);
        }

        jobTracker = ctx.jobTracker();
    }

    /**
     * Stops the executor by acquiring the busy write lock and stopping the communication layer.
     * <p>
     * The write lock is not released in this method. Presumably this is deliberate, so that
     * later {@code tryReadLock()} attempts elsewhere in this class fail once the executor is
     * stopped (TODO confirm against {@code GridSpinReadWriteLock} semantics).
     *
     * @param cancel Not used by this implementation.
     */
    @Override
    public void stop(boolean cancel) {
        this.busyLock.writeLock();
        try {
            this.comm.stop();
        }
        catch (GridException e) {
            // Best effort: a failed communication shutdown is only logged.
            U.error((GridLogger)this.log, (Object)"Failed to gracefully stop external hadoop communication server (will shutdown anyway)", (Throwable)e);
        }
    }

    /**
     * Reacts to a job metadata change.
     * <ul>
     * <li>If no child process is tracked for the job and the context reports that this node
     *     participates in it, a new child process is started.</li>
     * <li>If the job phase is {@code PHASE_COMPLETE}, the tracked process is unregistered and
     *     terminated.</li>
     * <li>Otherwise a job info update is sent to the child process, immediately if its
     *     initialization already succeeded or once the initialization future completes.</li>
     * </ul>
     *
     * @param meta Updated job metadata.
     */
    @Override
    public void onJobStateChanged(final GridHadoopJobMetadata meta) {
        final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());

        if (proc == null) {
            if (!ctx.isParticipating(meta)) {
                return;
            }

            GridHadoopJob job;

            try {
                job = jobTracker.job(meta.jobId(), meta.jobInfo());
            }
            catch (GridException e) {
                U.error(log, "Failed to get job: " + meta.jobId(), e);

                return;
            }

            startProcess(job, meta.mapReducePlan());

            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
        }

        if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
            if (log.isDebugEnabled()) {
                log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() + ", proc=" + proc + ']');
            }

            runningProcsByJobId.remove(meta.jobId());
            runningProcsByProcId.remove(proc.descriptor().processId());

            proc.terminate();

            return;
        }

        if (!proc.initFut.isDone()) {
            // Initialization still in progress: defer the notification until it finishes.
            proc.initFut.listenAsync(new CI1<GridFuture<GridBiTuple<Process, GridHadoopProcessDescriptor>>>() {
                @Override
                public void apply(GridFuture<GridBiTuple<Process, GridHadoopProcessDescriptor>> f) {
                    try {
                        f.get();

                        GridHadoopExternalTaskExecutor.this.sendJobInfoUpdate(proc, meta);
                    }
                    catch (GridException e) {
                        if (GridHadoopExternalTaskExecutor.this.log.isDebugEnabled()) {
                            GridHadoopExternalTaskExecutor.this.log.debug("Failed to initialize child process (will skip job state notification) [jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
                        }
                    }
                }
            });

            return;
        }

        if (proc.initFut.isFailed()) {
            if (log.isDebugEnabled()) {
                log.debug("Failed to initialize child process (will skip job state notification) [jobId=" + meta.jobId() + ", meta=" + meta + ']');
            }
        }
        else {
            sendJobInfoUpdate(proc, meta);
        }
    }

    /**
     * Schedules the given tasks for execution in the child process associated with the job.
     * <p>
     * For {@code SETUP}, {@code ABORT} and {@code COMMIT} tasks a new child process is started
     * when none is tracked or the tracked one is terminated. For other task types a tracked
     * process is asserted to exist. The execution request itself is sent from a listener on the
     * process initialization future; if initialization or sending fails with a
     * {@link GridException}, the tasks are reported to the job tracker as {@code FAILED}.
     * <p>
     * Nothing is done if the busy read lock cannot be acquired.
     *
     * @param job Job the tasks belong to.
     * @param tasks Tasks to execute; the type of the first task decides how a missing process is handled.
     * @throws GridException Declared by the overridden method.
     */
    @Override
    public void run(final GridHadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws GridException {
        if (!busyLock.tryReadLock()) {
            if (log.isDebugEnabled()) {
                log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
            }

            return;
        }

        try {
            HadoopProcess cur = runningProcsByJobId.get(job.id());

            GridHadoopTaskType type = F.first(tasks).type();

            boolean maintenance = type == GridHadoopTaskType.SETUP
                || type == GridHadoopTaskType.ABORT
                || type == GridHadoopTaskType.COMMIT;

            if (maintenance) {
                if (cur == null || cur.terminated()) {
                    // Drop the stale mapping (if any) before registering a fresh process.
                    runningProcsByJobId.remove(job.id(), cur);

                    cur = startProcess(job, jobTracker.plan(job.id()));

                    if (log.isDebugEnabled()) {
                        log.debug("Starting new process for maintenance task [jobId=" + job.id() + ", proc=" + cur + ", taskType=" + type + ']');
                    }
                }
            }
            else {
                assert cur != null : "Missing started process for task execution request: " + job.id() + ", tasks=" + tasks;
            }

            final HadoopProcess target = cur;

            target.initFut.listenAsync(new CI1<GridFuture<GridBiTuple<Process, GridHadoopProcessDescriptor>>>() {
                @Override
                public void apply(GridFuture<GridBiTuple<Process, GridHadoopProcessDescriptor>> f) {
                    if (!GridHadoopExternalTaskExecutor.this.busyLock.tryReadLock()) {
                        return;
                    }

                    try {
                        // Rethrows the initialization failure, if any.
                        f.get();

                        target.addTasks(tasks);

                        if (GridHadoopExternalTaskExecutor.this.log.isDebugEnabled()) {
                            GridHadoopExternalTaskExecutor.this.log.debug("Sending task execution request to child process [jobId=" + job.id() + ", proc=" + target + ", tasks=" + tasks + ']');
                        }

                        GridHadoopExternalTaskExecutor.this.sendExecutionRequest(target, job, tasks);
                    }
                    catch (GridException e) {
                        GridHadoopExternalTaskExecutor.this.notifyTasksFailed(tasks, GridHadoopTaskState.FAILED, e);
                    }
                    finally {
                        GridHadoopExternalTaskExecutor.this.busyLock.readUnlock();
                    }
                }
            });
        }
        finally {
            busyLock.readUnlock();
        }
    }

    /**
     * Terminates the child process tracked for the given job, if there is one.
     *
     * @param jobId ID of the job whose tasks should be cancelled.
     */
    @Override
    public void cancelTasks(GridHadoopJobId jobId) {
        HadoopProcess running = runningProcsByJobId.get(jobId);

        if (running == null) {
            return;
        }

        running.terminate();
    }

    /**
     * Sends a task execution request to the child process while holding the process lock.
     * If the process is already marked as terminated, the tasks are reported to the job
     * tracker as {@code CRASHED} and nothing is sent.
     *
     * @param proc Target child process.
     * @param job Job the tasks belong to.
     * @param tasks Tasks to execute.
     * @throws GridException If sending the message fails.
     */
    private void sendExecutionRequest(HadoopProcess proc, GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws GridException {
        // terminate() takes the same lock, so the terminated flag cannot flip while sending.
        proc.lock();

        try {
            if (!proc.terminated()) {
                GridHadoopTaskExecutionRequest execReq = new GridHadoopTaskExecutionRequest();

                execReq.jobId(job.id());
                execReq.jobInfo(job.info());
                execReq.tasks(tasks);

                comm.sendMessage(proc.descriptor(), execReq);
            }
            else {
                notifyTasksFailed(tasks, GridHadoopTaskState.CRASHED, null);
            }
        }
        finally {
            proc.unlock();
        }
    }

    /**
     * Builds start-up metadata for a child process: the classpath of the current JVM split
     * into entries, plus a fixed set of JVM options including the GridGain home.
     *
     * @return Child process metadata.
     */
    private GridHadoopExternalTaskMetadata buildTaskMeta() {
        GridHadoopExternalTaskMetadata res = new GridHadoopExternalTaskMetadata();

        String[] cpEntries = System.getProperty("java.class.path").split(File.pathSeparator);

        res.classpath(Arrays.asList(cpEntries));

        String homeOpt = "-DGRIDGAIN_HOME=" + U.getGridGainHome();

        res.jvmOptions(Arrays.asList(
            "-Xmx1g",
            "-ea",
            "-XX:+UseConcMarkSweepGC",
            "-XX:+CMSClassUnloadingEnabled",
            homeOpt));

        return res;
    }

    /**
     * Reports every given task to the job tracker as finished with the same status.
     *
     * @param tasks Tasks to report.
     * @param state State to report for each task.
     * @param e Error to attach to the status, may be {@code null}.
     */
    private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, GridHadoopTaskState state, Throwable e) {
        // One status instance is shared by all notifications.
        GridHadoopTaskStatus status = new GridHadoopTaskStatus(state, e);

        for (GridHadoopTaskInfo info : tasks) {
            jobTracker.onTaskFinished(info, status);
        }
    }

    /**
     * Registers a new child process for the job and starts it asynchronously.
     * <p>
     * The process is registered in both tracking maps before it is actually launched. A
     * background task launches the JVM and reads its standard output line by line until it
     * sees {@code "Started"} (the future is notified that the process started) or
     * {@code "Failed"} (the remaining output is collected into the error message and the
     * future is failed). If the output ends before either marker is seen, the future is failed
     * as well, so that listeners waiting on it are not left hanging.
     * <p>
     * Once the future completes successfully, a prepare-for-job request is sent to the process.
     *
     * @param job Job to start the process for.
     * @param plan Map-reduce plan of the job.
     * @return Descriptor object of the registered (possibly not yet initialized) process.
     */
    private HadoopProcess startProcess(final GridHadoopJob job, final GridHadoopMapReducePlan plan) {
        final UUID childProcId = UUID.randomUUID();

        GridHadoopJobId jobId = job.id();

        final GridHadoopProcessFuture fut = new GridHadoopProcessFuture(childProcId, jobId, ctx.kernalContext());

        final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));

        HadoopProcess old = runningProcsByJobId.put(jobId, proc);

        assert old == null;

        old = runningProcsByProcId.put(childProcId, proc);

        assert old == null;

        ctx.kernalContext().closure().runLocalSafe(new Runnable() {
            @Override
            public void run() {
                if (!GridHadoopExternalTaskExecutor.this.busyLock.tryReadLock()) {
                    fut.onDone(new GridException("Failed to start external process (grid is stopping)."));

                    return;
                }

                try {
                    GridHadoopExternalTaskMetadata startMeta = GridHadoopExternalTaskExecutor.this.buildTaskMeta();

                    if (GridHadoopExternalTaskExecutor.this.log.isDebugEnabled()) {
                        GridHadoopExternalTaskExecutor.this.log.debug("Created hadoop child process metadata for job [job=" + job + ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
                    }

                    Process childProc = GridHadoopExternalTaskExecutor.this.startJavaProcess(childProcId, startMeta, job);

                    // The reader is not closed here: on success the child process keeps running.
                    BufferedReader rdr = new BufferedReader(new InputStreamReader(childProc.getInputStream()));

                    boolean statusSeen = false;

                    String line;

                    while ((line = rdr.readLine()) != null) {
                        if (GridHadoopExternalTaskExecutor.this.log.isDebugEnabled()) {
                            GridHadoopExternalTaskExecutor.this.log.debug("Tracing process output: " + line);
                        }

                        if ("Started".equals(line)) {
                            if (GridHadoopExternalTaskExecutor.this.log.isDebugEnabled()) {
                                GridHadoopExternalTaskExecutor.this.log.debug("Successfully started child process [childProcId=" + childProcId + ", meta=" + job + ']');
                            }

                            fut.onProcessStarted(childProc);

                            statusSeen = true;

                            break;
                        }

                        if ("Failed".equals(line)) {
                            StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");

                            while ((line = rdr.readLine()) != null) {
                                sb.append("    ").append(line).append("\n");
                            }

                            // Drop the trailing line break.
                            sb.setLength(sb.length() - 1);

                            GridHadoopExternalTaskExecutor.this.log.warning(sb.toString());

                            fut.onDone(new GridException(sb.toString()));

                            statusSeen = true;

                            break;
                        }
                    }

                    if (!statusSeen) {
                        // Output ended without a status marker: fail the future instead of
                        // leaving it incomplete forever.
                        fut.onDone(new GridException("Failed to start child process (process output ended before start status was reported): " + job));
                    }
                }
                catch (Throwable e) {
                    fut.onDone(new GridException("Failed to initialize child process: " + job, e));
                }
                finally {
                    GridHadoopExternalTaskExecutor.this.busyLock.readUnlock();
                }
            }
        }, true);

        fut.listenAsync(new CI1<GridFuture<GridBiTuple<Process, GridHadoopProcessDescriptor>>>() {
            @Override
            public void apply(GridFuture<GridBiTuple<Process, GridHadoopProcessDescriptor>> f) {
                try {
                    f.get();

                    GridHadoopExternalTaskExecutor.this.prepareForJob(proc, job, plan);
                }
                catch (GridException ignored) {
                    // Intentionally ignored: the initialization failure is logged by the
                    // future's onDone(...) when it completes with an error.
                }
            }
        });

        return proc;
    }

    /**
     * Resolves the path to the java executable (from the {@code java.home} system property,
     * falling back to the {@code JAVA_HOME} environment variable) and verifies it by running
     * {@code java -version}.
     *
     * @throws GridException If the java home cannot be located, the check process cannot be
     *      run, it exits with a nonzero code, or the waiting thread is interrupted.
     */
    private void initJavaCommand() throws GridException {
        String home = System.getProperty("java.home");

        if (home == null) {
            home = System.getenv("JAVA_HOME");
        }

        if (home == null) {
            throw new GridException("Failed to locate JAVA_HOME.");
        }

        String exeName = U.isWindows() ? "java.exe" : "java";

        javaCmd = home + File.separator + "bin" + File.separator + exeName;

        try {
            ProcessBuilder bldr = new ProcessBuilder(javaCmd, "-version");

            // Merge stderr into stdout so all output is read from one stream.
            bldr.redirectErrorStream(true);

            Process verProc = bldr.start();

            Collection<String> lines = readProcessOutput(verProc);

            int exitCode = verProc.waitFor();

            if (exitCode != 0) {
                throw new GridException("Failed to execute 'java -version' command (process finished with nonzero code) [exitCode=" + exitCode + ", javaCmd='" + javaCmd + "', msg=" + F.first(lines) + ']');
            }

            if (log.isInfoEnabled()) {
                log.info("Will use java for external task execution: ");

                for (String ln : lines) {
                    log.info("    " + ln);
                }
            }
        }
        catch (IOException e) {
            throw new GridException("Failed to check java for external task execution.", e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before converting to a checked grid exception.
            Thread.currentThread().interrupt();

            throw new GridException("Failed to wait for process completion (thread got interrupted).", e);
        }
    }

    /**
     * Reads the standard output of the given process line by line until end of stream and
     * closes the stream afterwards.
     *
     * @param proc Process to read output from.
     * @return Output lines in the order they were read.
     * @throws IOException If reading fails.
     */
    private Collection<String> readProcessOutput(Process proc) throws IOException {
        BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));

        try {
            ArrayList<String> res = new ArrayList<String>();

            String s;

            while ((s = rdr.readLine()) != null) {
                res.add(s);
            }

            return res;
        }
        finally {
            try {
                rdr.close();
            }
            catch (IOException ignored) {
                // Best effort: a failure to close must not mask the read result or a read error.
            }
        }
    }

    /**
     * Assembles the command line for a child JVM and launches it.
     * <p>
     * The command consists of the java executable, the JVM options and classpath from the
     * metadata, the {@link GridHadoopExternalProcessStarter} main class and its arguments
     * (child and parent process IDs, parent node ID, address, TCP and shared memory ports,
     * output folder and work directory). Stderr is merged into stdout and the process is
     * started in the resolved work directory.
     *
     * @param childProcId ID assigned to the child process.
     * @param startMeta Metadata providing JVM options and classpath.
     * @param job Job the process is started for; its ID determines the output folder.
     * @return Started process.
     * @throws Exception If the work directory cannot be resolved or the process cannot be started.
     */
    private Process startJavaProcess(UUID childProcId, GridHadoopExternalTaskMetadata startMeta, GridHadoopJob job) throws Exception {
        String outDir = jobWorkFolder(job.id()) + File.separator + childProcId;

        if (log.isDebugEnabled()) {
            log.debug("Will write process log output to: " + outDir);
        }

        ArrayList<String> args = new ArrayList<String>();

        File workDir = U.resolveWorkDirectory("", false);

        // JVM part of the command line.
        args.add(javaCmd);
        args.addAll(startMeta.jvmOptions());
        args.addAll(Arrays.asList("-cp", buildClasspath(startMeta.classpath())));

        // Main class followed by its option/value pairs.
        args.add(GridHadoopExternalProcessStarter.class.getName());
        args.addAll(Arrays.asList("-cpid", String.valueOf(childProcId)));
        args.addAll(Arrays.asList("-ppid", String.valueOf(nodeDesc.processId())));
        args.addAll(Arrays.asList("-nid", String.valueOf(nodeDesc.parentNodeId())));
        args.addAll(Arrays.asList("-addr", nodeDesc.address()));
        args.addAll(Arrays.asList("-tport", String.valueOf(nodeDesc.tcpPort())));
        args.addAll(Arrays.asList("-sport", String.valueOf(nodeDesc.sharedMemoryPort())));
        args.addAll(Arrays.asList("-out", outDir));
        args.addAll(Arrays.asList("-wd", workDir.getAbsolutePath()));

        ProcessBuilder bldr = new ProcessBuilder(args);

        bldr.redirectErrorStream(true);
        bldr.directory(workDir);

        return bldr.start();
    }

    /**
     * Builds the path of the per-job folder under the output base directory.
     *
     * @param jobId Job ID.
     * @return Output base path followed by a {@code Job_<jobId>} path element.
     */
    private String jobWorkFolder(GridHadoopJobId jobId) {
        String folderName = "Job_" + jobId;

        return outputBase + File.separator + folderName;
    }

    /**
     * Joins classpath entries into a single string using the configured path separator.
     * <p>
     * The separator is inserted between entries rather than appended and trimmed, so the
     * result is correct for a separator of any length, and an empty collection yields an
     * empty string when assertions are disabled.
     *
     * @param cp Classpath entries; expected to be non-empty.
     * @return Entries joined with the path separator.
     */
    private String buildClasspath(Collection<String> cp) {
        assert !cp.isEmpty();

        StringBuilder sb = new StringBuilder();

        boolean first = true;

        for (String entry : cp) {
            if (!first) {
                sb.append(pathSep);
            }

            sb.append(entry);

            first = false;
        }

        return sb.toString();
    }

    /**
     * Sends the current job phase and, when known for every reducer, the reducer addresses
     * to the child process.
     * <p>
     * Addresses are included only if the metadata holds exactly as many entries as the plan
     * has reducers; otherwise {@code null} is sent in their place. If sending fails and the
     * process is not already terminated, the error is logged and the process is terminated.
     *
     * @param proc Target child process.
     * @param meta Job metadata to take phase and reducer addresses from.
     */
    private void sendJobInfoUpdate(HadoopProcess proc, GridHadoopJobMetadata meta) {
        Map<Integer, GridHadoopProcessDescriptor> byReducer = meta.reducersAddresses();

        int total = meta.mapReducePlan().reducers();

        GridHadoopProcessDescriptor[] addrs = null;

        if (byReducer != null && byReducer.size() == total) {
            addrs = new GridHadoopProcessDescriptor[total];

            for (int rdc = 0; rdc < total; rdc++) {
                GridHadoopProcessDescriptor rdcDesc = byReducer.get(rdc);

                assert rdcDesc != null : "Missing reducing address [meta=" + meta + ", rdc=" + rdc + ']';

                addrs[rdc] = rdcDesc;
            }
        }

        try {
            comm.sendMessage(proc.descriptor(), new GridHadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
        }
        catch (GridException e) {
            // A send failure to an already terminated process is ignored.
            if (!proc.terminated()) {
                log.error("Failed to send job state update message to remote child process (will kill the process) [jobId=" + proc.jobId + ", meta=" + meta + ']', e);

                proc.terminate();
            }
        }
    }

    /**
     * Sends a prepare-for-job request to the child process. The request carries the job ID
     * and info, a flag telling whether the plan has mappers for the local node, and the
     * plan's reducer count. On send failure the error is logged and the process is terminated.
     *
     * @param proc Target child process.
     * @param job Job to prepare for.
     * @param plan Map-reduce plan of the job.
     */
    private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) {
        try {
            GridHadoopProcessDescriptor dest = proc.descriptor();

            GridHadoopPrepareForJobRequest prepReq = new GridHadoopPrepareForJobRequest(
                job.id(),
                job.info(),
                !F.isEmpty(plan.mappers(ctx.localNodeId())),
                plan.reducers());

            comm.sendMessage(dest, prepReq);
        }
        catch (GridException e) {
            String errMsg = "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job + ", plan=" + plan + ']';

            U.error(log, errMsg, e);

            proc.terminate();
        }
    }

    /**
     * Handles a task-finished message from a child process: removes the task from the
     * sending process's task list (if that process is still tracked) and forwards the
     * reported state and error to the job tracker.
     *
     * @param desc Descriptor of the process that sent the message.
     * @param taskMsg Task-finished message.
     */
    private void processTaskFinishedMessage(GridHadoopProcessDescriptor desc, GridHadoopTaskFinishedMessage taskMsg) {
        HadoopProcess sender = runningProcsByProcId.get(desc.processId());

        GridHadoopTaskInfo info = taskMsg.taskInfo();

        if (sender != null) {
            sender.removeTask(info);
        }

        GridHadoopTaskStatus status = new GridHadoopTaskStatus(taskMsg.state(), taskMsg.error());

        jobTracker.onTaskFinished(info, status);
    }

    private class GridHadoopProcessFuture
    extends GridFutureAdapter<GridBiTuple<Process, GridHadoopProcessDescriptor>> {
        private static final long serialVersionUID = 0L;
        private UUID childProcId;
        private GridHadoopJobId jobId;
        private GridHadoopProcessDescriptor desc;
        private Process proc;
        private volatile boolean procStarted;
        private volatile boolean replyReceived;
        private GridLogger log;

        public GridHadoopProcessFuture() {
            this.log = GridHadoopExternalTaskExecutor.this.log;
        }

        private GridHadoopProcessFuture(UUID childProcId, GridHadoopJobId jobId, GridKernalContext ctx) {
            super(ctx);
            this.log = GridHadoopExternalTaskExecutor.this.log;
            this.childProcId = childProcId;
            this.jobId = jobId;
        }

        public void onProcessStarted(Process proc) {
            this.proc = proc;
            this.procStarted = true;
            if (this.procStarted && this.replyReceived) {
                this.onDone(F.t((Object)proc, (Object)this.desc));
            }
        }

        public void onReplyReceived(GridHadoopProcessDescriptor desc) {
            assert (this.childProcId.equals(desc.processId()));
            this.desc = desc;
            this.replyReceived = true;
            if (this.procStarted && this.replyReceived) {
                this.onDone(F.t((Object)this.proc, (Object)desc));
            }
        }

        public boolean onDone(@Nullable GridBiTuple<Process, GridHadoopProcessDescriptor> res, @Nullable Throwable err) {
            if (err == null) {
                HadoopProcess proc = (HadoopProcess)GridHadoopExternalTaskExecutor.this.runningProcsByProcId.get(this.childProcId);
                assert (proc != null);
                assert (proc.initFut == this);
                proc.onInitialized((Process)res.get1(), (GridHadoopProcessDescriptor)res.get2());
                if (!F.isEmpty((Collection)proc.reducers())) {
                    GridHadoopExternalTaskExecutor.this.jobTracker.onExternalMappersInitialized(this.jobId, proc.reducers(), this.desc);
                }
            } else {
                GridHadoopExternalTaskExecutor.this.runningProcsByJobId.remove(this.jobId);
                GridHadoopExternalTaskExecutor.this.runningProcsByProcId.remove(this.childProcId);
            }
            if (super.onDone(res, err)) {
                if (err == null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Initialized child process for external task execution [jobId=" + this.jobId + ", desc=" + this.desc + ", initTime=" + this.duration() + ']');
                    }
                } else {
                    U.error((GridLogger)this.log, (Object)("Failed to initialize child process for external task execution [jobId=" + this.jobId + ", desc=" + this.desc + ']'), (Throwable)err);
                }
                return true;
            }
            return false;
        }
    }

    private static class HadoopProcess
    extends ReentrantLock {
        private static final long serialVersionUID = 0L;
        private GridHadoopJobId jobId;
        private Process proc;
        private final GridHadoopProcessFuture initFut;
        private GridHadoopProcessDescriptor procDesc;
        private Collection<Integer> reducers;
        private Collection<GridHadoopTaskInfo> tasks = new ConcurrentLinkedDeque8();
        private volatile boolean terminated;

        private HadoopProcess(GridHadoopJobId jobId, GridHadoopProcessFuture initFut, int[] reducers) {
            this.jobId = jobId;
            this.initFut = initFut;
            if (!F.isEmpty((int[])reducers)) {
                this.reducers = new ArrayList<Integer>(reducers.length);
                for (int r : reducers) {
                    this.reducers.add(r);
                }
            }
        }

        private GridHadoopProcessDescriptor descriptor() {
            return this.procDesc;
        }

        public GridHadoopJobId jobId() {
            return this.jobId;
        }

        private void onInitialized(Process proc, GridHadoopProcessDescriptor procDesc) {
            this.proc = proc;
            this.procDesc = procDesc;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void terminate() {
            this.lock();
            try {
                this.terminated = true;
                if (!this.initFut.isDone()) {
                    this.initFut.listenAsync((GridInClosure)new CI1<GridFuture<GridBiTuple<Process, GridHadoopProcessDescriptor>>>(){

                        public void apply(GridFuture<GridBiTuple<Process, GridHadoopProcessDescriptor>> f) {
                            HadoopProcess.this.proc.destroy();
                        }
                    });
                } else {
                    this.proc.destroy();
                }
            }
            finally {
                this.unlock();
            }
        }

        private boolean terminated() {
            return this.terminated;
        }

        private void addTasks(Collection<GridHadoopTaskInfo> tasks) {
            this.tasks.addAll(tasks);
        }

        private void removeTask(GridHadoopTaskInfo task) {
            if (this.tasks != null) {
                this.tasks.remove(task);
            }
        }

        private Collection<GridHadoopTaskInfo> tasks() {
            return this.tasks;
        }

        private Collection<Integer> reducers() {
            return this.reducers;
        }

        @Override
        public String toString() {
            return S.toString(HadoopProcess.class, (Object)this);
        }
    }

    /**
     * Listener for messages and connection events coming from child processes.
     * Both callbacks do nothing if the executor's busy read lock cannot be acquired.
     */
    private class MessageListener
    implements GridHadoopMessageListener {
        private MessageListener() {
        }

        /**
         * Dispatches a message from a child process: a started-ack is forwarded to the
         * process initialization future, a task-finished message is processed by the
         * executor, anything else is logged as unexpected.
         *
         * @param desc Descriptor of the sending process.
         * @param msg Received message.
         */
        @Override
        public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
            if (!GridHadoopExternalTaskExecutor.this.busyLock.tryReadLock()) {
                return;
            }
            try {
                if (msg instanceof GridHadoopProcessStartedAck) {
                    HadoopProcess proc = GridHadoopExternalTaskExecutor.this.runningProcsByProcId.get(desc.processId());
                    assert proc != null : "Missing child process for processId: " + desc;
                    if (proc == null) {
                        // Reached only with assertions disabled: ignore instead of failing with an NPE.
                        GridHadoopExternalTaskExecutor.this.log.warning("Failed to find process start future (will ignore): " + desc);
                        return;
                    }
                    GridHadoopProcessFuture fut = proc.initFut;
                    if (fut != null) {
                        fut.onReplyReceived(desc);
                    } else {
                        GridHadoopExternalTaskExecutor.this.log.warning("Failed to find process start future (will ignore): " + desc);
                    }
                } else if (msg instanceof GridHadoopTaskFinishedMessage) {
                    GridHadoopTaskFinishedMessage taskMsg = (GridHadoopTaskFinishedMessage)msg;
                    GridHadoopExternalTaskExecutor.this.processTaskFinishedMessage(desc, taskMsg);
                } else {
                    GridHadoopExternalTaskExecutor.this.log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
                }
            }
            finally {
                GridHadoopExternalTaskExecutor.this.busyLock.readUnlock();
            }
        }

        /**
         * Handles a lost connection. A {@code null} descriptor is logged as a failed
         * handshake. Otherwise, if the process is tracked, any tasks still registered on it
         * are reported to the job tracker as {@code CRASHED}, the job-to-process mapping is
         * removed, and the process is terminated.
         *
         * @param desc Descriptor of the process the connection was lost with, or {@code null}.
         */
        @Override
        public void onConnectionLost(GridHadoopProcessDescriptor desc) {
            if (!GridHadoopExternalTaskExecutor.this.busyLock.tryReadLock()) {
                return;
            }
            try {
                if (desc == null) {
                    U.warn(GridHadoopExternalTaskExecutor.this.log, "Handshake failed.");
                    return;
                }
                HadoopProcess proc = GridHadoopExternalTaskExecutor.this.runningProcsByProcId.get(desc.processId());
                if (proc != null) {
                    Collection<GridHadoopTaskInfo> tasks = proc.tasks();
                    if (!F.isEmpty(tasks)) {
                        GridHadoopExternalTaskExecutor.this.log.warning("Lost connection with alive process (will terminate): " + desc);
                        // One status instance is shared by all task notifications.
                        GridHadoopTaskStatus status = new GridHadoopTaskStatus(GridHadoopTaskState.CRASHED, new GridException("Failed to run tasks (external process finished unexpectedly): " + desc));
                        for (GridHadoopTaskInfo info : tasks) {
                            GridHadoopExternalTaskExecutor.this.jobTracker.onTaskFinished(info, status);
                        }
                        // Two-argument remove: the mapping is dropped only if it still points to this process.
                        GridHadoopExternalTaskExecutor.this.runningProcsByJobId.remove(proc.jobId(), proc);
                    }
                    proc.terminate();
                }
            }
            finally {
                GridHadoopExternalTaskExecutor.this.busyLock.readUnlock();
            }
        }
    }
}

