/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.child;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFuture;
import org.gridgain.grid.hadoop.GridHadoopJob;
import org.gridgain.grid.hadoop.GridHadoopJobInfo;
import org.gridgain.grid.hadoop.GridHadoopJobProperty;
import org.gridgain.grid.hadoop.GridHadoopTaskInfo;
import org.gridgain.grid.hadoop.GridHadoopTaskInput;
import org.gridgain.grid.hadoop.GridHadoopTaskOutput;
import org.gridgain.grid.hadoop.GridHadoopTaskType;
import org.gridgain.grid.kernal.processors.hadoop.message.GridHadoopMessage;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleAck;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleJob;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleMessage;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.GridHadoopRunnableTask;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.GridHadoopTaskState;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopJobInfoUpdateRequest;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopPrepareForJobRequest;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopProcessDescriptor;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopProcessStartedAck;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopTaskExecutionRequest;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopTaskFinishedMessage;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopExternalCommunication;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopMessageListener;
import org.gridgain.grid.lang.GridInClosure;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.util.future.GridFutureAdapterEx;
import org.gridgain.grid.util.lang.GridInClosure2X;
import org.gridgain.grid.util.offheap.unsafe.GridUnsafeMemory;
import org.gridgain.grid.util.typedef.CI1;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.internal.U;

/**
 * Runs hadoop tasks inside an external (child) process.
 * <p>
 * The runner is driven by messages received through {@link GridHadoopExternalCommunication}:
 * the parent node first sends a prepare request (handled once, guarded by {@code initGuard}),
 * then task execution and job info update requests. Everything that depends on the job being
 * prepared is attached as a listener to {@code initFut}, so it runs only after
 * {@link #prepareProcess} has either succeeded or failed.
 */
public class GridHadoopChildProcessRunner {
    /** Descriptor of the parent node process; control messages are accepted only from it. */
    private GridHadoopProcessDescriptor nodeDesc;

    /** Message executor service passed to {@link #start}; stopped on shutdown. */
    private ExecutorService msgExecSvc;

    /** Executor that runs the tasks; created during process preparation. */
    private ThreadPoolExecutor execSvc;

    /** Unsafe memory handed to the shuffle job and to the tasks. */
    protected GridUnsafeMemory mem = new GridUnsafeMemory(0L);

    /** External communication used to talk to the parent node and to other processes. */
    private GridHadoopExternalCommunication comm;

    /** Logger. */
    private GridLogger log;

    /** Guards against processing more than one prepare request. */
    private final AtomicBoolean initGuard = new AtomicBoolean();

    /** Time (ms) at which {@link #start} was called; used only for debug output. */
    private long startTime;

    /** Completed (successfully or with an error) when process preparation ends. */
    private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<Object>();

    /** Job instance created from the prepare request; {@code null} until then. */
    private GridHadoopJob job;

    /** Number of tasks of the current execution request that have not finished yet. */
    private final AtomicInteger pendingTasks = new AtomicInteger();

    /** Shuffle job; {@code null} until the process is prepared. */
    private GridHadoopShuffleJob<GridHadoopProcessDescriptor> shuffleJob;

    /** Executor pool size used for map tasks. */
    private int concMappers;

    /** Executor pool size used for all other task types. */
    private int concReducers;

    /**
     * Starts the runner: stores the collaborators, registers the message listener and
     * acknowledges the start to the parent node.
     *
     * @param comm External communication.
     * @param nodeDesc Parent node process descriptor.
     * @param msgExecSvc Message executor service (shut down together with the runner).
     * @param parentLog Parent logger.
     * @throws GridException If the start acknowledgement could not be sent.
     */
    public void start(GridHadoopExternalCommunication comm, GridHadoopProcessDescriptor nodeDesc,
        ExecutorService msgExecSvc, GridLogger parentLog) throws GridException {
        this.comm = comm;
        this.nodeDesc = nodeDesc;
        this.msgExecSvc = msgExecSvc;

        // Logger and start time must be set before the listener is registered:
        // the listener uses the logger as soon as the first message arrives.
        log = parentLog.getLogger(GridHadoopChildProcessRunner.class);
        startTime = System.currentTimeMillis();

        comm.setListener(new MessageListener());

        comm.sendMessage(this.nodeDesc, new GridHadoopProcessStartedAck());
    }

