/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.balancer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.KeyManager;
import org.apache.hadoop.hdfs.server.balancer.MovedBlocks;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
public class Dispatcher {
    /** Logger shared by the dispatcher and its nested classes. */
    static final Log LOG = LogFactory.getLog(Dispatcher.class);
    // NOTE(review): presumably the limit on consecutive dispatch iterations without a
    // pending move applied in Source.dispatchBlocks() - confirm.
    private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
    // Mutable static, overwritten by setDelayAfterErrors(); initial value 10000.
    // NOTE(review): unit presumably milliseconds; it is read outside the visible code - confirm.
    private static long delayAfterErrors = 10000L;
    /** Connection to the NameNode (block lists, storage reports, moved-bytes counter). */
    private final NameNodeConnector nnc;
    /** SASL client built from the configuration in the constructor. */
    private final SaslDataTransferClient saslClient;
    /** Nodes matched by shouldIgnore() via Util.isExcluded(). */
    private final Set<String> excludedNodes;
    /** Nodes matched by shouldIgnore() via Util.isIncluded(); an empty set includes every node. */
    private final Set<String> includedNodes;
    /** Sources registered through add(); cleared by reset(). */
    private final Collection<Source> sources = new HashSet<Source>();
    /** Targets registered through add(); cleared by reset(). */
    private final Collection<DDatanode.StorageGroup> targets = new HashSet<DDatanode.StorageGroup>();
    /** Shared block objects, looked up by Source.getBlockList() while holding this map's monitor. */
    private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
    /** Blocks already chosen for a move; consulted by isGoodBlockCandidate(). */
    private final MovedBlocks<DDatanode.StorageGroup> movedBlocks;
    /** Storage groups looked up by datanode UUID and storage type. */
    private final StorageGroupMap<DDatanode.StorageGroup> storageGroupMap = new StorageGroupMap();
    /** Network topology; replaced with a fresh instance by reset(). */
    private NetworkTopology cluster;
    /** Pool running Source.dispatchBlocks(); null when the dispatcher was built with 0 dispatcher threads. */
    private final ExecutorService dispatchExecutor;
    /** Hands out mover threads for the per-datanode move executors. */
    private final Allocator moverThreadAllocator;
    /** Threads requested per target datanode in executePendingMove(). */
    private final int maxConcurrentMovesPerNode;
    /** Upper bound on the size requested from the NameNode in one getBlocks call. */
    private final long getBlocksSize;
    /** Blocks smaller than this are skipped by Source.getBlockList(). */
    private final long getBlocksMinBlockSize;
    /** Obtained from DFSUtilClient.getIoFileBufferSize(conf). */
    private final int ioFileBufferSize;
    /** Value of the "dfs.client.use.datanode.hostname" setting (default false). */
    private final boolean connectToDnViaHostname;
    /** Placement policies used to decide whether a block may be moved. */
    private BlockPlacementPolicies placementPolicies;

