/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.processors.hadoop.shuffle;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFuture;
import org.gridgain.grid.GridNode;
import org.gridgain.grid.hadoop.GridHadoopJobId;
import org.gridgain.grid.hadoop.GridHadoopMapReducePlan;
import org.gridgain.grid.hadoop.GridHadoopTaskInfo;
import org.gridgain.grid.hadoop.GridHadoopTaskInput;
import org.gridgain.grid.hadoop.GridHadoopTaskOutput;
import org.gridgain.grid.kernal.GridTopic;
import org.gridgain.grid.kernal.processors.hadoop.GridHadoopComponent;
import org.gridgain.grid.kernal.processors.hadoop.GridHadoopContext;
import org.gridgain.grid.kernal.processors.hadoop.message.GridHadoopMessage;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleAck;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleJob;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleMessage;
import org.gridgain.grid.lang.GridBiPredicate;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.util.future.GridFinishedFutureEx;
import org.gridgain.grid.util.lang.GridInClosure2X;
import org.gridgain.grid.util.offheap.unsafe.GridUnsafeMemory;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.internal.U;

/**
 * Hadoop shuffle component.
 * <p>
 * Keeps one {@link GridHadoopShuffleJob} per job ID, dispatches shuffle messages and
 * acknowledgements received on {@link GridTopic#TOPIC_HADOOP} to the matching shuffle job,
 * and exposes task inputs/outputs backed by those shuffle jobs.
 */
public class GridHadoopShuffle extends GridHadoopComponent {
    /** Shuffle jobs by job ID; entries are created lazily by {@link #job(GridHadoopJobId)}. */
    private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs =
        new ConcurrentHashMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>>();

    /**
     * Memory handed to every shuffle job created by this component.
     * NOTE(review): constructed with 0 - presumably "no limit"; confirm against GridUnsafeMemory.
     */
    protected GridUnsafeMemory mem = new GridUnsafeMemory(0L);

