/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.processors.hadoop.planner;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.ListIterator;
import java.util.Map;
import java.util.UUID;
import org.gridgain.grid.Grid;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridNode;
import org.gridgain.grid.ggfs.GridGgfsBlockLocation;
import org.gridgain.grid.ggfs.GridGgfsPath;
import org.gridgain.grid.hadoop.GridHadoopDefaultJobInfo;
import org.gridgain.grid.hadoop.GridHadoopFileBlock;
import org.gridgain.grid.hadoop.GridHadoopInputSplit;
import org.gridgain.grid.hadoop.GridHadoopJob;
import org.gridgain.grid.hadoop.GridHadoopMapReducePlan;
import org.gridgain.grid.hadoop.GridHadoopMapReducePlanner;
import org.gridgain.grid.kernal.GridEx;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopEndpoint;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsEx;
import org.gridgain.grid.kernal.processors.hadoop.planner.GridHadoopDefaultMapReducePlan;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.resources.GridInstanceResource;
import org.gridgain.grid.resources.GridLoggerResource;
import org.gridgain.grid.util.typedef.F;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Default map-reduce planner.
 * <p>
 * Mappers: every input split is assigned to one topology node. For file blocks with the
 * {@code ggfs} scheme that resolve to a GGFS of the local grid, the node holding the largest
 * part of the split (according to GGFS affinity) is preferred; otherwise nodes running on the
 * hosts reported by the split are preferred. Ties and misses are resolved in favor of the node
 * with the fewest splits assigned so far.
 * <p>
 * Reducers: reducer indexes are distributed over the topology proportionally to the weight
 * returned by {@link #reducerNodeWeight(GridNode, int)}.
 */
public class GridHadoopDefaultMapReducePlanner
implements GridHadoopMapReducePlanner {
    /** Grid instance (annotated for resource injection); used to look up the local GGFS. */
    @GridInstanceResource
    private Grid grid;

    /** Logger (annotated for resource injection). */
    @GridLoggerResource
    private GridLogger log;

    /**
     * Builds a map-reduce plan for the given job over the given topology.
     *
     * @param job Job to plan; its input splits and job info are read.
     * @param top Topology nodes that may run tasks.
     * @param oldPlan Previous plan, if any. Not consulted by this implementation.
     * @return Plan holding the split-to-node and reducer-to-node assignments.
     * @throws GridException If the configured number of reducers is less than one, or if work
     *      has to be assigned but the topology is empty.
     */
    public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<GridNode> top, @Nullable GridHadoopMapReducePlan oldPlan) throws GridException {
        // Topology node IDs as a set for fast membership checks.
        Collection<UUID> topIds = new HashSet<UUID>(top.size(), 1.0f);

        for (GridNode node : top) {
            topIds.add(node.id());
        }

        Map<UUID, Collection<GridHadoopInputSplit>> mappers = mappers(top, topIds, job.input());

        int rdcCnt = 0;

        if (job.info().hasReducer()) {
            // One reducer unless the job info carries an explicit count.
            rdcCnt = 1;

            if (job.info() instanceof GridHadoopDefaultJobInfo) {
                rdcCnt = ((GridHadoopDefaultJobInfo)job.info()).reducers();

                if (rdcCnt < 1) {
                    throw new GridException("Number of reducers must be positive, actual: " + rdcCnt);
                }
            }
        }

        Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt);

        return new GridHadoopDefaultMapReducePlan(mappers, reducers);
    }

    /**
     * Assigns every input split to a topology node.
     *
     * @param top Topology nodes.
     * @param topIds IDs of the topology nodes.
     * @param splits Input splits to assign.
     * @return Map from node ID to the splits assigned to that node; nodes without splits are absent.
     * @throws GridException If a split has to be assigned but the topology is empty, or if
     *      split resolution fails.
     */
    private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<GridNode> top, Collection<UUID> topIds, Iterable<GridHadoopInputSplit> splits) throws GridException {
        Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<UUID, Collection<GridHadoopInputSplit>>();
        Map<String, Collection<UUID>> nodes = hosts(top);

        // Number of splits assigned to each node so far; drives load balancing.
        Map<UUID, Integer> nodeLoads = new HashMap<UUID, Integer>(top.size(), 1.0f);

        for (UUID nodeId : topIds) {
            nodeLoads.put(nodeId, 0);
        }

        for (GridHadoopInputSplit split : splits) {
            // Without this guard an empty topology would yield a null node ID as a map key.
            if (topIds.isEmpty()) {
                throw new GridException("Failed to map split to node, topology is empty [split=" + split + ']');
            }

            UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);

            if (log.isDebugEnabled()) {
                log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']');
            }

            Collection<GridHadoopInputSplit> nodeSplits = mappers.get(nodeId);

            if (nodeSplits == null) {
                nodeSplits = new ArrayList<GridHadoopInputSplit>();
                mappers.put(nodeId, nodeSplits);
            }

            nodeSplits.add(split);
            nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
        }

        return mappers;
    }

    /**
     * Groups topology node IDs by host name. A node reporting several host names is listed
     * under each of them.
     *
     * @param top Topology nodes.
     * @return Map from host name to the IDs of the nodes reporting that host name.
     */
    private static Map<String, Collection<UUID>> hosts(Collection<GridNode> top) {
        Map<String, Collection<UUID>> grouped = new HashMap<String, Collection<UUID>>(top.size());

        for (GridNode node : top) {
            for (String host : node.hostNames()) {
                Collection<UUID> nodeIds = grouped.get(host);

                if (nodeIds == null) {
                    nodeIds = new ArrayList<UUID>(2);
                    grouped.put(host, nodeIds);
                }

                nodeIds.add(node.id());
            }
        }

        return grouped;
    }

    /**
     * Picks the node that should process the given split.
     *
     * @param split Split to place.
     * @param topIds IDs of the topology nodes.
     * @param nodes Topology node IDs grouped by host name.
     * @param nodeLoads Number of splits already assigned to each topology node.
     * @return ID of the chosen node.
     * @throws GridException If GGFS endpoint or affinity resolution fails.
     */
    private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, Map<UUID, Integer> nodeLoads) throws GridException {
        // First choice: GGFS affinity, available only for file blocks with the "ggfs" scheme.
        if (split instanceof GridHadoopFileBlock) {
            GridHadoopFileBlock fileBlock = (GridHadoopFileBlock)split;

            if ("ggfs".equalsIgnoreCase(fileBlock.file().getScheme())) {
                UUID affNodeId = ggfsAffinityNode(fileBlock, topIds, nodeLoads);

                if (affNodeId != null) {
                    return affNodeId;
                }
            }
        }

        // Second choice: nodes running on the hosts reported by the split.
        Collection<UUID> blockNodes = null;

        for (String host : split.hosts()) {
            Collection<UUID> hostNodes = nodes.get(host);

            if (F.isEmpty(hostNodes)) {
                continue;
            }

            if (blockNodes == null) {
                blockNodes = new ArrayList<UUID>(hostNodes);
            }
            else {
                blockNodes.addAll(hostNodes);
            }
        }

        // With no host match bestNode falls back to the least loaded topology node.
        return bestNode(blockNodes, topIds, nodeLoads, false);
    }

    /**
     * Tries to place a {@code ggfs} file block using GGFS affinity.
     *
     * @param fileBlock File block whose URI has the {@code ggfs} scheme.
     * @param topIds IDs of the topology nodes.
     * @param nodeLoads Number of splits already assigned to each topology node.
     * @return ID of the chosen node, or {@code null} if the endpoint names another grid, the
     *      GGFS is not found, the file is proxied, or no topology node holds any of the blocks.
     * @throws GridException If endpoint parsing or the affinity lookup fails.
     */
    @Nullable
    private UUID ggfsAffinityNode(GridHadoopFileBlock fileBlock, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads) throws GridException {
        GridGgfsHadoopEndpoint endpoint = new GridGgfsHadoopEndpoint(fileBlock.file().getAuthority());

        // Affinity can be asked only from a GGFS that belongs to this grid.
        if (!F.eq(grid.name(), endpoint.grid())) {
            return null;
        }

        GridGgfsEx ggfs = (GridGgfsEx)((GridEx)grid).ggfsx(endpoint.ggfs());

        if (ggfs == null || ggfs.isProxy(fileBlock.file())) {
            return null;
        }

        Collection<GridGgfsBlockLocation> blocks = ggfs.affinity(new GridGgfsPath(fileBlock.file()), fileBlock.start(), fileBlock.length());

        assert blocks != null;

        if (blocks.size() == 1) {
            // Single location: choose among its nodes by load.
            return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
        }

        // Several locations: accumulate, per topology node, the total length of the blocks it
        // holds and track the node(s) with the largest total.
        Map<UUID, Long> lenByNode = new HashMap<UUID, Long>();
        ArrayList<UUID> bestNodeIds = null;
        long bestLen = -1L;

        for (GridGgfsBlockLocation block : blocks) {
            for (UUID blockNodeId : block.nodeIds()) {
                if (!topIds.contains(blockNodeId)) {
                    continue;
                }

                Long oldLen = lenByNode.get(blockNodeId);
                long newLen = oldLen == null ? block.length() : oldLen + block.length();

                lenByNode.put(blockNodeId, newLen);

                if (bestNodeIds == null || bestLen < newLen) {
                    // New leader: restart the list of best nodes.
                    bestNodeIds = new ArrayList<UUID>(1);
                    bestNodeIds.add(blockNodeId);
                    bestLen = newLen;
                }
                else if (bestLen == newLen) {
                    assert !F.isEmpty(bestNodeIds);

                    bestNodeIds.add(blockNodeId);
                }
            }
        }

        if (bestNodeIds == null) {
            return null;
        }

        // Candidates were already filtered by topology above, hence skipTopCheck = true.
        return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : bestNode(bestNodeIds, topIds, nodeLoads, true);
    }

    /**
     * Chooses the least loaded node among the candidates, falling back to the least loaded
     * node of the whole topology when no candidate qualifies.
     *
     * @param candidates Preferred node IDs, possibly {@code null}.
     * @param topIds IDs of the topology nodes.
     * @param nodeLoads Number of splits already assigned to each topology node.
     * @param skipTopCheck If {@code true}, candidates are not checked against {@code topIds}.
     * @return ID of the chosen node.
     */
    private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads, boolean skipTopCheck) {
        UUID best = null;

        if (candidates != null) {
            best = leastLoaded(candidates, skipTopCheck ? null : topIds, nodeLoads);
        }

        if (best == null) {
            best = leastLoaded(topIds, null, nodeLoads);
        }

        assert best != null;

        return best;
    }

    /**
     * Finds the ID with the smallest load; the first one encountered wins ties.
     *
     * @param ids IDs to inspect.
     * @param allowed If not {@code null}, IDs outside this collection are ignored.
     * @param nodeLoads Load per node ID; must contain every inspected ID.
     * @return ID with the smallest load, or {@code null} if nothing qualified.
     */
    @Nullable
    private static UUID leastLoaded(Collection<UUID> ids, @Nullable Collection<UUID> allowed, Map<UUID, Integer> nodeLoads) {
        UUID best = null;
        int bestLoad = Integer.MAX_VALUE;

        for (UUID id : ids) {
            if (allowed != null && !allowed.contains(id)) {
                continue;
            }

            int load = nodeLoads.get(id);

            if (best == null || load < bestLoad) {
                best = id;
                bestLoad = load;

                // Nothing can beat an idle node.
                if (load == 0) {
                    break;
                }
            }
        }

        return best;
    }

    /**
     * Distributes reducer indexes {@code 0 .. reducerCnt - 1} over the topology proportionally
     * to node weights.
     *
     * @param top Topology nodes.
     * @param mappers Splits assigned to each node.
     * @param reducerCnt Number of reducers to distribute.
     * @return Map from node ID to the reducer indexes assigned to it; nodes without reducers
     *      are absent.
     * @throws GridException If reducers have to be assigned but the topology is empty.
     */
    private Map<UUID, int[]> reducers(Collection<GridNode> top, Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) throws GridException {
        // Determine initial node weights.
        int totalWeight = 0;
        ArrayList<WeightedNode> nodes = new ArrayList<WeightedNode>(top.size());

        for (GridNode node : top) {
            Collection<GridHadoopInputSplit> nodeSplits = mappers.get(node.id());

            // Negative weights are treated as zero so that the proportions below stay sane.
            int weight = Math.max(0, reducerNodeWeight(node, nodeSplits != null ? nodeSplits.size() : 0));

            nodes.add(new WeightedNode(node.id(), weight, weight));
            totalWeight += weight;
        }

        if (reducerCnt > 0 && nodes.isEmpty()) {
            throw new GridException("Failed to assign reducers, topology is empty [reducerCnt=" + reducerCnt + ']');
        }

        if (totalWeight == 0) {
            // No node has a positive weight (e.g. the job has no input splits). Dividing by a
            // zero total would produce NaN weights and the distribution loop below would never
            // finish, so fall back to equal weights.
            for (WeightedNode node : nodes) {
                node.weight = 1;
            }

            totalWeight = nodes.size();
        }

        // Scale weights so that they sum up to (approximately) the reducer count.
        int assigned = 0;

        for (WeightedNode node : nodes) {
            node.floatWeight = (float)node.weight * (float)reducerCnt / (float)totalWeight;
            node.weight = Math.round(node.floatWeight);
            assigned += node.weight;
        }

        // Heaviest nodes first.
        Collections.sort(nodes);

        if (assigned > reducerCnt) {
            // Rounding gave away too many reducers: take them back, lightest nodes first.
            // The iterator starts past the last element so that previous() visits every node.
            ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size());

            while (assigned != reducerCnt) {
                if (!iter.hasPrevious()) {
                    iter = nodes.listIterator(nodes.size());
                }

                WeightedNode node = iter.previous();

                if (node.weight > 0) {
                    node.weight--;
                    assigned--;
                }
            }
        }
        else if (assigned < reducerCnt) {
            // Rounding lost some reducers: hand them out, heaviest nodes first, but only to
            // nodes that have a positive share.
            ListIterator<WeightedNode> iter = nodes.listIterator(0);

            while (assigned != reducerCnt) {
                if (!iter.hasNext()) {
                    iter = nodes.listIterator(0);
                }

                WeightedNode node = iter.next();

                if (node.floatWeight > 0.0f) {
                    node.weight++;
                    assigned++;
                }
            }
        }

        // Hand out consecutive reducer indexes following the sorted node order.
        int idx = 0;
        Map<UUID, int[]> reducers = new HashMap<UUID, int[]>(nodes.size(), 1.0f);

        for (WeightedNode node : nodes) {
            if (node.weight <= 0) {
                continue;
            }

            int[] arr = new int[node.weight];

            for (int i = 0; i < arr.length; ++i) {
                arr[i] = idx++;
            }

            reducers.put(node.nodeId, arr);
        }

        return reducers;
    }

    /**
     * Calculates the reducer weight of a node. This implementation returns the number of
     * splits mapped to the node; subclasses may override it. Negative results are treated
     * as zero by the planner.
     *
     * @param node Node.
     * @param splitCnt Number of splits mapped to the node.
     * @return Node weight.
     */
    protected int reducerNodeWeight(GridNode node, int splitCnt) {
        return splitCnt;
    }

    /**
     * Node together with its reducer weight. Natural ordering is by descending
     * {@code floatWeight}, ties broken by node ID.
     */
    private static class WeightedNode
    implements Comparable<WeightedNode> {
        /** Node ID. */
        private final UUID nodeId;

        /** Integer weight; after scaling, the number of reducers for the node. */
        private int weight;

        /** Exact (non-rounded) share of reducers for the node. */
        private float floatWeight;

        /**
         * @param nodeId Node ID.
         * @param weight Integer weight.
         * @param floatWeight Floating point weight.
         */
        private WeightedNode(UUID nodeId, int weight, float floatWeight) {
            this.nodeId = nodeId;
            this.weight = weight;
            this.floatWeight = floatWeight;
        }

        /** Nodes are equal when their IDs are equal; weights are ignored. */
        @Override
        public boolean equals(Object obj) {
            return obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId);
        }

        /** Hash code of the node ID. */
        @Override
        public int hashCode() {
            return nodeId.hashCode();
        }

        /** Orders by descending float weight, then by node ID. */
        @Override
        public int compareTo(@NotNull WeightedNode other) {
            if (floatWeight < other.floatWeight) {
                return 1;
            }

            if (floatWeight > other.floatWeight) {
                return -1;
            }

            return nodeId.compareTo(other.nodeId);
        }
    }
}

