/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDecommissionWithStriped {
    private static final Logger LOG = LoggerFactory.getLogger(TestDecommissionWithStriped.class);
    // Timing values for the mini cluster; the matching configuration keys are set in setup().
    // NOTE(review): units are presumably seconds / milliseconds as the names suggest - confirm.
    private static final int HEARTBEAT_INTERVAL = 1;
    private static final int BLOCKREPORT_INTERVAL_MSEC = 1000;
    private static final int NAMENODE_REPLICATION_INTERVAL = 1;
    // Local directory holding the include ("hosts") and "exclude" files; created in setup().
    private Path decommissionDir;
    private Path hostsFile;
    private Path excludeFile;
    // Local file system used to write the hosts/exclude files.
    private FileSystem localFileSys;
    private Configuration conf;
    private MiniDFSCluster cluster;
    private DistributedFileSystem dfs;
    // Erasure coding policy under test; the size fields below are derived from it, so the
    // declaration order of these initializers matters.
    private final ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
    // Number of datanodes tests expect to be live; assigned in setup() and decremented by a
    // test that permanently stops a node.
    private int numDNs;
    private final int cellSize = this.ecPolicy.getCellSize();
    private final int dataBlocks = this.ecPolicy.getNumDataUnits();
    private final int parityBlocks = this.ecPolicy.getNumParityUnits();
    // Each internal block is sized to four cells.
    private final int blockSize = this.cellSize * 4;
    // Bytes of file data per block group: one block per data unit.
    private final int blockGroupSize = this.blockSize * this.dataBlocks;
    // HDFS directory (named after this test class) that setup() creates and puts under the EC policy.
    private final Path ecDir = new Path("/" + this.getClass().getSimpleName());
    private FSNamesystem fsn;
    private BlockManager bm;
    // Client connected to NameNode 0, created in setup().
    private DFSClient client;

    /**
     * Creates the local hosts/exclude files, configures and starts a mini cluster, and
     * prepares an erasure-coded directory for the tests.
     *
     * <p>Each configuration key is set exactly once, and the interval settings use the
     * class constants rather than repeating their values as literals.
     *
     * @throws IOException if the local files cannot be written or the cluster fails to start
     */
    @Before
    public void setup() throws IOException {
        this.conf = new HdfsConfiguration();

        // Empty include/exclude files; decommissionNode() rewrites the exclude file later.
        this.localFileSys = FileSystem.getLocal(this.conf);
        Path workingDir = this.localFileSys.getWorkingDirectory();
        this.decommissionDir = new Path(workingDir, PathUtils.getTestDirName(this.getClass()) + "/work-dir/decommission");
        this.hostsFile = new Path(this.decommissionDir, "hosts");
        this.excludeFile = new Path(this.decommissionDir, "exclude");
        this.writeConfigFile(this.hostsFile, null);
        this.writeConfigFile(this.excludeFile, null);
        this.conf.set("dfs.hosts", this.hostsFile.toUri().getPath());
        this.conf.set("dfs.hosts.exclude", this.excludeFile.toUri().getPath());

        this.conf.setBoolean("dfs.namenode.redundancy.considerLoad", false);
        this.conf.setInt("dfs.namenode.heartbeat.recheck-interval", 2000);
        this.conf.setInt("dfs.heartbeat.interval", HEARTBEAT_INTERVAL);
        this.conf.setInt("dfs.namenode.redundancy.interval.seconds", NAMENODE_REPLICATION_INTERVAL);
        this.conf.setInt("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL_MSEC);
        this.conf.setInt("dfs.namenode.reconstruction.pending.timeout-sec", 4);
        this.conf.setLong("dfs.blocksize", this.blockSize);
        this.conf.setInt("dfs.datanode.ec.reconstruction.stripedread.buffer.size", this.cellSize - 1);

        // Two datanodes more than one block group needs.
        this.numDNs = this.dataBlocks + this.parityBlocks + 2;
        this.cluster = new MiniDFSCluster.Builder(this.conf).numDataNodes(this.numDNs).build();
        this.cluster.waitActive();
        this.dfs = this.cluster.getFileSystem(0);
        this.fsn = this.cluster.getNamesystem();
        this.bm = this.fsn.getBlockManager();
        this.client = TestDecommissionWithStriped.getDfsClient(this.cluster.getNameNode(0), this.conf);

        String ecPolicyName = StripedFileTestUtil.getDefaultECPolicy().getName();
        this.dfs.enableErasureCodingPolicy(ecPolicyName);
        this.dfs.mkdirs(this.ecDir);
        this.dfs.setErasureCodingPolicy(this.ecDir, ecPolicyName);
    }

    /**
     * Removes the local decommission work directory and shuts the mini cluster down.
     *
     * <p>The cluster shutdown runs in a {@code finally} block so that a failing cleanup
     * (for example an assertion inside {@code cleanupFile}) cannot leave the cluster running.
     *
     * @throws IOException if the local cleanup fails
     */
    @After
    public void teardown() throws IOException {
        try {
            this.cleanupFile(this.localFileSys, this.decommissionDir);
        } finally {
            if (this.cluster != null) {
                this.cluster.shutdown();
                this.cluster = null;
            }
        }
    }

    /**
     * Decommissions one datanode of a file that is exactly one block group long.
     */
    @Test(timeout=120000L)
    public void testFileFullBlockGroup() throws Exception {
        LOG.info("Starting test testFileFullBlockGroup");
        // Derived from the policy instead of the former literal 9 (6 data + 3 parity units).
        int storageCount = this.dataBlocks + this.parityBlocks;
        this.testDecommission(this.blockSize * this.dataBlocks, storageCount, 1, "testFileFullBlockGroup");
    }

    /**
     * Decommissions one datanode of a file that spans two full block groups.
     */
    @Test(timeout=120000L)
    public void testFileMultipleBlockGroups() throws Exception {
        LOG.info("Starting test testFileMultipleBlockGroups");
        int writeBytes = 2 * this.blockSize * this.dataBlocks;
        // Derived from the policy instead of the former literal 9 (6 data + 3 parity units).
        int storageCount = this.dataBlocks + this.parityBlocks;
        this.testDecommission(writeBytes, storageCount, 1, "testFileMultipleBlockGroups");
    }

    /**
     * Decommissions one datanode of a file that is one byte shorter than a cell.
     */
    @Test(timeout=120000L)
    public void testFileSmallerThanOneCell() throws Exception {
        LOG.info("Starting test testFileSmallerThanOneCell");
        // Presumably such a file occupies a single data block plus every parity block; this
        // equals the former literal 4 when the policy has 3 parity units.
        int storageCount = 1 + this.parityBlocks;
        this.testDecommission(this.cellSize - 1, storageCount, 1, "testFileSmallerThanOneCell");
    }

    /**
     * Decommissions one datanode of a file that is two cells long.
     */
    @Test(timeout=120000L)
    public void testFileSmallerThanOneStripe() throws Exception {
        LOG.info("Starting test testFileSmallerThanOneStripe");
        // Presumably two cells occupy two data blocks plus every parity block; this equals
        // the former literal 5 when the policy has 3 parity units.
        int storageCount = 2 + this.parityBlocks;
        this.testDecommission(this.cellSize * 2, storageCount, 1, "testFileSmallerThanOneStripe");
    }

    /**
     * Decommissions two datanodes of a file that is exactly one block group long.
     */
    @Test(timeout=120000L)
    public void testDecommissionTwoNodes() throws Exception {
        LOG.info("Starting test testDecommissionTwoNodes");
        // Derived from the policy instead of the former literal 9 (6 data + 3 parity units).
        int storageCount = this.dataBlocks + this.parityBlocks;
        this.testDecommission(this.blockSize * this.dataBlocks, storageCount, 2, "testDecommissionTwoNodes");
    }

    /**
     * Decommissions a datanode that holds part of a block group while another holder of the
     * same block group is stopped and every spare datanode is temporarily down.
     *
     * <p>The spare datanodes are restarted a few seconds after decommissioning starts; the
     * test then expects the decommission to complete and the file to remain readable.
     *
     * <p>Fixes over the previous version: the start latch is created with a count of 1 so
     * that awaiting it actually waits for the worker thread, and a failure inside the worker
     * thread is recorded and rethrown on the test thread (an assertion failure raised on
     * another thread does not fail the test).
     */
    @Test(timeout=120000L)
    public void testDecommissionWithURBlockForSameBlockGroup() throws Exception {
        LOG.info("Starting test testDecommissionWithURBlocksForSameBlockGroup");
        Path ecFile = new Path(this.ecDir, "testDecommissionWithCorruptBlocks");
        int writeBytes = this.cellSize * this.dataBlocks * 2;
        this.writeStripedFile(this.dfs, ecFile, writeBytes);
        Assert.assertEquals(0L, this.bm.numOfUnderReplicatedBlocks());

        final List<DatanodeInfo> decommisionNodes = new ArrayList<DatanodeInfo>();
        LocatedBlock lb = this.dfs.getClient().getLocatedBlocks(ecFile.toString(), 0L).get(0);
        DatanodeInfo[] dnLocs = lb.getLocations();
        int storageCount = this.dataBlocks + this.parityBlocks;
        Assert.assertEquals(storageCount, dnLocs.length);
        int decommNodeIndex = this.dataBlocks - 1;
        int stopNodeIndex = 1;
        decommisionNodes.add(dnLocs[decommNodeIndex]);

        // Stop every live datanode that holds no part of the first block group; they are
        // restarted below once decommissioning is in progress.
        DatanodeInfo[] info = this.client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
        List<MiniDFSCluster.DataNodeProperties> stoppedDns = new ArrayList<MiniDFSCluster.DataNodeProperties>();
        for (DatanodeInfo liveDn : info) {
            boolean usedNode = false;
            for (DatanodeInfo blockHolder : dnLocs) {
                if (liveDn.getXferAddr().equals(blockHolder.getXferAddr())) {
                    usedNode = true;
                    break;
                }
            }
            if (usedNode) {
                continue;
            }
            DataNode spareDn = this.cluster.getDataNode(liveDn.getIpcPort());
            stoppedDns.add(this.cluster.stopDataNode(liveDn.getXferAddr()));
            this.cluster.setDataNodeDead(spareDn.getDatanodeId());
            LOG.info("stop datanode " + spareDn.getDatanodeId().getHostName());
        }

        // Stop one holder of the block group for good; it is never restarted, hence the
        // reduced expected live-node count.
        DataNode stoppedHolder = this.cluster.getDataNode(dnLocs[stopNodeIndex].getIpcPort());
        this.cluster.stopDataNode(dnLocs[stopNodeIndex].getXferAddr());
        this.cluster.setDataNodeDead(stoppedHolder.getDatanodeId());
        --this.numDNs;

        final CountDownLatch decomStarted = new CountDownLatch(1);
        final AtomicReference<Throwable> decomFailure = new AtomicReference<Throwable>();
        Thread decomTh = new Thread() {

            @Override
            public void run() {
                try {
                    decomStarted.countDown();
                    TestDecommissionWithStriped.this.decommissionNode(0, decommisionNodes, DatanodeInfo.AdminStates.DECOMMISSIONED);
                } catch (Throwable t) {
                    // Recorded here and rethrown on the test thread after join().
                    LOG.error("Exception while decommissioning", t);
                    decomFailure.set(t);
                }
            }
        };
        int deadDecomissioned = this.fsn.getNumDecomDeadDataNodes();
        int liveDecomissioned = this.fsn.getNumDecomLiveDataNodes();
        decomTh.start();
        Assert.assertTrue("Decommission thread did not start within 5 seconds", decomStarted.await(5L, TimeUnit.SECONDS));
        // Give the decommission some time to get under way before spare nodes come back.
        Thread.sleep(3000L);
        for (MiniDFSCluster.DataNodeProperties dnp : stoppedDns) {
            this.cluster.restartDataNode(dnp);
            LOG.info("Restarts stopped datanode:{} to trigger block reconstruction", dnp.datanode);
        }
        this.cluster.waitActive();
        LOG.info("Waiting to finish decommissioning node:{}", decommisionNodes);
        decomTh.join(20000L);
        Throwable failure = decomFailure.get();
        if (failure != null) {
            throw new AssertionError("Shouldn't throw exception!", failure);
        }
        LOG.info("Finished decommissioning node:{}", decommisionNodes);

        Assert.assertEquals(deadDecomissioned, this.fsn.getNumDecomDeadDataNodes());
        Assert.assertEquals(liveDecomissioned + decommisionNodes.size(), this.fsn.getNumDecomLiveDataNodes());
        DFSClient reportClient = TestDecommissionWithStriped.getDfsClient(this.cluster.getNameNode(0), this.conf);
        Assert.assertEquals("All datanodes must be alive", this.numDNs, reportClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE).length);
        Assert.assertNull(TestDecommissionWithStriped.checkFile(this.dfs, ecFile, storageCount, decommisionNodes, this.numDNs));
        StripedFileTestUtil.checkData(this.dfs, ecFile, writeBytes, decommisionNodes, null, this.blockGroupSize);
        this.cleanupFile(this.dfs, ecFile);
    }

    /**
     * Verifies that the checksum of a striped file is the same before and after one of the
     * datanodes holding its first block group is decommissioned.
     */
    @Test(timeout=120000L)
    public void testFileChecksumAfterDecommission() throws Exception {
        LOG.info("Starting test testFileChecksumAfterDecommission");
        Path ecFile = new Path(this.ecDir, "testFileChecksumAfterDecommission");
        int writeBytes = this.cellSize * this.dataBlocks;
        this.writeStripedFile(this.dfs, ecFile, writeBytes);
        Assert.assertEquals(0L, this.bm.numOfUnderReplicatedBlocks());
        FileChecksum fileChecksum1 = this.dfs.getFileChecksum(ecFile, (long)writeBytes);

        List<DatanodeInfo> decommisionNodes = new ArrayList<DatanodeInfo>();
        LocatedBlock lb = this.dfs.getClient().getLocatedBlocks(ecFile.toString(), 0L).get(0);
        DatanodeInfo[] dnLocs = lb.getLocations();
        // The same value is used as the expected storage count below (formerly a literal 9).
        int storageCount = this.dataBlocks + this.parityBlocks;
        Assert.assertEquals(storageCount, dnLocs.length);
        int decommNodeIndex = 1;
        decommisionNodes.add(dnLocs[decommNodeIndex]);

        this.decommissionNode(0, decommisionNodes, DatanodeInfo.AdminStates.DECOMMISSIONED);
        Assert.assertEquals(decommisionNodes.size(), this.fsn.getNumDecomLiveDataNodes());
        Assert.assertNull(TestDecommissionWithStriped.checkFile(this.dfs, ecFile, storageCount, decommisionNodes, this.numDNs));
        StripedFileTestUtil.checkData(this.dfs, ecFile, writeBytes, decommisionNodes, null, this.blockGroupSize);

        FileChecksum fileChecksum2 = this.dfs.getFileChecksum(ecFile, (long)writeBytes);
        LOG.info("fileChecksum1:" + fileChecksum1);
        LOG.info("fileChecksum2:" + fileChecksum2);
        // assertEquals reports both values when the checksums differ.
        Assert.assertEquals("Checksum mismatches!", fileChecksum1, fileChecksum2);
    }

    /**
     * Writes a striped file, decommissions some of the datanodes holding its first block
     * group, and verifies datanode counts, block locations and file contents afterwards.
     *
     * <p>The stream opened to list the file's blocks is now closed, and the block list is
     * properly typed.
     *
     * @param writeBytes number of bytes to write to the file
     * @param storageCount value passed to {@code checkFile} as the expected number of
     *        non-decommissioned locations per block
     * @param decomNodeCount number of datanodes to decommission
     * @param filename name of the file to create under the EC directory
     * @throws Exception if any cluster operation fails
     */
    private void testDecommission(int writeBytes, int storageCount, int decomNodeCount, String filename) throws Exception {
        Path ecFile = new Path(this.ecDir, filename);
        this.writeStripedFile(this.dfs, ecFile, writeBytes);
        List<DatanodeInfo> decommisionNodes = this.getDecommissionDatanode(this.dfs, ecFile, writeBytes, decomNodeCount);
        int deadDecomissioned = this.fsn.getNumDecomDeadDataNodes();
        int liveDecomissioned = this.fsn.getNumDecomLiveDataNodes();

        // Snapshot the block locations before decommissioning.
        List<LocatedBlock> lbs;
        try (HdfsDataInputStream in = (HdfsDataInputStream) this.dfs.open(ecFile)) {
            lbs = in.getAllBlocks();
        }
        List<HashMap<DatanodeInfo, Byte>> locToIndexList = new ArrayList<HashMap<DatanodeInfo, Byte>>();
        List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList = new ArrayList<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>>();
        this.prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList);

        this.decommissionNode(0, decommisionNodes, DatanodeInfo.AdminStates.DECOMMISSIONED);

        Assert.assertEquals(deadDecomissioned, this.fsn.getNumDecomDeadDataNodes());
        Assert.assertEquals(liveDecomissioned + decommisionNodes.size(), this.fsn.getNumDecomLiveDataNodes());
        DFSClient reportClient = TestDecommissionWithStriped.getDfsClient(this.cluster.getNameNode(0), this.conf);
        Assert.assertEquals("All datanodes must be alive", this.numDNs, reportClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE).length);
        Assert.assertNull(TestDecommissionWithStriped.checkFile(this.dfs, ecFile, storageCount, decommisionNodes, this.numDNs));
        StripedFileTestUtil.checkData(this.dfs, ecFile, writeBytes, decommisionNodes, null, this.blockGroupSize);
        this.assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList);
        this.cleanupFile(this.dfs, ecFile);
    }

    /**
     * Records, for every block in {@code lbs}, which block index and which block token is
     * associated with each of its locations.
     *
     * <p>One map per block is appended to each output list, in the order of {@code lbs}.
     * The token map is fully parameterized so that it matches the element type of
     * {@code locToTokenList}.
     *
     * @param lbs blocks to inspect; each element is cast to {@link LocatedStripedBlock}
     * @param locToIndexList output list receiving one location-to-block-index map per block
     * @param locToTokenList output list receiving one location-to-token map per block
     */
    private void prepareBlockIndexAndTokenList(List<LocatedBlock> lbs, List<HashMap<DatanodeInfo, Byte>> locToIndexList, List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList) {
        for (LocatedBlock lb : lbs) {
            HashMap<DatanodeInfo, Byte> locToIndex = new HashMap<DatanodeInfo, Byte>();
            locToIndexList.add(locToIndex);
            HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> locToToken = new HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>();
            locToTokenList.add(locToToken);

            LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb;
            DatanodeInfo[] di = lb.getLocations();
            // Fetch the parallel arrays once per block rather than once per location.
            byte[] blockIndices = stripedBlk.getBlockIndices();
            Token<BlockTokenIdentifier>[] blockTokens = stripedBlk.getBlockTokens();
            for (int i = 0; i < di.length; ++i) {
                locToIndex.put(di[i], blockIndices[i]);
                locToToken.put(di[i], blockTokens[i]);
            }
        }
    }

    /**
     * Asserts that each location of each block in {@code lbs} is still paired with the block
     * index and block token recorded for it in the given maps.
     *
     * @param lbs blocks to verify; each element is cast to {@link LocatedStripedBlock}
     * @param locToIndexList expected location-to-block-index map for each block, by position
     * @param locToTokenList expected location-to-token map for each block, by position
     */
    private void assertBlockIndexAndTokenPosition(List<LocatedBlock> lbs, List<HashMap<DatanodeInfo, Byte>> locToIndexList, List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList) {
        int blockCount = lbs.size();
        for (int blockPos = 0; blockPos < blockCount; blockPos++) {
            LocatedBlock located = lbs.get(blockPos);
            LocatedStripedBlock striped = (LocatedStripedBlock) located;
            HashMap<DatanodeInfo, Byte> expectedIndices = locToIndexList.get(blockPos);
            HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> expectedTokens = locToTokenList.get(blockPos);
            DatanodeInfo[] locations = located.getLocations();
            for (int locPos = 0; locPos < locations.length; locPos++) {
                DatanodeInfo location = locations[locPos];
                long expectedIndex = expectedIndices.get(location).byteValue();
                long actualIndex = striped.getBlockIndices()[locPos];
                Assert.assertEquals("Block index value mismatches after sorting", expectedIndex, actualIndex);
                Object expectedToken = expectedTokens.get(location);
                Object actualToken = striped.getBlockTokens()[locPos];
                Assert.assertEquals("Block token value mismatches after sorting", expectedToken, actualToken);
            }
        }
    }

    /**
     * Selects live datanodes that host the first block location of the given file, stopping
     * as soon as {@code decomNodeCount} nodes have been collected.
     *
     * @param dfs file system used to look up the file's block locations
     * @param ecFile file whose first block location is examined
     * @param writeBytes length of the byte range to query, starting at offset 0
     * @param decomNodeCount number of nodes wanted
     * @return the selected nodes; may hold fewer than {@code decomNodeCount} entries
     * @throws IOException if the datanode report or block locations cannot be fetched
     */
    private List<DatanodeInfo> getDecommissionDatanode(DistributedFileSystem dfs, Path ecFile, int writeBytes, int decomNodeCount) throws IOException {
        List<DatanodeInfo> selected = new ArrayList<DatanodeInfo>();
        DatanodeInfo[] liveNodes = this.client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
        BlockLocation[] locations = dfs.getFileBlockLocations(ecFile, 0L, (long) writeBytes);
        String[] holderAddrs = locations[0].getNames();
        for (int a = 0; a < holderAddrs.length; a++) {
            String holderAddr = holderAddrs[a];
            for (int n = 0; n < liveNodes.length; n++) {
                DatanodeInfo candidate = liveNodes[n];
                if (holderAddr.equals(candidate.getXferAddr())) {
                    selected.add(candidate);
                }
                // Checked after every candidate, matching or not.
                if (selected.size() >= decomNodeCount) {
                    return selected;
                }
            }
        }
        return selected;
    }

    /**
     * Creates a new client connected to the given NameNode's address.
     *
     * @param nn NameNode to connect to
     * @param conf configuration handed to the client
     * @return a newly constructed client
     * @throws IOException if the client cannot be created
     */
    private static DFSClient getDfsClient(NameNode nn, Configuration conf) throws IOException {
        final DFSClient dfsClient = new DFSClient(nn.getNameNodeAddress(), conf);
        return dfsClient;
    }

    /**
     * Writes {@code writeBytes} generated bytes to {@code ecFile}, waits until its block
     * groups are reported, and verifies the file contents.
     *
     * @param dfs file system to write to
     * @param ecFile file to create
     * @param writeBytes number of bytes to generate and write
     * @throws Exception if writing, waiting or verification fails
     */
    private void writeStripedFile(DistributedFileSystem dfs, Path ecFile, int writeBytes) throws IOException, Exception {
        final byte[] generated = StripedFileTestUtil.generateBytes(writeBytes);
        final String contents = new String(generated);
        DFSTestUtil.writeFile((FileSystem) dfs, ecFile, contents);

        final String filePath = ecFile.toString();
        StripedFileTestUtil.waitBlockGroupsReported(dfs, filePath);

        // No datanodes are excluded for this initial verification.
        final ArrayList<DatanodeInfo> noExcludedNodes = new ArrayList<DatanodeInfo>();
        StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, noExcludedNodes, null, this.blockGroupSize);
    }

    /**
     * Replaces the local file {@code name} with one line per entry of {@code nodes}.
     *
     * <p>An existing file is deleted first. The output stream is managed by
     * try-with-resources so it is closed even when a write fails.
     *
     * @param name local file to (re)create
     * @param nodes lines to write; {@code null} produces an empty file
     * @throws IOException if the file cannot be deleted, created or written
     */
    private void writeConfigFile(Path name, List<String> nodes) throws IOException {
        if (this.localFileSys.exists(name)) {
            this.localFileSys.delete(name, true);
        }
        try (FSDataOutputStream stm = this.localFileSys.create(name)) {
            if (nodes != null) {
                for (String node : nodes) {
                    stm.writeBytes(node);
                    stm.writeBytes("\n");
                }
            }
        }
    }

    /**
     * Recursively deletes {@code name}, asserting that it exists beforehand and no longer
     * exists afterwards.
     *
     * @param fileSys file system containing the path
     * @param name path to delete
     * @throws IOException if the file system operations fail
     */
    private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
        boolean presentBefore = fileSys.exists(name);
        Assert.assertTrue(presentBefore);
        fileSys.delete(name, true);
        boolean presentAfter = fileSys.exists(name);
        Assert.assertFalse(presentAfter);
    }

    /**
     * Puts the given datanodes into the exclude file, asks the NameNode to refresh its node
     * lists, and blocks until every given node reports {@code waitForState}.
     *
     * @param nnIndex index of the NameNode to operate on
     * @param decommissionedNodes nodes to exclude; each must currently be reported as live
     * @param waitForState admin state to wait for on every node
     * @throws IOException if the datanode report, the exclude file or the refresh fails
     */
    private void decommissionNode(int nnIndex, List<DatanodeInfo> decommissionedNodes, DatanodeInfo.AdminStates waitForState) throws IOException {
        DFSClient reportClient = TestDecommissionWithStriped.getDfsClient(this.cluster.getNameNode(nnIndex), this.conf);
        DatanodeInfo[] liveNodes = reportClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);

        // Verify each target is live and collect its name for the exclude file.
        List<String> excludeNodes = new ArrayList<String>();
        for (DatanodeInfo target : decommissionedNodes) {
            boolean isLive = false;
            for (int i = 0; i < liveNodes.length && !isLive; i++) {
                isLive = liveNodes[i].getDatanodeUuid().equals(target.getDatanodeUuid());
            }
            Assert.assertTrue("Datanode: " + target + " is not LIVE", isLive);
            excludeNodes.add(target.getName());
            LOG.info("Decommissioning node: " + target.getName());
        }

        this.writeConfigFile(this.excludeFile, excludeNodes);
        TestDecommissionWithStriped.refreshNodes(this.cluster.getNamesystem(nnIndex), this.conf);

        // Wait on the NameNode's own descriptor of each node.
        for (DatanodeInfo target : decommissionedNodes) {
            FSNamesystem namesystem = this.cluster.getNamesystem(nnIndex);
            DatanodeDescriptor descriptor = NameNodeAdapter.getDatanode(namesystem, (DatanodeID) target);
            this.waitNodeState((DatanodeInfo) descriptor, waitForState);
        }
    }

    /**
     * Asks the datanode manager of the given namesystem to refresh its nodes using
     * {@code conf}.
     *
     * @param ns namesystem whose datanode manager is refreshed
     * @param conf configuration passed to the refresh
     * @throws IOException if the refresh fails
     */
    private static void refreshNodes(FSNamesystem ns, Configuration conf) throws IOException {
        final BlockManager blockManager = ns.getBlockManager();
        blockManager.getDatanodeManager().refreshNodes(conf);
    }

    /**
     * Polls every 500 ms until {@code node} reports the admin state {@code state}.
     *
     * <p>If the waiting thread is interrupted, its interrupt flag is restored and an
     * {@link AssertionError} is thrown; previously the interruption was swallowed and the
     * loop kept polling.
     *
     * @param node node whose admin state is polled
     * @param state admin state to wait for
     * @throws AssertionError if the thread is interrupted while waiting
     */
    private void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) {
        while (state != node.getAdminState()) {
            LOG.info("Waiting for node " + node + " to change state to " + state + " current state: " + node.getAdminState());
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers and stop waiting.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for node " + node + " to change state to " + state, e);
            }
        }
        LOG.info("node " + node + " reached the state " + state);
    }

    /**
     * Checks the block locations of a file against the set of decommissioned datanodes.
     *
     * <p>For every block: a location matching a node in {@code decommissionedNodes} must be
     * marked decommissioned and must sit at position {@code repl} or later in the location
     * list; no other location may be marked decommissioned; and the number of locations must
     * equal {@code min(numDatanodes, repl + matches)}.
     *
     * <p>The stream opened to list the blocks is closed, and the per-location log line now
     * prints the location index.
     *
     * @param fileSys file system holding the file; must be a {@link DistributedFileSystem}
     * @param name file to check
     * @param repl expected number of non-decommissioned locations per block
     * @param decommissionedNodes nodes expected to be decommissioned
     * @param numDatanodes upper bound on the expected number of locations per block
     * @return {@code null} if every block passes, otherwise a description of the first problem
     * @throws IOException if the file cannot be opened or its blocks cannot be listed
     */
    private static String checkFile(FileSystem fileSys, Path name, int repl, List<DatanodeInfo> decommissionedNodes, int numDatanodes) throws IOException {
        Assert.assertTrue("Not HDFS:" + fileSys.getUri(), fileSys instanceof DistributedFileSystem);
        List<LocatedBlock> dinfo;
        try (HdfsDataInputStream dis = (HdfsDataInputStream) fileSys.open(name)) {
            dinfo = dis.getAllBlocks();
        }
        for (LocatedBlock blk : dinfo) {
            int hasdown = 0;
            DatanodeInfo[] nodes = blk.getLocations();
            for (int j = 0; j < nodes.length; ++j) {
                LOG.info("Block Locations size={}, locs={}, j={}", new Object[]{nodes.length, nodes[j].toString(), j});
                boolean found = false;
                for (DatanodeInfo decommissioned : decommissionedNodes) {
                    if (!nodes[j].getXferAddr().equals(decommissioned.getXferAddr())) {
                        continue;
                    }
                    found = true;
                    ++hasdown;
                    if (!nodes[j].isDecommissioned()) {
                        return "For block " + blk.getBlock() + " replica on " + nodes[j] + " is given as downnode, but is not decommissioned";
                    }
                    // Decommissioned locations are expected after the first repl entries.
                    if (j < repl) {
                        return "For block " + blk.getBlock() + " decommissioned node " + nodes[j] + " was not last node in list: " + (j + 1) + " of " + nodes.length;
                    }
                    LOG.info("Block " + blk.getBlock() + " replica on " + nodes[j] + " is decommissioned.");
                }
                if (!found && nodes[j].isDecommissioned()) {
                    return "For block " + blk.getBlock() + " replica on " + nodes[j] + " is unexpectedly decommissioned";
                }
            }
            LOG.info("Block " + blk.getBlock() + " has " + hasdown + " decommissioned replica.");
            int expectedLocations = Math.min(numDatanodes, repl + hasdown);
            if (expectedLocations != nodes.length) {
                return "Wrong number of replicas for block " + blk.getBlock() + ": " + nodes.length + ", expected " + expectedLocations;
            }
        }
        return null;
    }
}