    /**
     * Creates and initializes the job, the shuffle job and the task executor, then completes
     * {@code initFut}. Only the first request is processed; duplicates are logged and ignored.
     *
     * @param req Prepare request.
     */
    private void prepareProcess(GridHadoopPrepareForJobRequest req) {
        if (!initGuard.compareAndSet(false, true)) {
            log.warning("Duplicate initialize process request received (will ignore): " + req);

            return;
        }

        try {
            if (log.isDebugEnabled()) {
                log.debug("Initializing external hadoop task: " + req);
            }

            assert job == null;

            job = req.jobInfo().createJob(req.jobId());
            job.initialize(true, nodeDesc.processId());

            shuffleJob = new GridHadoopShuffleJob<GridHadoopProcessDescriptor>(comm.localProcessDescriptor(), log,
                job, mem, req.reducers(), req.hasMappers());

            initializeExecutors(req);

            if (log.isDebugEnabled()) {
                log.debug("External process initialized [initWaitTime=" + (System.currentTimeMillis() - startTime) + ']');
            }

            initFut.onDone(null, null);
        }
        catch (Exception e) {
            // Runtime failures are caught as well: if the future were left incomplete,
            // every listener attached to it would wait forever.
            U.error(log, "Failed to initialize process: " + req, e);

            initFut.onDone(e);
        }
    }

    /**
     * Submits all tasks of the request for execution once process preparation is complete.
     * If preparation failed, every task of the request is reported to the parent as failed.
     *
     * @param req Task execution request.
     */
    private void runTasks(final GridHadoopTaskExecutionRequest req) {
        if (!initFut.isDone() && log.isDebugEnabled()) {
            log.debug("Will wait for process initialization future completion: " + req);
        }

        initFut.listenAsync(new CI1<GridFuture<?>>() {
            @Override
            public void apply(GridFuture<?> f) {
                try {
                    // Rethrows the preparation error, if any.
                    f.get();

                    boolean set = GridHadoopChildProcessRunner.this.pendingTasks.compareAndSet(0, req.tasks().size());

                    assert set;

                    GridHadoopTaskInfo first = F.first(req.tasks());

                    assert first != null;

                    // The pool size is chosen from the type of the first task of the request.
                    int size = first.type() == GridHadoopTaskType.MAP
                        ? GridHadoopChildProcessRunner.this.concMappers
                        : GridHadoopChildProcessRunner.this.concReducers;

                    resizePool(GridHadoopChildProcessRunner.this.execSvc, size);

                    if (GridHadoopChildProcessRunner.this.log.isDebugEnabled()) {
                        GridHadoopChildProcessRunner.this.log.debug("Set executor service size for task type [type=" + first.type() + ", size=" + size + ']');
                    }

                    for (GridHadoopTaskInfo taskInfo : req.tasks()) {
                        if (GridHadoopChildProcessRunner.this.log.isDebugEnabled()) {
                            GridHadoopChildProcessRunner.this.log.debug("Submitted task for external execution: " + taskInfo);
                        }

                        GridHadoopChildProcessRunner.this.execSvc.submit(new GridHadoopRunnableTask(
                            GridHadoopChildProcessRunner.this.log,
                            GridHadoopChildProcessRunner.this.job,
                            GridHadoopChildProcessRunner.this.mem,
                            taskInfo) {
                            @Override
                            protected void onTaskFinished(GridHadoopTaskState state, Throwable err) {
                                GridHadoopChildProcessRunner.this.onTaskFinished0(this, state, err);
                            }

                            @Override
                            protected GridHadoopTaskInput createInput(GridHadoopTaskInfo info) throws GridException {
                                return GridHadoopChildProcessRunner.this.shuffleJob.input(info);
                            }

                            @Override
                            protected GridHadoopTaskOutput createOutput(GridHadoopTaskInfo info) throws GridException {
                                return GridHadoopChildProcessRunner.this.shuffleJob.output(info);
                            }
                        });
                    }
                }
                catch (GridException e) {
                    for (GridHadoopTaskInfo info : req.tasks()) {
                        GridHadoopChildProcessRunner.this.notifyTaskFinished(info, GridHadoopTaskState.FAILED, e, false);
                    }
                }
            }
        });
    }

