/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.AbstractList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class DecommissionManager {
    private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager.class);
    private final Namesystem namesystem;
    private final BlockManager blockManager;
    private final HeartbeatManager hbManager;
    private final ScheduledExecutorService executor;
    private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>> decomNodeBlocks;
    private final Queue<DatanodeDescriptor> pendingNodes;
    private Monitor monitor = null;

    DecommissionManager(Namesystem namesystem, BlockManager blockManager, HeartbeatManager hbManager) {
        this.namesystem = namesystem;
        this.blockManager = blockManager;
        this.hbManager = hbManager;
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d").setDaemon(true).build());
        this.decomNodeBlocks = new TreeMap();
        this.pendingNodes = new LinkedList<DatanodeDescriptor>();
    }

    void activate(Configuration conf) {
        int intervalSecs = conf.getInt("dfs.namenode.decommission.interval", 30);
        Preconditions.checkArgument((intervalSecs >= 0 ? 1 : 0) != 0, (Object)"Cannot set a negative value for dfs.namenode.decommission.interval");
        int blocksPerInterval = conf.getInt("dfs.namenode.decommission.blocks.per.interval", 500000);
        String deprecatedKey = "dfs.namenode.decommission.nodes.per.interval";
        String strNodes = conf.get("dfs.namenode.decommission.nodes.per.interval");
        if (strNodes != null) {
            LOG.warn("Deprecated configuration key {} will be ignored.", (Object)"dfs.namenode.decommission.nodes.per.interval");
            LOG.warn("Please update your configuration to use {} instead.", (Object)"dfs.namenode.decommission.blocks.per.interval");
        }
        Preconditions.checkArgument((blocksPerInterval > 0 ? 1 : 0) != 0, (Object)"Must set a positive value for dfs.namenode.decommission.blocks.per.interval");
        int maxConcurrentTrackedNodes = conf.getInt("dfs.namenode.decommission.max.concurrent.tracked.nodes", 100);
        Preconditions.checkArgument((maxConcurrentTrackedNodes >= 0 ? 1 : 0) != 0, (Object)"Cannot set a negative value for dfs.namenode.decommission.max.concurrent.tracked.nodes");
        this.monitor = new Monitor(blocksPerInterval, maxConcurrentTrackedNodes);
        this.executor.scheduleAtFixedRate(this.monitor, intervalSecs, intervalSecs, TimeUnit.SECONDS);
        LOG.debug("Activating DecommissionManager with interval {} seconds, {} max blocks per interval, {} max concurrently tracked nodes.", new Object[]{intervalSecs, blocksPerInterval, maxConcurrentTrackedNodes});
    }

    void close() {
        this.executor.shutdownNow();
        try {
            this.executor.awaitTermination(3000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @VisibleForTesting
    public void startDecommission(DatanodeDescriptor node) {
        if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
            this.hbManager.startDecommission(node);
            if (node.isDecommissionInProgress()) {
                for (DatanodeStorageInfo storage : node.getStorageInfos()) {
                    LOG.info("Starting decommission of {} {} with {} blocks", new Object[]{node, storage, storage.numBlocks()});
                }
                node.decommissioningStatus.setStartTime(Time.monotonicNow());
                this.pendingNodes.add(node);
            }
        } else {
            LOG.trace("startDecommission: Node {} in {}, nothing to do." + (Object)((Object)node), (Object)node.getAdminState());
        }
    }

    @VisibleForTesting
    public void stopDecommission(DatanodeDescriptor node) {
        if (node.isDecommissionInProgress() || node.isDecommissioned()) {
            this.hbManager.stopDecommission(node);
            if (node.isAlive()) {
                this.blockManager.processExtraRedundancyBlocksOnReCommission(node);
            }
            this.pendingNodes.remove((Object)node);
            this.decomNodeBlocks.remove((Object)node);
        } else {
            LOG.trace("stopDecommission: Node {} in {}, nothing to do." + (Object)((Object)node), (Object)node.getAdminState());
        }
    }

    private void setDecommissioned(DatanodeDescriptor dn) {
        dn.setDecommissioned();
        LOG.info("Decommissioning complete for node {}", (Object)dn);
    }

    private boolean isSufficient(BlockInfo block, BlockCollection bc, NumberReplicas numberReplicas) {
        short numExpected = this.blockManager.getExpectedRedundancyNum(block);
        int numLive = numberReplicas.liveReplicas();
        if (numLive >= numExpected && this.blockManager.isPlacementPolicySatisfied(block)) {
            LOG.trace("Block {} does not need replication.", (Object)block);
            return true;
        }
        LOG.trace("Block {} numExpected={}, numLive={}", new Object[]{block, (int)numExpected, numLive});
        if (numExpected > numLive) {
            if (bc.isUnderConstruction() && block.equals((Object)bc.getLastBlock())) {
                if (this.blockManager.hasMinStorage(block, numLive)) {
                    LOG.trace("UC block {} sufficiently-replicated since numLive ({}) >= minR ({})", new Object[]{block, numLive, this.blockManager.getMinStorageNum(block)});
                    return true;
                }
                LOG.trace("UC block {} insufficiently-replicated since numLive ({}) < minR ({})", new Object[]{block, numLive, this.blockManager.getMinStorageNum(block)});
            } else if (numLive >= this.blockManager.getDefaultStorageNum(block)) {
                return true;
            }
        }
        return false;
    }

    private void logBlockReplicationInfo(BlockInfo block, BlockCollection bc, DatanodeDescriptor srcNode, NumberReplicas num, Iterable<DatanodeStorageInfo> storages) {
        if (!NameNode.blockStateChangeLog.isInfoEnabled()) {
            return;
        }
        int curReplicas = num.liveReplicas();
        short curExpectedRedundancy = this.blockManager.getExpectedRedundancyNum(block);
        StringBuilder nodeList = new StringBuilder();
        for (DatanodeStorageInfo storage : storages) {
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            nodeList.append((Object)node);
            nodeList.append(" ");
        }
        NameNode.blockStateChangeLog.info("Block: " + (Object)((Object)block) + ", Expected Replicas: " + curExpectedRedundancy + ", live replicas: " + curReplicas + ", corrupt replicas: " + num.corruptReplicas() + ", decommissioned replicas: " + num.decommissioned() + ", decommissioning replicas: " + num.decommissioning() + ", excess replicas: " + num.excessReplicas() + ", Is Open File: " + bc.isUnderConstruction() + ", Datanodes having this block: " + nodeList + ", Current Datanode: " + (Object)((Object)srcNode) + ", Is current datanode decommissioning: " + srcNode.isDecommissionInProgress());
    }

    @VisibleForTesting
    public int getNumPendingNodes() {
        return this.pendingNodes.size();
    }

    @VisibleForTesting
    public int getNumTrackedNodes() {
        return this.decomNodeBlocks.size();
    }

    @VisibleForTesting
    public int getNumNodesChecked() {
        return this.monitor.numNodesChecked;
    }

    @VisibleForTesting
    void runMonitorForTest() throws ExecutionException, InterruptedException {
        this.executor.submit(this.monitor).get();
    }

    private class Monitor
    implements Runnable {
        private final int numBlocksPerCheck;
        private final int maxConcurrentTrackedNodes;
        private int numBlocksChecked = 0;
        private int numNodesChecked = 0;
        private DatanodeDescriptor iterkey = new DatanodeDescriptor(new DatanodeID("", "", "", 0, 0, 0, 0));

        Monitor(int numBlocksPerCheck, int maxConcurrentTrackedNodes) {
            this.numBlocksPerCheck = numBlocksPerCheck;
            this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
        }

        private boolean exceededNumBlocksPerCheck() {
            LOG.trace("Processed {} blocks so far this tick", (Object)this.numBlocksChecked);
            return this.numBlocksChecked >= this.numBlocksPerCheck;
        }

        @Override
        public void run() {
            if (!DecommissionManager.this.namesystem.isRunning()) {
                LOG.info("Namesystem is not running, skipping decommissioning checks.");
                return;
            }
            this.numBlocksChecked = 0;
            this.numNodesChecked = 0;
            DecommissionManager.this.namesystem.writeLock();
            try {
                this.processPendingNodes();
                this.check();
            }
            finally {
                DecommissionManager.this.namesystem.writeUnlock();
            }
            if (this.numBlocksChecked + this.numNodesChecked > 0) {
                LOG.info("Checked {} blocks and {} nodes this tick", (Object)this.numBlocksChecked, (Object)this.numNodesChecked);
            }
        }

        private void processPendingNodes() {
            while (!(DecommissionManager.this.pendingNodes.isEmpty() || this.maxConcurrentTrackedNodes != 0 && DecommissionManager.this.decomNodeBlocks.size() >= this.maxConcurrentTrackedNodes)) {
                DecommissionManager.this.decomNodeBlocks.put(DecommissionManager.this.pendingNodes.poll(), null);
            }
        }

        private void check() {
            Iterator it = new CyclicIteration(DecommissionManager.this.decomNodeBlocks, this.iterkey).iterator();
            LinkedList<DatanodeDescriptor> toRemove = new LinkedList<DatanodeDescriptor>();
            while (it.hasNext() && !this.exceededNumBlocksPerCheck()) {
                ++this.numNodesChecked;
                Map.Entry entry = it.next();
                DatanodeDescriptor dn = entry.getKey();
                AbstractList<BlockInfo> blocks = (AbstractList<BlockInfo>)entry.getValue();
                boolean fullScan = false;
                if (blocks == null) {
                    LOG.debug("Newly-added node {}, doing full scan to find insufficiently-replicated blocks.", (Object)dn);
                    blocks = this.handleInsufficientlyStored(dn);
                    DecommissionManager.this.decomNodeBlocks.put(dn, blocks);
                    fullScan = true;
                } else {
                    LOG.debug("Processing decommission-in-progress node {}", (Object)dn);
                    this.pruneReliableBlocks(dn, blocks);
                }
                if (blocks.size() == 0) {
                    if (!fullScan) {
                        LOG.debug("Node {} has finished replicating current set of blocks, checking with the full block map.", (Object)dn);
                        blocks = this.handleInsufficientlyStored(dn);
                        DecommissionManager.this.decomNodeBlocks.put(dn, blocks);
                    }
                    boolean isHealthy = DecommissionManager.this.blockManager.isNodeHealthyForDecommission(dn);
                    if (blocks.size() == 0 && isHealthy) {
                        DecommissionManager.this.setDecommissioned(dn);
                        toRemove.add(dn);
                        LOG.debug("Node {} is sufficiently replicated and healthy, marked as decommissioned.", (Object)dn);
                    } else {
                        LOG.debug("Node {} {} healthy. It needs to replicate {} more blocks. Decommissioning is still in progress.", new Object[]{dn, isHealthy ? "is" : "isn't", blocks.size()});
                    }
                } else {
                    LOG.debug("Node {} still has {} blocks to replicate before it is a candidate to finish decommissioning.", (Object)dn, (Object)blocks.size());
                }
                this.iterkey = dn;
            }
            for (DatanodeDescriptor dn : toRemove) {
                Preconditions.checkState((boolean)dn.isDecommissioned(), (Object)"Removing a node that is not yet decommissioned!");
                DecommissionManager.this.decomNodeBlocks.remove((Object)dn);
            }
        }

        private void pruneReliableBlocks(DatanodeDescriptor datanode, AbstractList<BlockInfo> blocks) {
            this.processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
        }

        private AbstractList<BlockInfo> handleInsufficientlyStored(DatanodeDescriptor datanode) {
            ChunkedArrayList insufficient = new ChunkedArrayList();
            this.processBlocksForDecomInternal(datanode, datanode.getBlockIterator(), (List<BlockInfo>)insufficient, false);
            return insufficient;
        }

        private void processBlocksForDecomInternal(DatanodeDescriptor datanode, Iterator<BlockInfo> it, List<BlockInfo> insufficientList, boolean pruneReliableBlocks) {
            boolean firstReplicationLog = true;
            int lowRedundancyBlocks = 0;
            int decommissionOnlyReplicas = 0;
            int lowRedundancyInOpenFiles = 0;
            while (it.hasNext()) {
                ++this.numBlocksChecked;
                BlockInfo block = it.next();
                if (((DecommissionManager)DecommissionManager.this).blockManager.blocksMap.getStoredBlock(block) == null) {
                    LOG.trace("Removing unknown block {}", (Object)block);
                    it.remove();
                    continue;
                }
                long bcId = block.getBlockCollectionId();
                if (bcId == -1L) continue;
                BlockCollection bc = DecommissionManager.this.blockManager.getBlockCollection(block);
                NumberReplicas num = DecommissionManager.this.blockManager.countNodes(block);
                int liveReplicas = num.liveReplicas();
                if (DecommissionManager.this.blockManager.isNeededReconstruction(block, liveReplicas) && !((DecommissionManager)DecommissionManager.this).blockManager.neededReconstruction.contains(block) && ((DecommissionManager)DecommissionManager.this).blockManager.pendingReconstruction.getNumReplicas(block) == 0 && DecommissionManager.this.blockManager.isPopulatingReplQueues()) {
                    ((DecommissionManager)DecommissionManager.this).blockManager.neededReconstruction.add(block, liveReplicas, num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), DecommissionManager.this.blockManager.getExpectedRedundancyNum(block));
                }
                if (DecommissionManager.this.isSufficient(block, bc, num)) {
                    if (!pruneReliableBlocks) continue;
                    it.remove();
                    continue;
                }
                if (insufficientList != null) {
                    insufficientList.add(block);
                }
                if (firstReplicationLog) {
                    DecommissionManager.this.logBlockReplicationInfo(block, bc, datanode, num, ((DecommissionManager)DecommissionManager.this).blockManager.blocksMap.getStorages(block));
                    firstReplicationLog = false;
                }
                ++lowRedundancyBlocks;
                if (bc.isUnderConstruction()) {
                    ++lowRedundancyInOpenFiles;
                }
                if (liveReplicas != 0 || num.decommissionedAndDecommissioning() <= 0) continue;
                ++decommissionOnlyReplicas;
            }
            datanode.decommissioningStatus.set(lowRedundancyBlocks, decommissionOnlyReplicas, lowRedundancyInOpenFiles);
        }
    }
}