    /**
     * Creates a dispatcher with both getBlocksSize and getBlocksMinBlockSize set to 0
     * by delegating to the full constructor.
     */
    public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, Set<String> excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
        this(nnc, includedNodes, excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, 0L, 0L, conf);
    }

    /**
     * Creates a dispatcher.
     *
     * @param nnc connector to the NameNode
     * @param includedNodes nodes to consider; an empty set means all nodes
     * @param excludedNodes nodes to ignore
     * @param movedWinWidth passed to the MovedBlocks constructor
     * @param moverThreads passed to the mover thread Allocator
     * @param dispatcherThreads size of the dispatch pool; 0 creates no pool
     * @param maxConcurrentMovesPerNode threads requested per target datanode
     * @param getBlocksSize upper bound on the size requested per getBlocks call
     * @param getBlocksMinBlockSize blocks smaller than this are skipped
     * @param conf configuration used for topology, SASL, buffer size and placement policies
     */
    Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, Set<String> excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
        this.nnc = nnc;
        this.excludedNodes = excludedNodes;
        this.includedNodes = includedNodes;
        this.movedBlocks = new MovedBlocks(movedWinWidth);
        this.cluster = NetworkTopology.getInstance((Configuration)conf);
        // With 0 dispatcher threads no pool exists, so dispatchBlockMoves() must not be called.
        this.dispatchExecutor = dispatcherThreads == 0 ? null : Executors.newFixedThreadPool(dispatcherThreads);
        this.moverThreadAllocator = new Allocator(moverThreads);
        this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
        this.getBlocksSize = getBlocksSize;
        this.getBlocksMinBlockSize = getBlocksMinBlockSize;
        this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver((Configuration)conf), TrustedChannelResolver.getInstance((Configuration)conf), nnc.fallbackToSimpleAuth);
        this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize((Configuration)conf);
        this.connectToDnViaHostname = conf.getBoolean("dfs.client.use.datanode.hostname", false);
        // Must come after this.cluster is assigned: the topology is handed to the policies.
        this.placementPolicies = new BlockPlacementPolicies(conf, null, this.cluster, null);
    }

    /** @return the file system obtained from the NameNode connector */
    public DistributedFileSystem getDistributedFileSystem() {
        return this.nnc.getDistributedFileSystem();
    }

    /** @return the live storage group map (not a copy) */
    public StorageGroupMap<DDatanode.StorageGroup> getStorageGroupMap() {
        return this.storageGroupMap;
    }

    /** @return the current network topology; reset() replaces this instance */
    public NetworkTopology getCluster() {
        return this.cluster;
    }

    /** @return the current value of the connector's moved-bytes counter */
    long getBytesMoved() {
        return this.nnc.getBytesMoved().get();
    }

    /**
     * Adds up the scheduled size of every registered source.
     *
     * @return total number of bytes currently scheduled on the sources
     * @throws IllegalStateException if the storage group map holds fewer entries than
     *         there are sources plus targets
     */
    long bytesToMove() {
        final int groupCount = this.storageGroupMap.size();
        final int sourceCount = this.sources.size();
        final int targetCount = this.targets.size();
        Preconditions.checkState(groupCount >= sourceCount + targetCount, (Object)("Mismatched number of storage groups (" + groupCount + " < " + sourceCount + " sources + " + targetCount + " targets)"));
        long total = 0L;
        final Iterator<Source> it = this.sources.iterator();
        while (it.hasNext()) {
            total += it.next().getScheduledSize();
        }
        return total;
    }

    /** Registers a source and a target with this dispatcher. */
    void add(Source source, DDatanode.StorageGroup target) {
        this.sources.add(source);
        this.targets.add(target);
    }

    /**
     * Decides whether a datanode takes no part in block moves.
     *
     * @param dn the datanode to examine
     * @return true if the node is decommissioned, being decommissioned, excluded,
     *         or not in the (non-empty) include set
     */
    private boolean shouldIgnore(DatanodeInfo dn) {
        final boolean decommissioned = dn.isDecommissioned();
        final boolean decommissioning = dn.isDecommissionInProgress();
        final boolean excluded = Util.isExcluded(this.excludedNodes, dn);
        final boolean notIncluded = !Util.isIncluded(this.includedNodes, dn);
        final boolean ignore = decommissioned || decommissioning || excluded || notIncluded;
        if (ignore && LOG.isTraceEnabled()) {
            LOG.trace((Object)("Excluding datanode " + dn + ": decommissioned=" + decommissioned + ", decommissioning=" + decommissioning + ", excluded=" + excluded + ", notIncluded=" + notIncluded));
        }
        return ignore;
    }

    /**
     * Fetches the live datanode storage reports, drops the nodes that
     * shouldIgnore() rejects, and adds every remaining node to the topology.
     *
     * @return the reports of the nodes that were kept, in shuffled order
     * @throws IOException if the reports cannot be obtained from the NameNode
     */
    public List<DatanodeStorageReport> init() throws IOException {
        final ArrayList<DatanodeStorageReport> kept = new ArrayList<DatanodeStorageReport>();
        for (DatanodeStorageReport report : DFSUtil.shuffle(this.nnc.getLiveDatanodeStorageReport())) {
            final DatanodeInfo info = report.getDatanodeInfo();
            if (!this.shouldIgnore(info)) {
                kept.add(report);
                this.cluster.add((Node)info);
            }
        }
        return kept;
    }

    /** Creates a DDatanode whose pending list is pre-sized to maxConcurrentMovesPerNode. */
    public DDatanode newDatanode(DatanodeInfo datanode) {
        return new DDatanode(datanode, this.maxConcurrentMovesPerNode);
    }

    /**
     * Hands a pending move to the move executor of its target datanode.
     * The executor is created on first use if the allocator grants at least one
     * thread; when no executor is available the move is skipped with a warning.
     *
     * @param p the move to run
     */
    public void executePendingMove(final PendingMove p) {
        final DDatanode targetDn = p.target.getDDatanode();
        ExecutorService executor = targetDn.getMoveExecutor();
        if (executor == null) {
            final int granted = this.moverThreadAllocator.allocate(this.maxConcurrentMovesPerNode);
            if (granted > 0) {
                executor = targetDn.initMoveExecutor(granted);
            }
        }
        if (executor == null) {
            LOG.warn((Object)("No mover threads available: skip moving " + p));
            return;
        }
        final Runnable moveTask = new Runnable(){

            @Override
            public void run() {
                p.dispatch();
            }
        };
        executor.execute(moveTask);
    }

    /**
     * Runs one round of block moves and asks the NameNode connector, given the
     * number of bytes moved in that round, whether to continue.
     */
    public boolean dispatchAndCheckContinue() throws InterruptedException {
        return this.nnc.shouldContinue(this.dispatchBlockMoves());
    }

    /**
     * Submits one dispatch task per source, waits for all of them, then waits until
     * the pending queues of all targets are empty.
     *
     * @return the number of bytes moved during this call
     * @throws InterruptedException if interrupted while waiting for a dispatch task
     */
    private long dispatchBlockMoves() throws InterruptedException {
        final long bytesBefore = this.getBytesMoved();
        final List<Future<?>> submitted = new ArrayList<Future<?>>(this.sources.size());
        for (final Source src : this.sources) {
            submitted.add(this.dispatchExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    src.dispatchBlocks();
                }
            }));
        }
        for (Future<?> task : submitted) {
            try {
                task.get();
            }
            catch (ExecutionException e) {
                // A failed source must not prevent waiting for the remaining ones.
                LOG.warn((Object)"Dispatcher thread failed", e.getCause());
            }
        }
        Dispatcher.waitForMoveCompletion(this.targets);
        return this.getBytesMoved() - bytesBefore;
    }

    /**
     * Blocks until the pending queue of every target's datanode is empty,
     * polling once per second.
     *
     * <p>An interrupt does not end the wait, but it is no longer lost: the
     * thread's interrupt status is restored before this method returns so the
     * caller can still observe it.
     *
     * @param targets the storage groups whose datanodes are polled
     * @return true if any of the polled datanodes reported a failure
     */
    public static boolean waitForMoveCompletion(Iterable<? extends DDatanode.StorageGroup> targets) {
        boolean hasFailure = false;
        boolean interrupted = false;
        try {
            while (true) {
                boolean empty = true;
                for (DDatanode.StorageGroup storageGroup : targets) {
                    if (!storageGroup.getDDatanode().isPendingQEmpty()) {
                        empty = false;
                        break;
                    }
                    hasFailure |= storageGroup.getDDatanode().hasFailure;
                }
                if (empty) {
                    return hasFailure;
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt. Re-asserting it here
                    // would make every following sleep() throw immediately.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Reports whether any target's datanode has recorded a successful move.
     *
     * @param targets the storage groups to inspect
     * @return true if at least one datanode has its success flag set
     */
    public static boolean checkForSuccess(Iterable<? extends DDatanode.StorageGroup> targets) {
        boolean anySuccess = false;
        for (DDatanode.StorageGroup group : targets) {
            if (group.getDDatanode().hasSuccess) {
                anySuccess = true;
            }
        }
        return anySuccess;
    }

    /**
     * Decides whether a block may be moved from source to target.
     *
     * <p>The block is rejected when source and target are equal, when the target's
     * storage type differs from the requested one, when the block was already
     * moved, or when the target datanode already holds a replica. A move between
     * two storage groups of the same datanode is accepted without further checks;
     * everything else is left to the placement policy.
     */
    private boolean isGoodBlockCandidate(DDatanode.StorageGroup source, DDatanode.StorageGroup target, StorageType targetStorageType, DBlock block) {
        if (source.equals(target) || target.storageType != targetStorageType || this.movedBlocks.contains(block.getBlock())) {
            return false;
        }
        final DatanodeInfo targetNode = target.getDatanodeInfo();
        if (source.getDatanodeInfo().equals((Object)targetNode)) {
            return true;
        }
        for (DDatanode.StorageGroup replica : block.getLocations()) {
            if (replica.getDatanodeInfo().equals((Object)targetNode)) {
                return false;
            }
        }
        return this.isGoodBlockCandidateForPlacementPolicy(source, target, block);
    }

    /**
     * Asks the placement policy whether the block may move from source to target.
     * The node list handed to the policy consists of the block's current locations
     * followed by the target; it is collected while holding the block's monitor.
     */
    private boolean isGoodBlockCandidateForPlacementPolicy(DDatanode.StorageGroup source, DDatanode.StorageGroup target, DBlock block) {
        final ArrayList<DatanodeInfo> nodes = new ArrayList<DatanodeInfo>();
        synchronized (block) {
            for (DDatanode.StorageGroup location : block.locations) {
                nodes.add(location.getDatanodeInfo());
            }
            nodes.add(target.getDatanodeInfo());
        }
        final DatanodeInfo from = source.getDatanodeInfo();
        final DatanodeInfo to = target.getDatanodeInfo();
        return this.placementPolicies.getPolicy(false).isMovable(nodes, from, to);
    }

    /**
     * Resets the dispatcher state: obtains a topology for the given
     * configuration, clears storage groups, sources and targets, resets the
     * mover thread allocator and shuts down the targets' move executors.
     */
    void reset(Configuration conf) {
        this.cluster = NetworkTopology.getInstance((Configuration)conf);
        this.storageGroupMap.clear();
        this.sources.clear();
        this.moverThreadAllocator.reset();
        // Executors must be shut down before the targets are forgotten.
        for (DDatanode.StorageGroup t : this.targets) {
            t.getDDatanode().shutdownMoveExecutor();
        }
        this.targets.clear();
        this.globalBlocks.removeAllButRetain(this.movedBlocks);
        this.movedBlocks.cleanup();
    }

    /** Overwrites the static delayAfterErrors value; annotated as visible for testing. */
    @VisibleForTesting
    public static void setDelayAfterErrors(long time) {
        delayAfterErrors = time;
    }

    /** Calls shutdownNow() on the dispatch executor, if one was created. */
    public void shutdownNow() {
        if (this.dispatchExecutor != null) {
            this.dispatchExecutor.shutdownNow();
        }
    }

    /** Helpers for matching a datanode against include and exclude sets. */
    static class Util {
        Util() {
        }

        /** @return true if the datanode matches an entry of the exclude set */
        static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
            return Util.isIn(excludedNodes, dn);
        }

        /** @return true if the include set is empty or the datanode matches one of its entries */
        static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) {
            if (includedNodes.isEmpty()) {
                return true;
            }
            return Util.isIn(includedNodes, dn);
        }

        /**
         * Tries the datanode's peer host name, then its IP address, then its host
         * name, each paired with the transfer port.
         */
        private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) {
            if (Util.isIn(datanodes, dn.getPeerHostName(), dn.getXferPort())) {
                return true;
            }
            if (Util.isIn(datanodes, dn.getIpAddr(), dn.getXferPort())) {
                return true;
            }
            return Util.isIn(datanodes, dn.getHostName(), dn.getXferPort());
        }

        /** Matches either the bare host or "host:port"; a null host never matches. */
        private static boolean isIn(Set<String> nodes, String host, int port) {
            if (host == null) {
                return false;
            }
            if (nodes.contains(host)) {
                return true;
            }
            return nodes.contains(host + ":" + port);
        }
    }

    public class Source
    extends DDatanode.StorageGroup {
        private final List<Task> tasks;
        private long blocksToReceive;
        private final List<DBlock> srcBlocks;
        private static final long MAX_ITERATION_TIME = 1200000L;

        private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
            DDatanode dDatanode = dn;
            dDatanode.getClass();
            super(storageType, maxSize2Move);
            this.tasks = new ArrayList<Task>(2);
            this.blocksToReceive = 0L;
            this.srcBlocks = new ArrayList<DBlock>();
        }

        void addTask(Task task) {
            Preconditions.checkState((task.target != this ? 1 : 0) != 0, (Object)("Source and target are the same storage group " + this.getDisplayName()));
            this.incScheduledSize(task.size);
            this.tasks.add(task);
        }

        Iterator<DBlock> getBlockIterator() {
            return this.srcBlocks.iterator();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private long getBlockList() throws IOException {
            long size = Math.min(Dispatcher.this.getBlocksSize, this.blocksToReceive);
            BlocksWithLocations newBlksLocs = Dispatcher.this.nnc.getBlocks(this.getDatanodeInfo(), size);
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)("getBlocks(" + this.getDatanodeInfo() + ", " + StringUtils.TraditionalBinaryPrefix.long2String((long)size, (String)"B", (int)2) + ") returns " + newBlksLocs.getBlocks().length + " blocks."));
            }
            long bytesReceived = 0L;
            for (BlocksWithLocations.BlockWithLocations blkLocs : newBlksLocs.getBlocks()) {
                DBlock block;
                if (blkLocs.getBlock().getNumBytes() < Dispatcher.this.getBlocksMinBlockSize) continue;
                if (blkLocs instanceof BlocksWithLocations.StripedBlockWithLocations) {
                    BlocksWithLocations.StripedBlockWithLocations sblkLocs = (BlocksWithLocations.StripedBlockWithLocations)blkLocs;
                    bytesReceived += sblkLocs.getBlock().getNumBytes() / (long)sblkLocs.getDataBlockNum();
                    block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(), sblkLocs.getDataBlockNum(), sblkLocs.getCellSize());
                } else {
                    bytesReceived += blkLocs.getBlock().getNumBytes();
                    block = new DBlock(blkLocs.getBlock());
                }
                GlobalBlockMap globalBlockMap = Dispatcher.this.globalBlocks;
                synchronized (globalBlockMap) {
                    DBlock dBlock = block = Dispatcher.this.globalBlocks.putIfAbsent(blkLocs.getBlock(), block);
                    synchronized (dBlock) {
                        block.clearLocations();
                        String[] datanodeUuids = blkLocs.getDatanodeUuids();
                        StorageType[] storageTypes = blkLocs.getStorageTypes();
                        for (int i = 0; i < datanodeUuids.length; ++i) {
                            Object g = Dispatcher.this.storageGroupMap.get(datanodeUuids[i], storageTypes[i]);
                            if (g == null) continue;
                            block.addLocation(g);
                        }
                    }
                    if (!this.srcBlocks.contains(block) && this.isGoodBlockCandidate(block)) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace((Object)("Add " + block + " to " + this));
                        }
                        this.srcBlocks.add(block);
                    }
                }
            }
            return bytesReceived;
        }

        private boolean isGoodBlockCandidate(DBlock block) {
            StorageType sourceStorageType = this.getStorageType();
            for (Task t : this.tasks) {
                if (!Dispatcher.this.isGoodBlockCandidate(this, t.target, sourceStorageType, block)) continue;
                return true;
            }
            return false;
        }

        private PendingMove chooseNextMove() {
            Iterator<Task> i = this.tasks.iterator();
            while (i.hasNext()) {
                PendingMove pendingBlock;
                Task task = i.next();
                DDatanode target = task.target.getDDatanode();
                if (!target.addPendingBlock(pendingBlock = new PendingMove(this, task.target))) continue;
                if (pendingBlock.chooseBlockAndProxy()) {
                    long blockSize = pendingBlock.reportedBlock.getNumBytes(this);
                    this.incScheduledSize(-blockSize);
                    task.size = task.size - blockSize;
                    if (task.size <= 0L) {
                        i.remove();
                    }
                    return pendingBlock;
                }
                target.removePendingBlock(pendingBlock);
            }
            return null;
        }

        public PendingMove addPendingMove(DBlock block, DDatanode.StorageGroup target) {
            return target.addPendingMove(block, new PendingMove(this, target));
        }

        private void removeMovedBlocks() {
            Iterator<DBlock> i = this.getBlockIterator();
            while (i.hasNext()) {
                if (!Dispatcher.this.movedBlocks.contains(i.next().getBlock())) continue;
                i.remove();
            }
        }

        private boolean shouldFetchMoreBlocks() {
            return this.blocksToReceive > 0L;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void dispatchBlocks() {
            long startTime = Time.monotonicNow();
            this.blocksToReceive = 2L * this.getScheduledSize();
            boolean isTimeUp = false;
            int noPendingMoveIteration = 0;
            while (!(isTimeUp || this.getScheduledSize() <= 0L || this.srcBlocks.isEmpty() && this.blocksToReceive <= 0L)) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)(this + " blocksToReceive=" + this.blocksToReceive + ", scheduledSize=" + this.getScheduledSize() + ", srcBlocks#=" + this.srcBlocks.size()));
                }
                if (Time.monotonicNow() - startTime > 1200000L) {
                    LOG.info((Object)("Time up (max time=1200 seconds).  Skipping " + this));
                    isTimeUp = true;
                    continue;
                }
                PendingMove p = this.chooseNextMove();
                if (p != null) {
                    noPendingMoveIteration = 0;
                    Dispatcher.this.executePendingMove(p);
                    continue;
                }
                this.removeMovedBlocks();
                if (this.shouldFetchMoreBlocks()) {
                    try {
                        long received = this.getBlockList();
                        if (received == 0L) {
                            return;
                        }
                        this.blocksToReceive -= received;
                        continue;
                    }
                    catch (IOException e) {
                        LOG.warn((Object)"Exception while getting reportedBlock list", (Throwable)e);
                        return;
                    }
                }
                if (++noPendingMoveIteration >= 5) {
                    LOG.info((Object)("Failed to find a pending move " + noPendingMoveIteration + " times.  Skipping " + this));
                    this.resetScheduledSize();
                }
                try {
                    Dispatcher dispatcher = Dispatcher.this;
                    synchronized (dispatcher) {
                        Dispatcher.this.wait(1000L);
                    }
                }
                catch (InterruptedException interruptedException) {
                }
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            return super.equals(obj);
        }
    }

    /**
     * Dispatcher-side view of a datanode: its source and target storage groups
     * per storage type, the list of pending moves, a delay window, failure and
     * success flags, and the executor that runs its moves.
     */
    public static class DDatanode {
        final DatanodeInfo datanode;
        private final EnumMap<StorageType, Source> sourceMap = new EnumMap<StorageType, Source>(StorageType.class);
        private final EnumMap<StorageType, StorageGroup> targetMap = new EnumMap<StorageType, StorageGroup>(StorageType.class);
        /** Monotonic time until which no pending block is accepted; 0 means no delay. */
        protected long delayUntil = 0L;
        private final List<PendingMove> pendings;
        private volatile boolean hasFailure = false;
        private volatile boolean hasSuccess = false;
        private ExecutorService moveExecutor;

        @Override
        public String toString() {
            final String simpleName = this.getClass().getSimpleName();
            return simpleName + ":" + this.datanode;
        }

        private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
            this.datanode = datanode;
            this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
        }

        public DatanodeInfo getDatanodeInfo() {
            return this.datanode;
        }

        /** Creates and stores a fixed-size pool for this datanode's moves. */
        synchronized ExecutorService initMoveExecutor(int poolSize) {
            final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
            this.moveExecutor = pool;
            return pool;
        }

        /** @return the move executor, or null if none has been created or it was shut down */
        synchronized ExecutorService getMoveExecutor() {
            return this.moveExecutor;
        }

        /** Shuts the move executor down (without interrupting tasks) and forgets it. */
        synchronized void shutdownMoveExecutor() {
            if (this.moveExecutor == null) {
                return;
            }
            this.moveExecutor.shutdown();
            this.moveExecutor = null;
        }

        /** Stores g under storageType and fails if an entry already existed. */
        private static <G extends StorageGroup> void put(StorageType storageType, G g, EnumMap<StorageType, G> map) {
            final StorageGroup previous = map.put(storageType, g);
            Preconditions.checkState(previous == null);
        }

        /**
         * Creates a target storage group for the given storage type.
         *
         * @throws IllegalStateException if a target for that type already exists
         */
        public StorageGroup addTarget(StorageType storageType, long maxSize2Move) {
            final StorageGroup group = new StorageGroup(storageType, maxSize2Move);
            DDatanode.put(storageType, group, this.targetMap);
            return group;
        }

        /**
         * Creates a source storage group bound to the given dispatcher.
         *
         * @throws IllegalStateException if a source for that type already exists
         */
        public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
            final Source src = d.new Source(storageType, maxSize2Move, this);
            DDatanode.put(storageType, src, this.sourceMap);
            return src;
        }

        /** Starts a delay window of delta milliseconds from now. */
        private synchronized void activateDelay(long delta) {
            this.delayUntil = Time.monotonicNow() + delta;
            LOG.info((Object)(this + " activateDelay " + (double)delta / 1000.0 + " seconds"));
        }

        /** @return true while the delay window is open; an expired window is cleared */
        private synchronized boolean isDelayActive() {
            final boolean active = this.delayUntil != 0L && Time.monotonicNow() <= this.delayUntil;
            if (!active) {
                this.delayUntil = 0L;
            }
            return active;
        }

        synchronized boolean isPendingQEmpty() {
            return this.pendings.isEmpty();
        }

        /** @return true if the move was queued; false while a delay is active */
        synchronized boolean addPendingBlock(PendingMove pendingBlock) {
            if (this.isDelayActive()) {
                return false;
            }
            return this.pendings.add(pendingBlock);
        }

        synchronized boolean removePendingBlock(PendingMove pendingBlock) {
            return this.pendings.remove(pendingBlock);
        }

        void setHasFailure() {
            this.hasFailure = true;
        }

        void setHasSuccess() {
            this.hasSuccess = true;
        }

        /**
         * One storage type on the enclosing datanode, with a size budget and the
         * amount already scheduled against it.
         */
        public class StorageGroup {
            final StorageType storageType;
            final long maxSize2Move;
            private long scheduledSize = 0L;

            private StorageGroup(StorageType storageType, long maxSize2Move) {
                this.storageType = storageType;
                this.maxSize2Move = maxSize2Move;
            }

            public StorageType getStorageType() {
                return this.storageType;
            }

            private DDatanode getDDatanode() {
                return DDatanode.this;
            }

            public DatanodeInfo getDatanodeInfo() {
                return DDatanode.this.datanode;
            }

            /** @return true if any budget is left */
            boolean hasSpaceForScheduling() {
                return this.hasSpaceForScheduling(0L);
            }

            /** @return true if strictly more than size bytes of budget are left */
            synchronized boolean hasSpaceForScheduling(long size) {
                final long remaining = this.availableSizeToMove();
                return remaining > size;
            }

            synchronized long availableSizeToMove() {
                return this.maxSize2Move - this.scheduledSize;
            }

            public synchronized void incScheduledSize(long size) {
                this.scheduledSize = this.scheduledSize + size;
            }

            synchronized long getScheduledSize() {
                return this.scheduledSize;
            }

            synchronized void resetScheduledSize() {
                this.scheduledSize = 0L;
            }

            /**
             * Queues the move on this group's datanode and marks the block as moved.
             *
             * @return pm if it was queued and the block accepted, otherwise null
             */
            private PendingMove addPendingMove(DBlock block, PendingMove pm) {
                final DDatanode owner = this.getDDatanode();
                if (!owner.addPendingBlock(pm)) {
                    return null;
                }
                if (pm.markMovedIfGoodBlock(block, this.getStorageType())) {
                    this.incScheduledSize(pm.reportedBlock.getNumBytes());
                    return pm;
                }
                // The block was rejected: take the move out of the queue again.
                owner.removePendingBlock(pm);
                return null;
            }

            String getDisplayName() {
                return DDatanode.this.datanode + ":" + this.storageType;
            }

            @Override
            public String toString() {
                return this.getDisplayName();
            }

            @Override
            public int hashCode() {
                final int typeHash = this.getStorageType().hashCode();
                return typeHash ^ this.getDatanodeInfo().hashCode();
            }

            /** Two groups are equal when storage type and datanode are equal. */
            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (!(obj instanceof StorageGroup)) {
                    return false;
                }
                final StorageGroup other = (StorageGroup)obj;
                if (this.getStorageType() != other.getStorageType()) {
                    return false;
                }
                return this.getDatanodeInfo().equals((Object)other.getDatanodeInfo());
            }
        }
    }

    /** A target storage group together with the number of bytes still to be moved to it. */
    static class Task {
        private final DDatanode.StorageGroup target;
        // Remaining size; reduced by Source.chooseNextMove() as blocks are scheduled.
        private long size;

        Task(DDatanode.StorageGroup target, long size) {
            this.target = target;
            this.size = size;
        }

        /** @return the remaining size of this task */
        long getSize() {
            return this.size;
        }
    }

    /**
     * A striped block group. The indices array gives, for each location, the
     * index of the internal block within the group.
     */
    public static class DBlockStriped
    extends DBlock {
        final byte[] indices;
        final short dataBlockNum;
        final int cellSize;

        public DBlockStriped(Block block, byte[] indices, short dataBlockNum, int cellSize) {
            super(block);
            this.indices = indices;
            this.dataBlockNum = dataBlockNum;
            this.cellSize = cellSize;
        }

        /**
         * Builds the internal block stored on the given storage group.
         *
         * @param storage a location of this block group
         * @return a new DBlock with the internal block's id and length and the
         *         storage as its only location, or null if the storage is not
         *         among this block's locations
         */
        public DBlock getInternalBlock(DDatanode.StorageGroup storage) {
            final int position = this.locations.indexOf(storage);
            if (position == -1) {
                return null;
            }
            final byte indexInGroup = this.indices[position];
            final long internalId = this.getBlock().getBlockId() + (long)indexInGroup;
            final long internalLength = StripedBlockUtil.getInternalBlockLength((long)this.getNumBytes(), (int)this.cellSize, (int)this.dataBlockNum, (int)indexInGroup);
            final Block internal = new Block(this.getBlock());
            internal.setBlockId(internalId);
            internal.setNumBytes(internalLength);
            final DBlock result = new DBlock(internal);
            result.addLocation(storage);
            return result;
        }

        /** @return the length of the internal block stored on the given storage group */
        @Override
        public long getNumBytes(DDatanode.StorageGroup storage) {
            final DBlock internal = this.getInternalBlock(storage);
            return internal.getNumBytes();
        }
    }

    /** A block together with its locations, expressed as storage groups. */
    public static class DBlock
    extends MovedBlocks.Locations<DDatanode.StorageGroup> {
        public DBlock(Block block) {
            super(block);
        }

        /**
         * @param storage ignored here; DBlockStriped overrides this method and uses it
         * @return the value of the inherited no-argument getNumBytes()
         */
        public long getNumBytes(DDatanode.StorageGroup storage) {
            return super.getNumBytes();
        }
    }

    public class PendingMove {
        // Block to move; assigned in markMovedIfGoodBlock(), null until then.
        private DBlock reportedBlock;
        // Storage group the block is moved away from.
        private Source source;
        // Datanode chosen by chooseProxySource(); null until a proxy has been chosen.
        private DDatanode proxySource;
        // Storage group the block is moved to.
        private DDatanode.StorageGroup target;

        /** Creates a move from source to target; block and proxy are chosen later. */
        private PendingMove(Source source, DDatanode.StorageGroup target) {
            this.source = source;
            this.target = target;
        }

        /**
         * Describes the move as "[block with size=N ]from SOURCE to TARGET through PROXY".
         * The block part collapses to a single space and the proxy part to an empty
         * string while they have not been chosen.
         */
        public String toString() {
            final Block blk = this.reportedBlock == null ? null : this.reportedBlock.getBlock();
            final String blockPart = blk == null ? " " : blk + " with size=" + blk.getNumBytes() + " ";
            final Object proxyPart = this.proxySource == null ? "" : this.proxySource.datanode;
            return blockPart + "from " + this.source.getDisplayName() + " to " + this.target.getDisplayName() + " through " + proxyPart;
        }

        /**
         * Scans the source's candidate blocks for the first one that can be moved
         * and for which a proxy is found; that block is removed from the candidates.
         *
         * @return true if a block and a proxy were chosen
         */
        private boolean chooseBlockAndProxy() {
            final StorageType sourceType = this.source.getStorageType();
            for (Iterator<DBlock> candidates = this.source.getBlockIterator(); candidates.hasNext(); ) {
                if (this.markMovedIfGoodBlock(candidates.next(), sourceType)) {
                    candidates.remove();
                    return true;
                }
            }
            return false;
        }

        /**
         * Checks whether the block can be moved from the source to the target,
         * picks a proxy for it and, on success, records it in the moved blocks.
         * Runs while holding the monitors of the block and of the moved-blocks
         * collection, in that order.
         *
         * <p>For a striped block the internal block stored on the source is used.
         * If the source is not among the block group's locations there is no such
         * internal block, and the block is rejected instead of failing later with
         * a NullPointerException.
         *
         * @param block the candidate block
         * @param targetStorageType storage type the target must have
         * @return true if the block was accepted and a proxy chosen
         */
        private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
            synchronized (block) {
                synchronized (Dispatcher.this.movedBlocks) {
                    if (!Dispatcher.this.isGoodBlockCandidate(this.source, this.target, targetStorageType, block)) {
                        return false;
                    }
                    final DBlock candidate = block instanceof DBlockStriped ? ((DBlockStriped)block).getInternalBlock(this.source) : block;
                    if (candidate == null) {
                        // getInternalBlock() returns null when the source is not a location.
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("No internal block of " + block + " found on " + this.source.getDisplayName()));
                        }
                        return false;
                    }
                    this.reportedBlock = candidate;
                    if (this.chooseProxySource()) {
                        Dispatcher.this.movedBlocks.put(block);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Decided to move " + this));
                        }
                        return true;
                    }
                }
            }
            return false;
        }

        private boolean chooseProxySource() {
            DatanodeInfo targetDN = this.target.getDatanodeInfo();
            if (this.source.getDatanodeInfo().equals((Object)targetDN) && this.addTo(this.source)) {
                return true;
            }
            if (Dispatcher.this.cluster.isNodeGroupAware()) {
                for (DDatanode.StorageGroup loc : this.reportedBlock.getLocations()) {
                    if (!Dispatcher.this.cluster.isOnSameNodeGroup((Node)loc.getDatanodeInfo(), (Node)targetDN) || !this.addTo(loc)) continue;
                    return true;
                }
            }
            for (DDatanode.StorageGroup loc : this.reportedBlock.getLocations()) {
                if (!Dispatcher.this.cluster.isOnSameRack((Node)loc.getDatanodeInfo(), (Node)targetDN) || !this.addTo(loc)) continue;
                return true;
            }
            for (DDatanode.StorageGroup loc : this.reportedBlock.getLocations()) {
                if (!this.addTo(loc)) continue;
                return true;
            }
            return false;
        }

        private boolean addTo(DDatanode.StorageGroup g) {
            DDatanode dn = g.getDDatanode();
            if (dn.addPendingBlock(this)) {
                this.proxySource = dn;
                return true;
            }
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void dispatch() {
            block24: {
                Object unbufOut;
                LOG.info((Object)("Start moving " + this));
                assert (!(this.reportedBlock instanceof DBlockStriped));
                Socket sock = new Socket();
                DataOutputStream out = null;
                DataInputStream in = null;
                try {
                    sock.connect(NetUtils.createSocketAddr((String)this.target.getDatanodeInfo().getXferAddr(Dispatcher.this.connectToDnViaHostname)), 60000);
                    sock.setKeepAlive(true);
                    unbufOut = sock.getOutputStream();
                    InputStream unbufIn = sock.getInputStream();
                    ExtendedBlock eb = new ExtendedBlock(Dispatcher.this.nnc.getBlockpoolID(), this.reportedBlock.getBlock());
                    KeyManager km = Dispatcher.this.nnc.getKeyManager();
                    Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
                    IOStreamPair saslStreams = Dispatcher.this.saslClient.socketSend(sock, (OutputStream)unbufOut, unbufIn, (DataEncryptionKeyFactory)km, accessToken, (DatanodeID)this.target.getDatanodeInfo());
                    unbufOut = saslStreams.out;
                    unbufIn = saslStreams.in;
                    out = new DataOutputStream(new BufferedOutputStream((OutputStream)unbufOut, Dispatcher.this.ioFileBufferSize));
                    in = new DataInputStream(new BufferedInputStream(unbufIn, Dispatcher.this.ioFileBufferSize));
                    this.sendRequest(out, eb, accessToken);
                    this.receiveResponse(in);
                    Dispatcher.this.nnc.getBytesMoved().addAndGet(this.reportedBlock.getNumBytes());
                    this.target.getDDatanode().setHasSuccess();
                    LOG.info((Object)("Successfully moved " + this));
                    IOUtils.closeStream((Closeable)out);
                }
                catch (IOException e) {
                    LOG.warn((Object)("Failed to move " + this + ": " + e.getMessage()));
                    this.target.getDDatanode().setHasFailure();
                    this.proxySource.activateDelay(delayAfterErrors);
                    this.target.getDDatanode().activateDelay(delayAfterErrors);
                    break block24;
                }
                finally {
                    IOUtils.closeStream(out);
                    IOUtils.closeStream(in);
                    IOUtils.closeSocket((Socket)sock);
                    this.proxySource.removePendingBlock(this);
                    this.target.getDDatanode().removePendingBlock(this);
                    Object object = this;
                    synchronized (object) {
                        this.reset();
                    }
                    object = Dispatcher.this;
                    synchronized (object) {
                        Dispatcher.this.notifyAll();
                    }
                }
                IOUtils.closeStream((Closeable)in);
                IOUtils.closeSocket((Socket)sock);
                this.proxySource.removePendingBlock(this);
                this.target.getDDatanode().removePendingBlock(this);
                unbufOut = this;
                synchronized (unbufOut) {
                    this.reset();
                }
                unbufOut = Dispatcher.this;
                synchronized (unbufOut) {
                    Dispatcher.this.notifyAll();
                }
            }
        }

        private void sendRequest(DataOutputStream out, ExtendedBlock eb, Token<BlockTokenIdentifier> accessToken) throws IOException {
            new Sender(out).replaceBlock(eb, this.target.storageType, accessToken, this.source.getDatanodeInfo().getDatanodeUuid(), this.proxySource.datanode);
        }

        private void receiveResponse(DataInputStream in) throws IOException {
            DataTransferProtos.BlockOpResponseProto response = DataTransferProtos.BlockOpResponseProto.parseFrom((InputStream)PBHelperClient.vintPrefixed((InputStream)in));
            while (response.getStatus() == DataTransferProtos.Status.IN_PROGRESS) {
                response = DataTransferProtos.BlockOpResponseProto.parseFrom((InputStream)PBHelperClient.vintPrefixed((InputStream)in));
            }
            String logInfo = "reportedBlock move is failed";
            DataTransferProtoUtil.checkBlockOpStatus((DataTransferProtos.BlockOpResponseProto)response, (String)logInfo);
        }

        private void reset() {
            this.reportedBlock = null;
            this.source = null;
            this.proxySource = null;
            this.target = null;
        }
    }

    /**
     * Map of storage groups keyed by the string
     * {@code datanodeUuid + ":" + storageType}.
     *
     * @param <G> the storage group type held by this map
     */
    public static class StorageGroupMap<G extends DDatanode.StorageGroup> {
        private final Map<String, G> map = new HashMap<String, G>();

        /** Builds the lookup key for a datanode UUID and storage type. */
        private static String toKey(String datanodeUuid, StorageType storageType) {
            return datanodeUuid + ":" + storageType;
        }

        /**
         * Looks up the group stored for the given datanode UUID and storage type.
         *
         * @return the stored group, or {@code null} when the key has no mapping
         */
        public G get(String datanodeUuid, StorageType storageType) {
            final String key = StorageGroupMap.toKey(datanodeUuid, storageType);
            return this.map.get(key);
        }

        /**
         * Stores {@code g} under the key derived from its datanode UUID and
         * storage type. The entry is written first and the precondition is
         * checked afterwards, so a duplicate key both replaces the earlier
         * entry and fails the state check.
         *
         * @param g the group to store
         */
        public void put(G g) {
            final DDatanode.StorageGroup group = g;
            final String uuid = group.getDatanodeInfo().getDatanodeUuid();
            final String key = StorageGroupMap.toKey(uuid, group.storageType);
            final DDatanode.StorageGroup previous = this.map.put(key, g);
            Preconditions.checkState(previous == null);
        }

        /** Returns the number of stored groups. */
        int size() {
            return this.map.size();
        }

        /** Removes every stored group. */
        void clear() {
            this.map.clear();
        }

        /** Returns the backing map's value collection (a live view, not a copy). */
        public Collection<G> values() {
            return this.map.values();
        }
    }

    /** Map from {@link Block} to the {@link DBlock} registered for it. */
    private static class GlobalBlockMap {
        private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();

        private GlobalBlockMap() {
        }

        /**
         * Registers {@code dblk} for {@code blk} unless the key is already
         * present.
         *
         * @return the value already mapped to {@code blk} when the key exists,
         *         otherwise {@code dblk} after storing it
         */
        private DBlock putIfAbsent(Block blk, DBlock dblk) {
            if (this.map.containsKey(blk)) {
                return this.map.get(blk);
            }
            this.map.put(blk, dblk);
            return dblk;
        }

        /**
         * Drops every entry whose key is not contained in {@code movedBlocks}.
         *
         * @param movedBlocks the set of blocks whose entries are kept
         */
        private void removeAllButRetain(MovedBlocks<DDatanode.StorageGroup> movedBlocks) {
            for (Iterator<Block> it = this.map.keySet().iterator(); it.hasNext(); ) {
                Block candidate = it.next();
                if (!movedBlocks.contains(candidate)) {
                    // Removing through the key-set iterator removes the map entry.
                    it.remove();
                }
            }
        }
    }

    /**
     * Hands out units from a fixed budget of {@code max} until it is used up.
     * {@link #allocate(int)} and {@link #reset()} are synchronized on this
     * instance.
     */
    static class Allocator {
        /** Total budget available between resets. */
        private final int max;
        /** Units handed out since construction or the last reset. */
        private int count = 0;

        Allocator(int max) {
            this.max = max;
        }

        /**
         * Requests {@code n} units from the remaining budget.
         *
         * @param n number of units requested
         * @return the number granted: {@code n} or whatever is left, whichever
         *         is smaller; {@code 0} once nothing is left
         */
        synchronized int allocate(int n) {
            final int available = this.max - this.count;
            if (available > 0) {
                final int granted = Math.min(available, n);
                this.count += granted;
                return granted;
            }
            return 0;
        }

        /** Restores the full budget by zeroing the running count. */
        synchronized void reset() {
            this.count = 0;
        }
    }
}