    /**
     * Sets both the core and the maximum size of the pool to {@code size}.
     * <p>
     * The two setters are called in an order that never makes the core size exceed the maximum
     * size in between: {@link ThreadPoolExecutor} setters reject such a state with an
     * {@link IllegalArgumentException} (growing via {@code setCorePoolSize} first fails on
     * newer JDKs, shrinking via {@code setMaximumPoolSize} first fails on all of them).
     *
     * @param pool Pool to resize.
     * @param size New core and maximum pool size.
     */
    private static void resizePool(ThreadPoolExecutor pool, int size) {
        if (size > pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first.
            pool.setMaximumPoolSize(size);
            pool.setCorePoolSize(size);
        }
        else {
            // Shrinking (or same size): lower the core first.
            pool.setCorePoolSize(size);
            pool.setMaximumPoolSize(size);
        }
    }

    /**
     * Reads the configured pool sizes (defaulting to the number of available processors)
     * and creates the task executor, initially sized for mappers.
     *
     * @param req Prepare request carrying the job info.
     */
    private void initializeExecutors(GridHadoopPrepareForJobRequest req) {
        int cpus = Runtime.getRuntime().availableProcessors();

        concMappers = GridHadoopJobProperty.get((GridHadoopJobInfo)req.jobInfo(),
            GridHadoopJobProperty.EXTERNAL_CONCURRENT_MAPPERS, cpus);
        concReducers = GridHadoopJobProperty.get((GridHadoopJobInfo)req.jobInfo(),
            GridHadoopJobProperty.EXTERNAL_CONCURRENT_REDUCERS, cpus);

        execSvc = new ThreadPoolExecutor(concMappers, concMappers, 1L, TimeUnit.MINUTES,
            new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Applies a job info update once the process is prepared: if the request carries reducer
     * addresses and the shuffle job accepts them, shuffle sending is started.
     *
     * @param req Update request.
     */
    private void updateTasks(final GridHadoopJobInfoUpdateRequest req) {
        initFut.listenAsync(new CI1<GridFuture<?>>() {
            @Override
            public void apply(GridFuture<?> f) {
                try {
                    // Job and shuffle job are not available if preparation failed.
                    f.get();
                }
                catch (GridException e) {
                    U.error(GridHadoopChildProcessRunner.this.log, "Failed to process job info update (process initialization failed): " + req, e);

                    return;
                }

                assert GridHadoopChildProcessRunner.this.initGuard.get();
                assert req.jobId().equals(GridHadoopChildProcessRunner.this.job.id());

                if (req.reducersAddresses() == null) {
                    return;
                }

                if (GridHadoopChildProcessRunner.this.shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
                    GridHadoopChildProcessRunner.this.shuffleJob.startSending("external",
                        new GridInClosure2X<GridHadoopProcessDescriptor, GridHadoopShuffleMessage>() {
                            @Override
                            public void applyx(GridHadoopProcessDescriptor dest, GridHadoopShuffleMessage msg) throws GridException {
                                GridHadoopChildProcessRunner.this.comm.sendMessage(dest, msg);
                            }
                        });
                }
            }
        });
    }

    /**
     * Stops both executors and disposes the job. Safe to call before the process was prepared:
     * executors and job that were never created are skipped.
     */
    private void shutdown() {
        if (execSvc != null) {
            execSvc.shutdownNow();
        }

        if (msgExecSvc != null) {
            msgExecSvc.shutdownNow();
        }

        // Job is null when shutdown happens before (or after a failed) preparation.
        if (job == null) {
            return;
        }

        try {
            job.dispose(true);
        }
        catch (GridException e) {
            U.error(log, "Failed to dispose job.", e);
        }
    }

    /**
     * Shuts the runner down and terminates the process. Termination happens even if
     * shutdown throws, so the child process never outlives a lost parent.
     */
    private void shutdownAndTerminate() {
        try {
            shutdown();
        }
        finally {
            terminate();
        }
    }

    /**
     * Handles completion of a single task: decrements the pending counter and notifies the
     * parent, flushing shuffle messages first when this was the last pending task and it was
     * either a combiner, or a mapper of a job that has no combiner or does not use a single
     * combiner for all mappers.
     *
     * @param run Finished task.
     * @param state Final task state.
     * @param err Task error, if any.
     */
    private void onTaskFinished0(GridHadoopRunnableTask run, GridHadoopTaskState state, Throwable err) {
        GridHadoopTaskInfo info = run.taskInfo();

        int pendingTasks0 = pendingTasks.decrementAndGet();

        if (log.isDebugEnabled()) {
            log.debug("Hadoop task execution finished [info=" + info + ", state=" + state + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() + ", pendingTasks=" + pendingTasks0 + ", err=" + err + ']');
        }

        boolean flush = pendingTasks0 == 0 &&
            (info.type() == GridHadoopTaskType.COMBINE ||
                info.type() == GridHadoopTaskType.MAP &&
                    (!job.info().hasCombiner() ||
                        !GridHadoopJobProperty.get((GridHadoopJobInfo)job.info(),
                            GridHadoopJobProperty.SINGLE_COMBINER_FOR_ALL_MAPPERS, false)));

        notifyTaskFinished(info, state, err, flush);
    }

    /**
     * Sends a task finished message to the parent node. When {@code flush} is set, shuffle
     * messages are flushed first and the notification is sent from the flush listener; a flush
     * failure turns the notification into a {@code FAILED} one. If the notification itself
     * cannot be sent, the child process is shut down and terminated.
     *
     * @param taskInfo Finished task info.
     * @param state Task state to report.
     * @param err Task error to report, if any.
     * @param flush Whether to flush shuffle messages before notifying.
     */
    private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final GridHadoopTaskState state,
        final Throwable err, boolean flush) {
        if (!flush) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state + ", err=" + err + ']');
                }

