/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminMonitorBase;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminMonitorInterface;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatanodeAdminBackoffMonitor
extends DatanodeAdminMonitorBase
implements DatanodeAdminMonitorInterface {
    /**
     * Nodes currently tracked by this monitor, mapped to the blocks on each node that still have
     * to be examined. A node is inserted with a {@code null} value by {@code processPendingNodes()};
     * {@code scanDatanodeStorage()} replaces that {@code null} with a real map. The inner map's
     * values are always stored as {@code null} in this file; only its keys and size are used.
     */
    private HashMap<DatanodeDescriptor, HashMap<BlockInfo, Integer>> outOfServiceNodeBlocks = new HashMap();
    /** Nodes handed to {@code stopTrackingNode()}; drained by {@code processCancelledNodes()}. */
    private final Queue<DatanodeDescriptor> cancelledNodes = new ArrayDeque<DatanodeDescriptor>();
    /**
     * Number of blocks {@code moveBlocksToPending()} handles before it releases and re-takes the
     * namesystem write lock. Loaded by {@code processConf()}.
     */
    private int blocksPerLock;
    /** Running count of blocks examined; reset to zero at the start of every {@code run()}. */
    private int numBlocksChecked = 0;
    /**
     * Upper bound on the total number of blocks held in {@link #pendingRep} before
     * {@code moveBlocksToPending()} stops adding more. Loaded by {@code processConf()}.
     */
    private int pendingRepLimit;
    /**
     * Per-node lists of blocks for which {@code isBlockReplicatedOk()} returned {@code false} in
     * {@code nextBlockAddedToPending()}; re-checked on each pass by
     * {@code processPendingReplication()}.
     */
    private final Map<DatanodeDescriptor, List<BlockInfo>> pendingRep = new HashMap<DatanodeDescriptor, List<BlockInfo>>();
    private static final Logger LOG = LoggerFactory.getLogger(DatanodeAdminBackoffMonitor.class);

    /** Creates a monitor with empty tracking structures; settings are loaded by {@code processConf()}. */
    DatanodeAdminBackoffMonitor() {
    }

    /**
     * Loads the two tuning values of this monitor from {@code conf}: the pending-replication limit
     * and the number of blocks processed per write-lock hold. A value below one is reported at
     * ERROR level and replaced by the built-in default (10000 and 1000 respectively).
     */
    @Override
    protected void processConf() {
        final String invalidValueMessage =
            "{} is set to an invalid value, it must be greater than zero. Defaulting to {}";

        final String pendingLimitKey = "dfs.namenode.decommission.backoff.monitor.pending.limit";
        final int pendingLimitDefault = 10000;
        int configuredLimit = conf.getInt(pendingLimitKey, pendingLimitDefault);
        if (configuredLimit < 1) {
            LOG.error(invalidValueMessage, pendingLimitKey, pendingLimitDefault);
            configuredLimit = pendingLimitDefault;
        }
        pendingRepLimit = configuredLimit;

        final String blocksPerLockKey =
            "dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock";
        final int blocksPerLockDefault = 1000;
        int configuredBlocksPerLock = conf.getInt(blocksPerLockKey, blocksPerLockDefault);
        if (configuredBlocksPerLock <= 0) {
            LOG.error(invalidValueMessage, blocksPerLockKey, blocksPerLockDefault);
            configuredBlocksPerLock = blocksPerLockDefault;
        }
        blocksPerLock = configuredBlocksPerLock;

        LOG.info("Initialized the Backoff Decommission and Maintenance Monitor");
    }

    /**
     * Requests that {@code dn} no longer be tracked. The node is dropped from the not-yet-started
     * {@code pendingNodes} queue right away and appended to {@code cancelledNodes}; the removal
     * from the tracked maps happens later, in {@code processCancelledNodes()}.
     *
     * <p>NOTE(review): neither queue is synchronized here; callers presumably hold the namesystem
     * write lock -- confirm.
     *
     * @param dn the datanode to stop tracking
     */
    @Override
    public void stopTrackingNode(DatanodeDescriptor dn) {
        pendingNodes.remove(dn);
        cancelledNodes.add(dn);
    }

    /**
     * @return the number of datanodes currently held in {@code outOfServiceNodeBlocks}
     */
    @Override
    public int getTrackedNodeCount() {
        final int trackedNodes = outOfServiceNodeBlocks.size();
        return trackedNodes;
    }

    /**
     * @return the size of {@code outOfServiceNodeBlocks}; this implementation reports the same
     *         value as {@link #getTrackedNodeCount()}
     */
    @Override
    public int getNumNodesChecked() {
        final int checkedNodes = outOfServiceNodeBlocks.size();
        return checkedNodes;
    }

    /**
     * Executes one monitoring pass.
     *
     * <p>When the namesystem is not running the pass is skipped. Otherwise the per-pass block
     * counter is reset, the cancelled and pending queues are drained under the namesystem write
     * lock (cancelled first, then pending), and {@code check()} is invoked after the lock has been
     * released. Any {@link Exception} raised along the way is logged at WARN level and not
     * rethrown. A summary line is logged when at least one block was checked or at least one node
     * is tracked.
     */
    @Override
    public void run() {
        LOG.debug("DatanodeAdminMonitorV2 is running.");
        if (!namesystem.isRunning()) {
            LOG.info("Namesystem is not running, skipping decommissioning/maintenance checks.");
            return;
        }

        numBlocksChecked = 0;
        try {
            namesystem.writeLock();
            try {
                // Order matters: cancellations are applied before new nodes are admitted.
                processCancelledNodes();
                processPendingNodes();
            } finally {
                namesystem.writeUnlock();
            }
            // check() takes and releases the namesystem locks itself.
            check();
        } catch (Exception e) {
            LOG.warn("DatanodeAdminMonitor caught exception when processing node.", e);
        }

        if (numBlocksChecked + outOfServiceNodeBlocks.size() <= 0) {
            return;
        }
        LOG.info("Checked {} blocks this tick. {} nodes are now in maintenance or transitioning state. {} nodes pending. {} nodes waiting to be cancelled.",
            numBlocksChecked, outOfServiceNodeBlocks.size(), pendingNodes.size(), cancelledNodes.size());
    }

    /**
     * Admits nodes from {@code pendingNodes} into {@code outOfServiceNodeBlocks}, each with a
     * {@code null} block map, for as long as nodes are waiting and the concurrency cap allows.
     * A {@code maxConcurrentTrackedNodes} of zero means no cap is applied.
     */
    private void processPendingNodes() {
        while (!pendingNodes.isEmpty()
                && (maxConcurrentTrackedNodes == 0
                    || outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
            DatanodeDescriptor admitted = (DatanodeDescriptor) pendingNodes.poll();
            // null marks "storage not scanned yet"; check() scans such nodes.
            outOfServiceNodeBlocks.put(admitted, null);
        }
    }

    /**
     * Drains {@code cancelledNodes}, removing every queued node from both
     * {@code outOfServiceNodeBlocks} and {@code pendingRep}.
     */
    private void processCancelledNodes() {
        // ArrayDeque rejects null elements, so a null from poll() means the queue is empty.
        for (DatanodeDescriptor cancelled = cancelledNodes.poll();
                cancelled != null;
                cancelled = cancelledNodes.poll()) {
            outOfServiceNodeBlocks.remove(cancelled);
            pendingRep.remove(cancelled);
        }
    }

    /**
     * Runs the phases of one monitoring pass, in order: initial storage scan of nodes whose block
     * map is still {@code null}, maintenance expiry, re-check of pending blocks, moving further
     * blocks to pending, detecting finished nodes and moving them to their final state. Does
     * nothing when no node is tracked.
     */
    private void check() {
        if (outOfServiceNodeBlocks.size() == 0) {
            return;
        }

        // The scan replaces the null value of an existing key, which does not add or remove
        // entries in the map being iterated.
        for (DatanodeDescriptor node : outOfServiceNodeBlocks.keySet()) {
            if (outOfServiceNodeBlocks.get(node) == null) {
                scanDatanodeStorage(node, true);
            }
        }

        processMaintenanceNodes();
        processPendingReplication();
        moveBlocksToPending();

        List<DatanodeDescriptor> completed = new ArrayList<DatanodeDescriptor>();
        checkForCompletedNodes(completed);
        processCompletedNodes(completed);
    }

    /**
     * Ends maintenance, via {@code dnAdmin.stopMaintenance()}, for every tracked node that is in a
     * maintenance state and whose maintenance window has expired. The namesystem write lock is
     * held throughout, but is released and immediately re-taken after each node that was stopped.
     *
     * <p>NOTE(review): the tracked map is not modified here; {@code stopMaintenance} presumably
     * routes through {@code stopTrackingNode()} so the node is dropped on a later pass -- confirm.
     */
    private void processMaintenanceNodes() {
        namesystem.writeLock();
        try {
            for (DatanodeDescriptor node : outOfServiceNodeBlocks.keySet()) {
                boolean maintenanceOver = node.isMaintenance() && node.maintenanceExpired();
                if (maintenanceOver) {
                    dnAdmin.stopMaintenance(node);
                    // Give other lock waiters a chance between nodes.
                    namesystem.writeUnlock();
                    namesystem.writeLock();
                }
            }
        } finally {
            namesystem.writeUnlock();
        }
    }

    /**
     * Moves nodes that have no outstanding blocks to their final admin state, under the namesystem
     * write lock. Returns without taking the lock when {@code toRemove} is empty.
     *
     * <p>For each node judged healthy by the block manager:
     * <ul>
     *   <li>decommission in progress: marked decommissioned and removed from all tracking;</li>
     *   <li>entering maintenance: marked in-maintenance and removed from {@code pendingRep} only,
     *       so it stays in {@code outOfServiceNodeBlocks};</li>
     *   <li>in service, or any other state: logged and removed from all tracking.</li>
     * </ul>
     * Unhealthy nodes are only logged and remain tracked.
     *
     * @param toRemove nodes found by {@code checkForCompletedNodes()} to have zero pending blocks
     */
    private void processCompletedNodes(List<DatanodeDescriptor> toRemove) {
        if (toRemove.isEmpty()) {
            return;
        }
        namesystem.writeLock();
        try {
            for (DatanodeDescriptor dn : toRemove) {
                if (!blockManager.isNodeHealthyForDecommissionOrMaintenance(dn)) {
                    LOG.info("Node {} isn't healthy. It needs to replicate {} more blocks. {} is still in progress.",
                        dn, getPendingCountForNode(dn), dn.getAdminState());
                    continue;
                }

                if (dn.isDecommissionInProgress()) {
                    dnAdmin.setDecommissioned(dn);
                    outOfServiceNodeBlocks.remove(dn);
                    pendingRep.remove(dn);
                } else if (dn.isEnteringMaintenance()) {
                    dnAdmin.setInMaintenance(dn);
                    pendingRep.remove(dn);
                } else {
                    if (dn.isInService()) {
                        LOG.info("Node {} completed decommission and maintenance but has been moved back to in service", dn);
                    } else {
                        LOG.error("Node {} is in an unexpected state {} and has been removed from tracking for decommission or maintenance",
                            dn, dn.getAdminState());
                    }
                    pendingRep.remove(dn);
                    outOfServiceNodeBlocks.remove(dn);
                    continue;
                }
                LOG.info("Node {} is sufficiently replicated and healthy, marked as {}.", dn, dn.getAdminState());
            }
        } finally {
            namesystem.writeUnlock();
        }
    }

    /**
     * Collects the tracked nodes that have no blocks left to process.
     *
     * <p>Nodes already in maintenance and nodes that are in service are skipped. For every other
     * node, a pending count of zero triggers a non-initial storage rescan; if the count is still
     * zero after the rescan, the node is appended to {@code removeList}.
     *
     * @param removeList output list that receives the nodes with zero outstanding blocks
     */
    private void checkForCompletedNodes(List<DatanodeDescriptor> removeList) {
        for (DatanodeDescriptor node : outOfServiceNodeBlocks.keySet()) {
            if (node.isInMaintenance()) {
                LOG.debug("Node {} is currently in maintenance", node);
            } else if (!node.isInService()) {
                int remaining = getPendingCountForNode(node);
                if (remaining == 0) {
                    // Rescan the storages to confirm nothing is left before declaring completion.
                    scanDatanodeStorage(node, false);
                    remaining = getPendingCountForNode(node);
                }
                LOG.info("Node {} has {} blocks yet to process", node, remaining);
                if (remaining == 0) {
                    removeList.add(node);
                }
            }
        }
    }

    /**
     * Counts the blocks still outstanding for one node: those not yet examined (in
     * {@code outOfServiceNodeBlocks}) plus those awaiting replication (in {@code pendingRep}).
     * A missing or {@code null} entry in either map contributes zero.
     *
     * @param dn the datanode to count for
     * @return the combined number of unexamined and pending blocks
     */
    private int getPendingCountForNode(DatanodeDescriptor dn) {
        HashMap<BlockInfo, Integer> unexamined = outOfServiceNodeBlocks.get(dn);
        List<BlockInfo> awaitingReplication = pendingRep.get(dn);
        int unexaminedCount = unexamined == null ? 0 : unexamined.size();
        int awaitingCount = awaitingReplication == null ? 0 : awaitingReplication.size();
        return unexaminedCount + awaitingCount;
    }

    /**
     * Moves unexamined blocks into {@code pendingRep} until the total pending count reaches
     * {@code pendingRepLimit} or no unexamined blocks remain.
     *
     * <p>Returns immediately, without locking, when there is nothing pending and nothing left to
     * examine. Otherwise the work is done under the namesystem write lock, which is released and
     * re-taken every {@code blocksPerLock} blocks. Nodes are visited in a repeating cycle: for
     * each visit, blocks of that node are consumed until one is added to pending, then the next
     * node is visited. A node leaves the cycle once its block iterator is exhausted.
     */
    private void moveBlocksToPending() {
        int pendingTotal = getPendingCount();
        final int unexamined = getYetToBeProcessedCount();
        if (pendingTotal == 0 && unexamined == 0) {
            LOG.debug("There are no pending or blocks yet to be processed");
            return;
        }

        namesystem.writeLock();
        try {
            final long repQueueSize = blockManager.getLowRedundancyBlocksCount();
            LOG.info("There are {} blocks pending replication and the limit is {}. A further {} blocks are waiting to be processed. The replication queue currently has {} blocks",
                pendingTotal, pendingRepLimit, unexamined, repQueueSize);
            if (pendingTotal >= pendingRepLimit) {
                // Already at the limit; note this exit skips the trailing debug line.
                return;
            }

            // One block iterator per tracked node, consumed in turn below.
            Map<DatanodeDescriptor, Iterator<BlockInfo>> blockIterators =
                new HashMap<DatanodeDescriptor, Iterator<BlockInfo>>();
            for (Map.Entry<DatanodeDescriptor, HashMap<BlockInfo, Integer>> tracked
                    : outOfServiceNodeBlocks.entrySet()) {
                blockIterators.put(tracked.getKey(), tracked.getValue().keySet().iterator());
            }

            int sinceLockYield = 0;
            Iterator<DatanodeDescriptor> nodeCycle =
                Iterables.cycle(blockIterators.keySet()).iterator();
            while (nodeCycle.hasNext()) {
                DatanodeDescriptor dn = nodeCycle.next();
                Iterator<BlockInfo> blockIt = blockIterators.get(dn);

                boolean queued = false;
                while (!queued && blockIt.hasNext()) {
                    if (sinceLockYield >= blocksPerLock) {
                        sinceLockYield = 0;
                        namesystem.writeUnlock();
                        namesystem.writeLock();
                    }
                    sinceLockYield++;
                    queued = nextBlockAddedToPending(blockIt, dn);
                }
                if (queued) {
                    pendingTotal++;
                }

                if (!blockIt.hasNext()) {
                    // Nothing left on this node: take it out of the cycle.
                    nodeCycle.remove();
                }
                if (pendingTotal >= pendingRepLimit) {
                    break;
                }
            }
        } finally {
            namesystem.writeUnlock();
        }
        LOG.debug("{} blocks are now pending replication", pendingTotal);
    }

    /**
     * Consumes the next block from {@code it}, removing it from the underlying collection, and
     * appends it to the pending list of {@code dn} if {@code isBlockReplicatedOk()} (called with
     * reconstruction scheduling enabled) reports it as not sufficiently replicated.
     *
     * @param it iterator over the node's unexamined blocks; must have a next element
     * @param dn the datanode the block belongs to
     * @return {@code true} if the block was added to {@code pendingRep}, {@code false} otherwise
     */
    private boolean nextBlockAddedToPending(Iterator<BlockInfo> it, DatanodeDescriptor dn) {
        final BlockInfo candidate = it.next();
        it.remove();
        numBlocksChecked++;

        final boolean needsReplication = !isBlockReplicatedOk(dn, candidate, true, null);
        if (needsReplication) {
            List<BlockInfo> pendingForNode = pendingRep.computeIfAbsent(dn, k -> new LinkedList<BlockInfo>());
            pendingForNode.add(candidate);
        }
        return needsReplication;
    }

    /**
     * @return the total number of blocks across all per-node lists in {@code pendingRep};
     *         zero when the map is empty
     */
    private int getPendingCount() {
        int total = 0;
        for (List<BlockInfo> pendingForNode : pendingRep.values()) {
            total += pendingForNode.size();
        }
        return total;
    }

    /**
     * Sums the sizes of all per-node block maps in {@code outOfServiceNodeBlocks}. Every value
     * must be non-null when this is called (a {@code null} value would raise a
     * {@link NullPointerException}); {@code check()} scans all unscanned nodes before this runs.
     *
     * @return the total number of blocks not yet examined; zero when no node is tracked
     */
    private int getYetToBeProcessedCount() {
        int total = 0;
        for (HashMap<BlockInfo, Integer> unexamined : outOfServiceNodeBlocks.values()) {
            total += unexamined.size();
        }
        return total;
    }

    /**
     * Walks every storage of {@code dn} and records blocks in the node's entry of
     * {@code outOfServiceNodeBlocks}, creating that entry's map if it is still {@code null}.
     *
     * <p>On an initial scan of a node that is not entering maintenance, every block is recorded
     * without checking it. Otherwise (a rescan, or a node entering maintenance) a block is recorded
     * only if {@code isBlockReplicatedOk()} -- called without scheduling reconstruction -- returns
     * {@code false}.
     *
     * <p>Only the namesystem read lock is used. It is taken once to read the storage list and then
     * separately for each storage, so a storage is re-validated against the node before its blocks
     * are iterated.
     *
     * @param dn          the datanode whose storages are scanned
     * @param initialScan {@code true} for the first scan of a node, {@code false} for a rescan;
     *                    must not be {@code null}
     */
    private void scanDatanodeStorage(DatanodeDescriptor dn, Boolean initialScan) {
        HashMap<BlockInfo, Integer> trackedBlocks = outOfServiceNodeBlocks.get(dn);
        if (trackedBlocks == null) {
            trackedBlocks = new HashMap<BlockInfo, Integer>();
            outOfServiceNodeBlocks.put(dn, trackedBlocks);
        }

        final DatanodeStorageInfo[] storages;
        namesystem.readLock();
        try {
            storages = dn.getStorageInfos();
        } finally {
            namesystem.readUnlock();
        }

        for (DatanodeStorageInfo storage : storages) {
            namesystem.readLock();
            try {
                // The lock was dropped since the list was read; skip storages that are gone.
                if (dn.getStorageInfo(storage.getStorageID()) != null) {
                    for (Iterator<BlockInfo> blocks = storage.getBlockIterator(); blocks.hasNext();) {
                        BlockInfo block = blocks.next();
                        // Evaluated per block, left to right, with short-circuiting.
                        boolean track = (initialScan.booleanValue() && !dn.isEnteringMaintenance())
                            || !isBlockReplicatedOk(dn, block, false, null);
                        if (track) {
                            trackedBlocks.put(block, null);
                        }
                        numBlocksChecked++;
                    }
                }
            } finally {
                namesystem.readUnlock();
            }
        }
    }

    /**
     * Re-checks every block in {@code pendingRep} under the namesystem write lock and removes the
     * ones that {@code isBlockReplicatedOk()} now reports as sufficiently replicated. A node whose
     * list is {@code null} or becomes empty is removed from {@code pendingRep}. After each node's
     * list has been processed, the node's leaving-service status is updated with the open-file
     * statistics gathered during the re-check and the node's current pending count.
     */
    private void processPendingReplication() {
        namesystem.writeLock();
        try {
            for (Iterator<Map.Entry<DatanodeDescriptor, List<BlockInfo>>> nodeIt = pendingRep.entrySet().iterator();
                    nodeIt.hasNext();) {
                Map.Entry<DatanodeDescriptor, List<BlockInfo>> nodeEntry = nodeIt.next();
                DatanodeDescriptor dn = nodeEntry.getKey();
                List<BlockInfo> pendingBlocks = nodeEntry.getValue();
                if (pendingBlocks == null) {
                    nodeIt.remove();
                    continue;
                }

                BlockStats stats = new BlockStats();
                for (Iterator<BlockInfo> blockIt = pendingBlocks.iterator(); blockIt.hasNext();) {
                    BlockInfo block = blockIt.next();
                    if (isBlockReplicatedOk(dn, block, true, stats)) {
                        blockIt.remove();
                    }
                    numBlocksChecked++;
                }
                if (pendingBlocks.isEmpty()) {
                    nodeIt.remove();
                }

                // Computed after the removal above, so an emptied list no longer counts.
                dn.getLeavingServiceStatus().set(
                    stats.getOpenFileCount(),
                    stats.getOpenFiles(),
                    getPendingCountForNode(dn),
                    stats.getOutOfServiceBlockCount());
            }
        } finally {
            namesystem.writeUnlock();
        }
    }

    /**
     * Decides whether {@code block} is replicated well enough for {@code datanode} to leave
     * service, optionally queueing the block for reconstruction and collecting statistics.
     *
     * <p>A block that is no longer present in the block manager's {@code blocksMap} is reported as
     * OK, so that callers stop tracking it. A block whose block collection id is {@code -1} is
     * reported as not OK without any further checks.
     *
     * @param datanode               the node leaving service; its admin state selects the
     *                               decommission or the maintenance variant of the checks
     * @param block                  the block to examine
     * @param scheduleReconStruction when {@code true}, a block that needs reconstruction is added
     *                               to {@code blockManager.neededReconstruction} unless it is
     *                               already queued there, has pending reconstruction, or the block
     *                               manager is not populating replication queues; must not be
     *                               {@code null}
     * @param suspectBlocks          statistics collector, or {@code null} to skip collection; when
     *                               given, open files and blocks with zero live but some
     *                               out-of-service replicas are recorded
     * @return the result of {@code dnAdmin.isSufficient(...)} for the block, or the fixed results
     *         described above for unknown and collection-less blocks
     */
    private boolean isBlockReplicatedOk(DatanodeDescriptor datanode, BlockInfo block, Boolean scheduleReconStruction, BlockStats suspectBlocks) {
        if (this.blockManager.blocksMap.getStoredBlock(block) == null) {
            LOG.trace("Removing unknown block {}", block);
            return true;
        }
        long bcId = block.getBlockCollectionId();
        // NOTE(review): -1 presumably is the "invalid inode id" marker of an orphaned block -- confirm.
        if (bcId == -1L) {
            return false;
        }
        BlockCollection bc = this.blockManager.getBlockCollection(block);
        NumberReplicas num = this.blockManager.countNodes(block);
        int liveReplicas = num.liveReplicas();
        boolean isDecommission = datanode.isDecommissionInProgress();
        boolean isMaintenance = datanode.isEnteringMaintenance();
        boolean neededReconstruction = isDecommission
            ? this.blockManager.isNeededReconstruction(block, num)
            : this.blockManager.isNeededReconstructionForMaintenance(block, num);
        if (neededReconstruction
                && scheduleReconStruction.booleanValue()
                && !this.blockManager.neededReconstruction.contains(block)
                && this.blockManager.pendingReconstruction.getNumReplicas(block) == 0
                && this.blockManager.isPopulatingReplQueues()) {
            this.blockManager.neededReconstruction.add(block, liveReplicas, num.readOnlyReplicas(), num.outOfServiceReplicas(), this.blockManager.getExpectedRedundancyNum(block));
        }
        if (suspectBlocks != null) {
            if (bc.isUnderConstruction()) {
                INode ucFile = this.namesystem.getFSDirectory().getInode(bc.getId());
                if (ucFile instanceof INodeFile && ucFile.asFile().isUnderConstruction()) {
                    suspectBlocks.addOpenFile(ucFile.getId());
                } else {
                    // instanceof is false for null, so this branch is also reached when the inode
                    // lookup found nothing; never dereference ucFile unguarded here.
                    Object fileLabel = ucFile == null
                        ? "with inode id " + bc.getId()
                        : ucFile.getLocalName();
                    LOG.warn("File {} is not under construction. Skipping add to low redundancy open files!", fileLabel);
                }
            }
            if (liveReplicas == 0 && num.outOfServiceReplicas() > 0) {
                suspectBlocks.incrementOutOfServiceBlocks();
            }
        }
        return this.dnAdmin.isSufficient(block, bc, num, isDecommission, isMaintenance);
    }

    /**
     * Mutable accumulator for per-node statistics gathered while pending blocks are re-checked:
     * the ids of open files seen, how many times an open file was recorded, and how many blocks
     * were counted as out of service.
     */
    static class BlockStats {
        /** Ids passed to {@link #addOpenFile(long)}. */
        private LightWeightHashSet<Long> openFiles = new LightWeightLinkedSet<Long>();
        /** Number of {@link #addOpenFile(long)} calls; counts calls, not distinct ids. */
        private int openFileBlockCount = 0;
        /** Number of {@link #incrementOutOfServiceBlocks()} calls. */
        private int outOfServiceBlockCount = 0;

        BlockStats() {
        }

        /**
         * Records one block belonging to an open file.
         *
         * @param id the id of the open file
         */
        public void addOpenFile(long id) {
            openFileBlockCount += 1;
            openFiles.add(id);
        }

        /** Counts one more out-of-service block. */
        public void incrementOutOfServiceBlocks() {
            outOfServiceBlockCount += 1;
        }

        /** @return the backing set of recorded open-file ids (not a copy) */
        public LightWeightHashSet<Long> getOpenFiles() {
            return openFiles;
        }

        /** @return how many times {@link #addOpenFile(long)} has been called */
        public int getOpenFileCount() {
            return openFileBlockCount;
        }

        /** @return how many times {@link #incrementOutOfServiceBlocks()} has been called */
        public int getOutOfServiceBlockCount() {
            return outOfServiceBlockCount;
        }
    }
}