    /**
     * Starts the component and registers a listener on {@link GridTopic#TOPIC_HADOOP} that
     * forwards every received message to {@link #onMessageReceived(UUID, GridHadoopMessage)}.
     *
     * @param ctx Hadoop context.
     * @throws GridException If the parent component failed to start.
     */
    @Override
    public void start(GridHadoopContext ctx) throws GridException {
        super.start(ctx);

        ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, new GridBiPredicate<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                return onMessageReceived(nodeId, (GridHadoopMessage)msg);
            }
        });
    }

    /**
     * Closes every registered shuffle job and clears the registry. A failure to close one job
     * is logged and does not prevent the remaining jobs from being closed.
     *
     * @param cancel Not used by this implementation.
     */
    @Override
    public void stop(boolean cancel) {
        for (GridHadoopShuffleJob<UUID> job : jobs.values()) {
            try {
                job.close();
            }
            catch (GridException e) {
                U.error(log, "Failed to close job.", e);
            }
        }

        jobs.clear();
    }

    /**
     * Creates a shuffle job for the given job ID and initializes its reducer addresses from
     * the job's map-reduce plan.
     *
     * @param jobId Job ID.
     * @return Newly created shuffle job (not yet registered in {@link #jobs}).
     * @throws GridException If the shuffle job could not be created.
     */
    private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws GridException {
        // NOTE(review): the plan is dereferenced without a null check - confirm the job tracker
        // never returns null for a job that can reach this point.
        GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);

        UUID locNodeId = ctx.localNodeId();

        // Last constructor argument is true when the plan assigns at least one mapper to this node.
        boolean hasLocMappers = !F.isEmpty(plan.mappers(locNodeId));

        GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<UUID>(locNodeId, log,
            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), hasLocMappers);

        // Reducer index -> ID of the node that runs this reducer.
        UUID[] rdcAddrs = new UUID[plan.reducers()];

        for (int i = 0; i < rdcAddrs.length; ++i) {
            UUID nodeId = plan.nodeForReducer(i);

            assert (nodeId != null) : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';

            rdcAddrs[i] = nodeId;
        }

        // The call is kept outside of the assert so that it runs even with assertions disabled.
        boolean init = job.initializeReduceAddresses(rdcAddrs);

        assert (init);

        return job;
    }

    /**
     * Sends a message to the given node over {@link GridTopic#TOPIC_HADOOP}.
     *
     * @param nodeId Destination node ID.
     * @param msg Message to send.
     * @throws GridException If the destination node is not known to discovery or sending failed.
     */
    private void send0(UUID nodeId, Object msg) throws GridException {
        GridNode node = ctx.kernalContext().discovery().node(nodeId);

        // Fail with the declared checked exception instead of passing a list that contains
        // null down to the communication layer.
        if (node == null) {
            throw new GridException("Failed to send Hadoop shuffle message, destination node not found " +
                "[nodeId=" + nodeId + ", msg=" + msg + ']');
        }

        ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0L);
    }

    /**
     * Returns the shuffle job registered for the given job ID, creating and registering one
     * if there is none yet.
     *
     * @param jobId Job ID.
     * @return Shuffle job registered for the given job ID.
     * @throws GridException If a new shuffle job could not be created or a redundant one could
     *      not be closed.
     */
    private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws GridException {
        GridHadoopShuffleJob<UUID> res = jobs.get(jobId);

        if (res != null) {
            return res;
        }

        res = newJob(jobId);

        GridHadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);

        if (old != null) {
            // Another thread registered a job first: discard ours and use the registered one.
            res.close();

            return old;
        }

        if (res.reducersInitialized()) {
            startSending(res);
        }

        return res;
    }

    /**
     * Starts sending for the given shuffle job, delivering its outgoing messages via
     * {@link #send0(UUID, Object)}.
     *
     * @param shuffleJob Shuffle job to start sending for.
     */
    private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) {
        shuffleJob.startSending(ctx.kernalContext().gridName(), new GridInClosure2X<UUID, GridHadoopShuffleMessage>() {
            @Override
            public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws GridException {
                send0(dest, msg);
            }
        });
    }

    /**
     * Dispatches a received message to the shuffle job it belongs to. Shuffle messages are
     * answered with a {@link GridHadoopShuffleAck} sent back to the source node.
     *
     * @param src ID of the node the message came from.
     * @param msg Received message.
     * @return Always {@code true}.
     * @throws IllegalStateException If the message is neither a shuffle message nor an ack.
     */
    public boolean onMessageReceived(UUID src, GridHadoopMessage msg) {
        if (msg instanceof GridHadoopShuffleMessage) {
            GridHadoopShuffleMessage shuffleMsg = (GridHadoopShuffleMessage)msg;

            try {
                job(shuffleMsg.jobId()).onShuffleMessage(shuffleMsg);
            }
            catch (GridException e) {
                U.error(log, "Message handling failed.", e);
            }

            // NOTE(review): the ack is sent even when handling above failed; confirm this is
            // intended before changing it, since the sender's reaction to a missing ack is not
            // visible from this class.
            try {
                send0(src, new GridHadoopShuffleAck(shuffleMsg.id(), shuffleMsg.jobId()));
            }
            catch (GridException e) {
                U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
            }
        }
        else if (msg instanceof GridHadoopShuffleAck) {
            GridHadoopShuffleAck ack = (GridHadoopShuffleAck)msg;

            try {
                job(ack.jobId()).onShuffleAck(ack);
            }
            catch (GridException e) {
                U.error(log, "Message handling failed.", e);
            }
        }
        else {
            throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src + ", msg=" + msg + ']');
        }

        return true;
    }

    /**
     * Gets the output of the given task from the shuffle job of the task's job.
     *
     * @param taskInfo Task info.
     * @return Task output.
     * @throws GridException If the shuffle job could not be obtained or the output could not be created.
     */
    public GridHadoopTaskOutput output(GridHadoopTaskInfo taskInfo) throws GridException {
        return job(taskInfo.jobId()).output(taskInfo);
    }

    /**
     * Gets the input of the given task from the shuffle job of the task's job.
     *
     * @param taskInfo Task info.
     * @return Task input.
     * @throws GridException If the shuffle job could not be obtained or the input could not be created.
     */
    public GridHadoopTaskInput input(GridHadoopTaskInfo taskInfo) throws GridException {
        return job(taskInfo.jobId()).input(taskInfo);
    }

    /**
     * Unregisters and closes the shuffle job of a finished job, if one is registered.
     * A close failure is logged and not propagated.
     *
     * @param jobId ID of the finished job.
     */
    public void jobFinished(GridHadoopJobId jobId) {
        GridHadoopShuffleJob<UUID> job = jobs.remove(jobId);

        if (job == null) {
            return;
        }

        try {
            job.close();
        }
        catch (GridException e) {
            U.error(log, "Failed to close job: " + jobId, e);
        }
    }

    /**
     * Flushes the shuffle job registered for the given job ID.
     *
     * @param jobId Job ID.
     * @return Future returned by the shuffle job's flush; an already finished future if no
     *      shuffle job is registered, or a finished future carrying the error if flush failed.
     */
    public GridFuture<?> flush(GridHadoopJobId jobId) {
        GridHadoopShuffleJob<UUID> job = jobs.get(jobId);

        if (job == null) {
            return new GridFinishedFutureEx<Object>();
        }

        try {
            return job.flush();
        }
        catch (GridException e) {
            // Explicit Throwable cast keeps the error-carrying constructor selected.
            return new GridFinishedFutureEx<Object>((Throwable)e);
        }
    }

    /**
     * @return Memory instance used by shuffle jobs of this component.
     */
    public GridUnsafeMemory memory() {
        return mem;
    }
}