                comm.sendMessage(nodeDesc, new GridHadoopTaskFinishedMessage(taskInfo, state, err));
            }
            catch (GridException e) {
                log.error("Failed to send message to parent node (will terminate child process).", e);

                shutdownAndTerminate();
            }

            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" + taskInfo + ", state=" + state + ", err=" + err + ']');
        }

        final long start = System.currentTimeMillis();

        try {
            shuffleJob.flush().listenAsync(new CI1<GridFuture<?>>() {
                @Override
                public void apply(GridFuture<?> f) {
                    long end = System.currentTimeMillis();

                    if (GridHadoopChildProcessRunner.this.log.isDebugEnabled()) {
                        GridHadoopChildProcessRunner.this.log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo + ", flushTime=" + (end - start) + ']');
                    }

                    try {
                        // Check for a flush error before reporting the original state.
                        f.get();

                        GridHadoopChildProcessRunner.this.notifyTaskFinished(taskInfo, state, err, false);
                    }
                    catch (GridException e) {
                        GridHadoopChildProcessRunner.this.log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + ", state=" + state + ", err=" + err + ']', e);

                        GridHadoopChildProcessRunner.this.notifyTaskFinished(taskInfo, GridHadoopTaskState.FAILED, e, false);
                    }
                }
            });
        }
        catch (GridException e) {
            log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + ", state=" + state + ", err=" + err + ']', e);

            notifyTaskFinished(taskInfo, GridHadoopTaskState.FAILED, e, false);
        }
    }

    /**
     * Checks that a control message was sent by the parent node process.
     *
     * @param desc Sender descriptor.
     * @param msg Received message (used for logging only).
     * @return {@code true} if the sender's process ID equals the parent's one.
     */
    private boolean validateNodeMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
        if (nodeDesc.processId().equals(desc.processId())) {
            return true;
        }

        log.warning("Received process control request from unknown process (will ignore) [desc=" + desc + ", msg=" + msg + ']');

        return false;
    }

    /**
     * Terminates the child process with exit code 1.
     */
    private void terminate() {
        System.exit(1);
    }

    /**
     * Dispatches incoming messages to the runner and reacts to lost connections.
     */
    private class MessageListener implements GridHadoopMessageListener {
        /**
         * Routes a message by its type. Control requests are accepted only from the parent
         * node; shuffle messages are processed after process preparation and acknowledged
         * to the sender.
         *
         * @param desc Sender descriptor.
         * @param msg Received message.
         */
        @Override
        public void onMessageReceived(final GridHadoopProcessDescriptor desc, final GridHadoopMessage msg) {
            if (msg instanceof GridHadoopTaskExecutionRequest) {
                if (validateNodeMessage(desc, msg)) {
                    runTasks((GridHadoopTaskExecutionRequest)msg);
                }
            }
            else if (msg instanceof GridHadoopJobInfoUpdateRequest) {
                if (validateNodeMessage(desc, msg)) {
                    updateTasks((GridHadoopJobInfoUpdateRequest)msg);
                }
            }
            else if (msg instanceof GridHadoopPrepareForJobRequest) {
                if (validateNodeMessage(desc, msg)) {
                    prepareProcess((GridHadoopPrepareForJobRequest)msg);
                }
            }
            else if (msg instanceof GridHadoopShuffleMessage) {
                if (log.isTraceEnabled()) {
                    log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
                }

                initFut.listenAsync(new CI1<GridFuture<?>>() {
                    @Override
                    public void apply(GridFuture<?> f) {
                        try {
                            // The shuffle job does not exist if preparation failed.
                            f.get();

                            GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg;

                            GridHadoopChildProcessRunner.this.shuffleJob.onShuffleMessage(m);

                            GridHadoopChildProcessRunner.this.comm.sendMessage(desc, new GridHadoopShuffleAck(m.id(), m.jobId()));
                        }
                        catch (GridException e) {
                            U.error(GridHadoopChildProcessRunner.this.log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
                        }
                    }
                });
            }
            else if (msg instanceof GridHadoopShuffleAck) {
                if (log.isTraceEnabled()) {
                    log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
                }

                shuffleJob.onShuffleAck((GridHadoopShuffleAck)msg);
            }
            else {
                log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
            }
        }

        /**
         * Terminates the child process when the connection with the parent node is lost.
         * A {@code null} descriptor is only logged as a failed handshake; lost connections
         * with other processes are ignored.
         *
         * @param desc Descriptor of the process the connection was lost with, possibly {@code null}.
         */
        @Override
        public void onConnectionLost(GridHadoopProcessDescriptor desc) {
            if (log.isDebugEnabled()) {
                log.debug("Lost connection with remote process: " + desc);
            }

            if (desc == null) {
                U.warn(log, "Handshake failed.");
            }
            else if (desc.processId().equals(nodeDesc.processId())) {
                log.warning("Child process lost connection with parent node (will terminate child process).");

                shutdownAndTerminate();
            }
        }
    }
}

