/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.base.Supplier;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestReplication {
    /** Seed constant; 3735928559 is 0xDEADBEEF. The same value is passed as the last argument of createFile in runReplication. */
    private static final long seed = 3735928559L;
    /** 8192; the same value appears as the {@code 8192L} argument of createFile in runReplication. */
    private static final int blockSize = 8192;
    /** 16384, i.e. twice {@link #blockSize}; the same value appears in the createFile calls in runReplication. */
    private static final int fileSize = 16384;
    /** Rack paths handed to the MiniDFSCluster builder in runReplication and used by checkFile to validate topology paths. */
    private static final String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"};
    /** Number of datanodes started by runReplication: one per entry in {@link #racks}. */
    private static final int numDatanodes = racks.length;
    /** Logger for this test class. */
    private static final Logger LOG = LoggerFactory.getLogger(TestReplication.class);

    /**
     * Waits for {@code name} to be replicated and then checks where its replicas live.
     *
     * <p>Two checks are made once every block has at least
     * {@code min(numDatanodes, repl)} replicas:
     * <ol>
     *   <li>The file system reports as many block locations as the namenode reports
     *       located blocks, each block has as many topology paths as datanodes, and
     *       every topology path starts with one of the entries in {@code racks}.</li>
     *   <li>Rack spread, examined block by block until a decision is reached: a block
     *       with at most one replica ends the check; a block with exactly two replicas
     *       must have them on different network locations; a block with three or more
     *       replicas must contain both a replica that shares its network location with
     *       a later replica and one that does not.</li>
     * </ol>
     *
     * @param fileSys file system holding the file
     * @param name    file to inspect
     * @param repl    replication factor the file was created with
     * @throws IOException if talking to the namenode or file system fails
     */
    private void checkFile(FileSystem fileSys, Path name, int repl) throws IOException {
        Configuration conf = fileSys.getConf();
        ClientProtocol namenode = NameNodeProxies.createProxy(conf, fileSys.getUri(), ClientProtocol.class).getProxy();
        waitForBlockReplication(name.toString(), namenode, Math.min(numDatanodes, repl), -1L);

        LocatedBlocks locations = namenode.getBlockLocations(name.toString(), 0L, Long.MAX_VALUE);
        FileStatus stat = fileSys.getFileStatus(name);
        BlockLocation[] blockLocations = fileSys.getFileBlockLocations(stat, 0L, Long.MAX_VALUE);

        // Every reported topology path must live under one of the configured racks.
        Assert.assertTrue(blockLocations.length == locations.locatedBlockCount());
        for (int blockIdx = 0; blockIdx < blockLocations.length; ++blockIdx) {
            DatanodeInfo[] replicaNodes = locations.get(blockIdx).getLocations();
            String[] topologyPaths = blockLocations[blockIdx].getTopologyPaths();
            Assert.assertTrue(topologyPaths.length == replicaNodes.length);
            for (String topologyPath : topologyPaths) {
                boolean found = false;
                for (String rack : racks) {
                    if (topologyPath.startsWith(rack)) {
                        found = true;
                        break;
                    }
                }
                Assert.assertTrue(found);
            }
        }

        // Rack-spread check; both flags start optimistic so files with <= 1 replica pass.
        boolean isOnSameRack = true;
        boolean isNotOnSameRack = true;
        for (LocatedBlock blk : locations.getLocatedBlocks()) {
            DatanodeInfo[] datanodes = blk.getLocations();
            if (datanodes.length <= 1) {
                break;
            }
            if (datanodes.length == 2) {
                isNotOnSameRack = !datanodes[0].getNetworkLocation().equals(datanodes[1].getNetworkLocation());
                break;
            }
            isOnSameRack = false;
            isNotOnSameRack = false;
            for (int i = 0; i < datanodes.length - 1; ++i) {
                LOG.info("datanode " + i + ": " + datanodes[i]);
                boolean onRack = false;
                for (int j = i + 1; j < datanodes.length; ++j) {
                    if (datanodes[i].getNetworkLocation().equals(datanodes[j].getNetworkLocation())) {
                        onRack = true;
                    }
                }
                if (onRack) {
                    isOnSameRack = true;
                } else {
                    isNotOnSameRack = true;
                }
                if (isOnSameRack && isNotOnSameRack) {
                    break;
                }
            }
            // Stop at the first block that violates the expected spread.
            if (!isOnSameRack || !isNotOnSameRack) {
                break;
            }
        }
        Assert.assertTrue(isOnSameRack);
        Assert.assertTrue(isNotOnSameRack);
    }

    /**
     * Recursively deletes {@code name}, asserting that it exists beforehand and is gone afterwards.
     *
     * @param fileSys file system holding the file
     * @param name    path to remove
     * @throws IOException if the file system calls fail
     */
    private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
        Assert.assertTrue(fileSys.exists(name));
        fileSys.delete(name, true);
        Assert.assertFalse(fileSys.exists(name));
    }

    /**
     * Creates a single-replica file on a two-datanode cluster, corrupts its only
     * replica, raises the replication factor to 2 and then waits for the namenode to
     * report the block as corrupt. The block must still have exactly one location.
     *
     * <p>The cluster is always shut down and the {@link DFSClient} always closed, even
     * when an assertion fails. An interrupt during the polling sleep now propagates
     * instead of being ignored.
     *
     * @param corruptBlockByDeletingBlockFile if {@code true} the replica is corrupted via
     *        {@code corruptBlockOnDataNodesByDeletingBlockFile}, otherwise via
     *        {@code corruptBlockOnDataNodes}
     * @throws Exception on any cluster, client or assertion failure
     */
    private void testBadBlockReportOnTransfer(boolean corruptBlockByDeletingBlockFile) throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
        try {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            try (DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf)) {
                // Start with a single replica so there is exactly one copy to corrupt.
                Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
                short replFactor = 1;
                DFSTestUtil.createFile(fs, file1, 1024L, replFactor, 0L);
                DFSTestUtil.waitReplication(fs, file1, replFactor);

                ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
                int blockFilesCorrupted = corruptBlockByDeletingBlockFile
                        ? cluster.corruptBlockOnDataNodesByDeletingBlockFile(block)
                        : cluster.corruptBlockOnDataNodes(block);
                Assert.assertEquals("Corrupted too few blocks", (long) replFactor, (long) blockFilesCorrupted);

                // Raising the replication factor makes the namenode schedule a transfer
                // of the (corrupt) replica.
                replFactor = 2;
                fs.setReplication(file1, replFactor);

                LocatedBlocks blocks = dfsClient.getNamenode().getBlockLocations(file1.toString(), 0L, Long.MAX_VALUE);
                while (!blocks.get(0).isCorrupt()) {
                    LOG.info("Waiting until block is marked as corrupt...");
                    // InterruptedException propagates: an interrupted test should stop polling.
                    Thread.sleep(1000L);
                    blocks = dfsClient.getNamenode().getBlockLocations(file1.toString(), 0L, Long.MAX_VALUE);
                }
                int replicaCount = blocks.get(0).getLocations().length;
                Assert.assertEquals(1, replicaCount);
            }
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Runs the bad-block-on-transfer scenario with datanodes backed by
     * {@link CorruptFileSimulatedFSDataset}, whose block input streams raise an
     * {@link IOException} when data is read.
     *
     * <p>After the replication factor of a single-replica file is raised to 2, the test
     * waits (polling every second, up to 15 seconds) for the namenode to mark the first
     * block corrupt and then asserts that the block still has exactly one location.
     *
     * <p>{@code dfs.datanode.scan.period.hours} is set to -1; presumably this disables
     * the periodic block scanner (TODO confirm against the HDFS configuration docs).
     *
     * @throws Exception on any cluster, client, timeout or assertion failure
     */
    @Test(timeout=30000L)
    public void testBadBlockReportOnTransferCorruptFile() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.set("dfs.datanode.fsdataset.factory", CorruptFileSimulatedFSDataset.Factory.class.getName());
        conf.setLong("dfs.datanode.scan.period.hours", -1L);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
        try {
            // waitActive sits inside the try so a startup failure still shuts the cluster down.
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            try (DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf)) {
                Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
                short replFactor = 1;
                DFSTestUtil.createFile(fs, file1, 1024L, replFactor, 0L);
                DFSTestUtil.waitReplication(fs, file1, replFactor);

                replFactor = 2;
                fs.setReplication(file1, replFactor);

                GenericTestUtils.waitFor(() -> {
                    try {
                        return dfsClient.getNamenode().getBlockLocations(file1.toString(), 0L, Long.MAX_VALUE).get(0).isCorrupt();
                    }
                    catch (IOException ie) {
                        // Treat a failed lookup as "not corrupt yet" and keep polling.
                        return false;
                    }
                }, 1000L, 15000L);

                int replicaCount = dfsClient.getNamenode().getBlockLocations(file1.toString(), 0L, Long.MAX_VALUE).get(0).getLocations().length;
                Assert.assertEquals("replication should not success", 1L, (long) replicaCount);
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Bad-block-on-transfer scenario where the replica is corrupted with
     * {@code corruptBlockOnDataNodes} (the {@code false} variant of the shared helper).
     */
    @Test
    public void testBadBlockReportOnTransfer() throws Exception {
        final boolean corruptByDeletingBlockFile = false;
        testBadBlockReportOnTransfer(corruptByDeletingBlockFile);
    }

    /**
     * Bad-block-on-transfer scenario where the replica is corrupted with
     * {@code corruptBlockOnDataNodesByDeletingBlockFile} (the {@code true} variant of the
     * shared helper).
     */
    @Test
    public void testBadBlockReportOnTransferMissingBlockFile() throws Exception {
        final boolean corruptByDeletingBlockFile = true;
        testBadBlockReportOnTransfer(corruptByDeletingBlockFile);
    }

    /**
     * Starts a cluster with one datanode per entry in {@code racks}, then for each of
     * the replication factors 3, 10, 4, 1 and 2 (in that order) creates
     * {@code /smallblocktest.dat}, verifies its replica placement with
     * {@code checkFile} and deletes it again with {@code cleanupFile}.
     *
     * <p>All resources are released on every path: the {@link DFSClient} used for the
     * datanode report and the file system are closed via try-with-resources, and the
     * cluster is shut down in a {@code finally} that also covers start-up and the
     * datanode-count assertion.
     *
     * @param simulated if {@code true}, {@code SimulatedFSDataset.setFactory} is applied
     *                  to the configuration before the cluster is built
     * @throws IOException if cluster, client or file system operations fail
     */
    public void runReplication(boolean simulated) throws IOException {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.namenode.redundancy.considerLoad", false);
        if (simulated) {
            SimulatedFSDataset.setFactory(conf);
        }
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).racks(racks).build();
        try {
            cluster.waitActive();

            // All datanodes must be live before placement is examined.
            InetSocketAddress addr = new InetSocketAddress("localhost", cluster.getNameNodePort());
            try (DFSClient client = new DFSClient(addr, conf)) {
                DatanodeInfo[] info = client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
                Assert.assertEquals("Number of Datanodes ", (long) numDatanodes, (long) info.length);
            }

            try (DistributedFileSystem fileSys = cluster.getFileSystem()) {
                Path file1 = new Path("/smallblocktest.dat");
                // 10 exceeds the number of datanodes; checkFile caps its expectation accordingly.
                for (short repl : new short[] {3, 10, 4, 1, 2}) {
                    DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize, repl, seed);
                    checkFile(fileSys, file1, repl);
                    cleanupFile(fileSys, file1);
                }
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    /** Runs the replication scenario with {@code simulated = true}. */
    @Test
    public void testReplicationSimulatedStorag() throws IOException {
        final boolean simulated = true;
        runReplication(simulated);
    }

    /** Runs the replication scenario with {@code simulated = false}. */
    @Test
    public void testReplication() throws IOException {
        final boolean simulated = false;
        runReplication(simulated);
    }

    /**
     * Convenience overload: waits for replication with the under-construction and
     * no-over-replication options both switched off.
     *
     * @param filename   path of the file to poll
     * @param namenode   namenode to query for block locations
     * @param expected   minimum number of replicas required for every block
     * @param maxWaitSec timeout in seconds; a value {@code <= 0} disables the timeout
     * @throws IOException if the namenode query fails or the wait times out
     */
    private void waitForBlockReplication(String filename, ClientProtocol namenode, int expected, long maxWaitSec) throws IOException {
        final boolean isUnderConstruction = false;
        final boolean noOverReplication = false;
        waitForBlockReplication(filename, namenode, expected, maxWaitSec, isUnderConstruction, noOverReplication);
    }

    /**
     * Polls the namenode every 500 ms until every inspected block of {@code filename}
     * has at least {@code expected} replica locations.
     *
     * @param filename            path of the file to poll
     * @param namenode            namenode to query for block locations
     * @param expected            minimum number of replicas required for every block
     * @param maxWaitSec          timeout in seconds; a value {@code <= 0} disables the timeout
     * @param isUnderConstruction if {@code true}, the last block returned by the namenode
     *                            is skipped
     * @param noOverReplication   if {@code true}, the call fails with an assertion error as
     *                            soon as any inspected block has more than {@code expected}
     *                            replicas
     * @throws IOException if the namenode query fails, the wait times out, or the
     *                     waiting thread is interrupted (the interrupt flag is restored
     *                     before throwing)
     */
    private void waitForBlockReplication(String filename, ClientProtocol namenode, int expected, long maxWaitSec, boolean isUnderConstruction, boolean noOverReplication) throws IOException {
        long start = Time.monotonicNow();
        LOG.info("Checking for block replication for " + filename);
        while (true) {
            boolean replOk = true;
            LocatedBlocks blocks = namenode.getBlockLocations(filename, 0L, Long.MAX_VALUE);
            Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
            while (iter.hasNext()) {
                LocatedBlock block = iter.next();
                if (isUnderConstruction && !iter.hasNext()) {
                    // Last block of a file that is still being written: do not check it.
                    break;
                }
                int actual = block.getLocations().length;
                if (noOverReplication) {
                    Assert.assertTrue("Block " + block.getBlock() + " has " + actual + " replicas, more than the expected " + expected, actual <= expected);
                }
                if (actual < expected) {
                    LOG.info("Not enough replicas for " + block.getBlock() + " yet. Expecting " + expected + ", got " + actual + ".");
                    replOk = false;
                    break;
                }
            }
            if (replOk) {
                return;
            }
            if (maxWaitSec > 0L && Time.monotonicNow() - start > maxWaitSec * 1000L) {
                throw new IOException("Timedout while waiting for all blocks to  be replicated for " + filename);
            }
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException e) {
                // Swallowing the interrupt would leave an interrupted thread (e.g. one
                // stopped by a JUnit timeout) polling forever when no timeout is set.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for all blocks to be replicated for " + filename, e);
            }
        }
    }

    /**
     * Verifies that a block is eventually brought back to full replication after most
     * of its replicas have been damaged.
     *
     * <p>Steps: write a 1 KiB file with replication 4 on a four-datanode cluster and wait
     * for full replication; capture the replicas held by datanodes 0-2; shut the
     * cluster down; delete the data of the first captured replica and corrupt the
     * data of the other two; restart (without formatting) with eight datanodes and
     * short pending-reconstruction / write timeouts; finally wait, without a time
     * limit, until the block again has four replicas.
     *
     * <p>Both {@link DFSClient} instances and the output stream are closed via
     * try-with-resources so nothing is leaked when a step fails.
     *
     * @throws IOException if cluster, client or file operations fail
     */
    @Test
    public void testPendingReplicationRetry() throws IOException {
        MiniDFSCluster cluster = null;
        int numDataNodes = 4;
        String testFile = "/replication-test-file";
        Path testPath = new Path(testFile);
        byte[] buffer = new byte[1024];
        for (int i = 0; i < buffer.length; ++i) {
            buffer[i] = '1';
        }
        try {
            HdfsConfiguration conf = new HdfsConfiguration();
            conf.set("dfs.replication", Integer.toString(numDataNodes));
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();

            List<FsDatasetTestUtils.MaterializedReplica> replicas = new ArrayList<>();
            try (DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf)) {
                try (FSDataOutputStream out = cluster.getFileSystem().create(testPath)) {
                    out.write(buffer);
                }
                waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1L);

                // Capture three of the four replicas so they can be damaged while the cluster is down.
                ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(testFile, 0L, Long.MAX_VALUE).get(0).getBlock();
                for (int dnIndex = 0; dnIndex < 3; ++dnIndex) {
                    replicas.add(cluster.getMaterializedReplica(dnIndex, block));
                }
            }
            Assert.assertEquals(3L, (long) replicas.size());
            cluster.shutdown();

            // First captured replica loses its data entirely; the remaining two are corrupted.
            int fileCount = 0;
            for (FsDatasetTestUtils.MaterializedReplica replica : replicas) {
                if (fileCount == 0) {
                    LOG.info("Deleting block " + replica);
                    replica.deleteData();
                } else {
                    LOG.info("Corrupting file " + replica);
                    replica.corruptData();
                }
                ++fileCount;
            }

            LOG.info("Restarting minicluster after deleting a replica and corrupting 2 crcs");
            conf = new HdfsConfiguration();
            conf.set("dfs.replication", Integer.toString(numDataNodes));
            conf.set("dfs.namenode.reconstruction.pending.timeout-sec", Integer.toString(2));
            conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
            conf.set("dfs.namenode.safemode.threshold-pct", "0.75f");
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes * 2).format(false).build();
            cluster.waitActive();

            try (DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf)) {
                waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1L);
            }
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * On a two-datanode cluster, runs {@code changeBlockLen} first with a length delta
     * of -1 and then with a delta of +1.
     */
    @Test
    public void testReplicateLenMismatchedBlock() throws Exception {
        Configuration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
        try {
            cluster.waitActive();
            for (int lenDelta : new int[] {-1, 1}) {
                changeBlockLen(cluster, lenDelta);
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Creates a single-replica file {@code /file1}, changes the on-disk length of its
     * replica by {@code lenDelta}, raises the replication factor to 2 and waits for the
     * namenode to reach the expected state; the file is deleted afterwards.
     *
     * <ul>
     *   <li>{@code lenDelta < 0}: waits until the first block is reported corrupt and
     *       has exactly one location.</li>
     *   <li>otherwise: waits until the first block has exactly two locations.</li>
     * </ul>
     *
     * <p>The {@link DFSClient} used for polling is closed before returning, including
     * when polling is interrupted or a namenode call fails.
     *
     * @param cluster  running cluster to operate on
     * @param lenDelta number of bytes to add to (positive) or remove from (negative) the replica
     * @throws IOException          if a file system or namenode call fails
     * @throws InterruptedException if interrupted while polling
     * @throws TimeoutException     declared for the {@code DFSTestUtil} helpers called here
     */
    private void changeBlockLen(MiniDFSCluster cluster, int lenDelta) throws IOException, InterruptedException, TimeoutException {
        final Path fileName = new Path("/file1");
        final short replicationFactor = 1;
        final DistributedFileSystem fs = cluster.getFileSystem();
        final int fileLen = fs.getConf().getInt("dfs.bytes-per-checksum", 512);
        DFSTestUtil.createFile(fs, fileName, fileLen, replicationFactor, 0L);
        DFSTestUtil.waitReplication(fs, fileName, replicationFactor);
        ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);

        // Try each datanode in turn and stop at the first one that reports the change succeeded.
        for (int i = 0; i < cluster.getDataNodes().size(); ++i) {
            if (DFSTestUtil.changeReplicaLength(cluster, block, i, lenDelta)) {
                break;
            }
        }

        fs.setReplication(fileName, (short) (replicationFactor + 1));

        try (DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), fs.getConf())) {
            ClientProtocol namenode = dfsClient.getNamenode();
            LocatedBlocks blocks = namenode.getBlockLocations(fileName.toString(), 0L, (long) fileLen);
            if (lenDelta < 0) {
                // Shortened replica: expect it to be flagged corrupt and not replicated.
                while (!blocks.get(0).isCorrupt() || replicationFactor != blocks.get(0).getLocations().length) {
                    Thread.sleep(100L);
                    blocks = namenode.getBlockLocations(fileName.toString(), 0L, (long) fileLen);
                }
            } else {
                // Lengthened replica: expect the block to reach the raised replication factor.
                while (replicationFactor + 1 != blocks.get(0).getLocations().length) {
                    Thread.sleep(100L);
                    blocks = namenode.getBlockLocations(fileName.toString(), 0L, (long) fileLen);
                }
            }
        }
        fs.delete(fileName, true);
    }

    /**
     * Writes a single-replica 1 KiB file on a three-datanode cluster, injects a corrupt
     * replica of its first block into every datanode that does not already contain
     * the block (exactly two are expected), raises the replication factor to 3,
     * restarts the datanodes, triggers block reports and waits for the file to reach
     * replication 3.
     */
    @Test(timeout=30000L)
    public void testReplicationWhenBlockCorruption() throws Exception {
        MiniDFSCluster cluster = null;
        try {
            HdfsConfiguration conf = new HdfsConfiguration();
            conf.setLong("dfs.namenode.reconstruction.pending.timeout-sec", 1L);
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).storagesPerDatanode(1).build();
            DistributedFileSystem fs = cluster.getFileSystem();

            Path filePath = new Path("/test");
            FSDataOutputStream out = fs.create(filePath);
            fs.setReplication(filePath, (short) 1);
            out.write(new byte[1024]);
            out.close();

            ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
            int injectedReplicas = 0;
            for (DataNode dn : cluster.getDataNodes()) {
                if (!dn.getFSDataset().contains(block)) {
                    cluster.getFsDatasetTestUtils(dn).injectCorruptReplica(block);
                    ++injectedReplicas;
                }
            }
            Assert.assertEquals(2L, (long) injectedReplicas);

            fs.setReplication(filePath, (short) 3);
            cluster.restartDataNodes();
            cluster.waitActive();
            cluster.triggerBlockReports();
            DFSTestUtil.waitReplication(fs, filePath, (short) 3);
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Checks that no block is re-replicated when a datanode's
     * {@code blockReceivedAndDeleted} call to the namenode is delayed.
     *
     * <p>The first datanode's namenode proxy is spied on and its
     * {@code blockReceivedAndDeleted} call is held by a {@code DelayAnswer}. A 1500-byte
     * file with replication 3 is created, pending work is computed and must be
     * non-zero; the delayed call is then released, heartbeats are triggered, and the
     * pending count must drop to zero within 3 seconds. Finally every datanode's
     * {@code BlocksReplicated} counter must still be 0.
     */
    @Test(timeout=60000L)
    public void testNoExtraReplicationWhenBlockReceivedIsLate() throws Exception {
        LOG.info("Test block replication when blockReceived is late");
        final int numDataNodes = 3;
        final short replication = 3;
        final String testFile = "/replication-test-file";
        final Path testPath = new Path(testFile);

        Configuration conf = new Configuration();
        conf.setInt("dfs.blocksize", 1024);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
        final BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
        try {
            cluster.waitActive();

            // Hold back blockReceivedAndDeleted from the first datanode.
            NameNode nn = cluster.getNameNode();
            DataNode dn = cluster.getDataNodes().get(0);
            DatanodeProtocolClientSideTranslatorPB spy = InternalDataNodeTestUtils.spyOnBposToNN(dn, nn);
            GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
            Mockito.doAnswer(delayer).when(spy).blockReceivedAndDeleted(
                    Mockito.<DatanodeRegistration>anyObject(),
                    Mockito.anyString(),
                    Mockito.<StorageReceivedDeletedBlocks[]>anyObject());

            DistributedFileSystem fs = cluster.getFileSystem();
            DFSTestUtil.createFile(fs, testPath, 1500L, replication, 0L);

            BlockManagerTestUtil.computeAllPendingWork(bm);
            Assert.assertTrue(pendingReplicationCount(bm) > 0L);

            // Release the delayed report and let it complete.
            delayer.waitForCall();
            delayer.proceed();
            delayer.waitForResult();

            for (DataNode node : cluster.getDataNodes()) {
                DataNodeTestUtils.triggerHeartbeat(node);
            }

            try {
                GenericTestUtils.waitFor(() -> pendingReplicationCount(bm) == 0L, 100L, 3000L);
            }
            catch (TimeoutException e) {
                Assert.fail("timed out while waiting for no pending replication.");
            }
            assertNoReplicationWasPerformed(cluster);
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Exercises replication of a file that is still open for writing.
     *
     * <p>On a six-datanode cluster a file is created with replication 3, 2049 bytes are
     * written and hflushed, and the test waits for the blocks (the last block returned
     * by the namenode is skipped) to have exactly three replicas while checking that
     * no datanode has replicated a block. One replica of the first block is then
     * reported bad and the test waits again for three replicas. The stream and the
     * cluster are released on every exit path.
     *
     * <p>Note: the wait helper takes its timeout in seconds, so the 30000 passed here is
     * far longer than this test's 60 s JUnit timeout.
     */
    @Test(timeout=60000L)
    public void testReplicationWhileUnderConstruction() throws Exception {
        LOG.info("Test block replication in under construction");
        final int numDataNodes = 6;
        final int replication = 3;
        final String testFile = "/replication-test-file";
        final Path testPath = new Path(testFile);
        MiniDFSCluster cluster = null;
        FSDataOutputStream stm = null;
        try {
            Configuration conf = new Configuration();
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();

            stm = AppendTestUtil.createFile(fs, testPath, replication);
            byte[] buffer = AppendTestUtil.initBuffer(1024);
            stm.write(buffer);
            stm.write(buffer);
            stm.write(buffer, 0, 1);
            stm.hflush();

            waitForBlockReplication(testFile, cluster.getNameNodeRpc(), replication, 30000L, true, true);
            assertNoReplicationWasPerformed(cluster);

            List<LocatedBlock> blocks;
            try (FSDataInputStream in = fs.open(testPath)) {
                blocks = DFSTestUtil.getAllBlocks(in);
            }

            // Report a single replica of the first block as bad.
            LocatedBlock firstBlock = blocks.get(0);
            LocatedBlock lbOneReplica = new LocatedBlock(firstBlock.getBlock(), new DatanodeInfo[]{firstBlock.getLocations()[0]});
            cluster.getNameNodeRpc().reportBadBlocks(new LocatedBlock[]{lbOneReplica});

            waitForBlockReplication(testFile, cluster.getNameNodeRpc(), replication, 30000L, true, true);
        }
        finally {
            if (stm != null) {
                IOUtils.closeStream(stm);
            }
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Refreshes the block manager's state via {@code BlockManagerTestUtil.updateState}
     * and returns its pending-reconstruction block count.
     *
     * @param bm block manager to query
     * @return value of {@code bm.getPendingReconstructionBlocksCount()} after the refresh
     */
    private long pendingReplicationCount(BlockManager bm) {
        BlockManagerTestUtil.updateState(bm);
        final long pendingBlocks = bm.getPendingReconstructionBlocksCount();
        return pendingBlocks;
    }

    /**
     * Asserts that the {@code BlocksReplicated} counter of every datanode in the
     * cluster is still 0.
     *
     * @param cluster cluster whose datanode metrics are inspected
     */
    private void assertNoReplicationWasPerformed(MiniDFSCluster cluster) {
        for (DataNode dataNode : cluster.getDataNodes()) {
            String metricsName = dataNode.getMetrics().name();
            MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(metricsName);
            MetricsAsserts.assertCounter("BlocksReplicated", 0L, metrics);
        }
    }

    /**
     * A {@link SimulatedFSDataset} whose block input streams fail with an
     * {@link IOException} when data is read, used to simulate an unreadable replica.
     * It is plugged into datanodes through {@link Factory}.
     */
    private static class CorruptFileSimulatedFSDataset
    extends SimulatedFSDataset {
        /**
         * @param datanode owning datanode; not used by this class
         * @param storage  passed to the {@link SimulatedFSDataset} constructor
         * @param conf     passed to the {@link SimulatedFSDataset} constructor
         */
        CorruptFileSimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
            super(storage, conf);
        }

        /**
         * Opens the block through the superclass, skips {@code seekOffset} bytes and wraps
         * the stream so that subsequent reads fail.
         *
         * @param b          block to open
         * @param seekOffset number of bytes to skip before handing the stream out
         * @return a stream that raises an {@link IOException} when data is read
         * @throws IOException if the block cannot be opened or the skip fails; in the
         *                     latter case the underlying stream is closed first
         */
        @Override
        public synchronized InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException {
            InputStream result = super.getBlockInputStream(b);
            try {
                IOUtils.skipFully(result, seekOffset);
            }
            catch (IOException e) {
                // Nobody else holds a reference yet, so release the stream here.
                IOUtils.closeStream(result);
                throw e;
            }
            return new CorruptFileSimulatedInputStream(result);
        }

        /** Dataset factory that produces {@link CorruptFileSimulatedFSDataset} instances. */
        static class Factory
        extends FsDatasetSpi.Factory<CorruptFileSimulatedFSDataset> {
            Factory() {
            }

            @Override
            public CorruptFileSimulatedFSDataset newInstance(DataNode datanode, DataStorage storage, Configuration conf) throws IOException {
                return new CorruptFileSimulatedFSDataset(datanode, storage, conf);
            }

            @Override
            public boolean isSimulated() {
                return true;
            }
        }

        /**
         * Stream wrapper that delegates to another stream and throws an
         * {@link IOException} whenever a delegated read returns a value greater than 0.
         */
        private static class CorruptFileSimulatedInputStream
        extends InputStream {
            private final InputStream inputStream;

            CorruptFileSimulatedInputStream(InputStream is) {
                this.inputStream = is;
            }

            /**
             * NOTE(review): a data byte whose value is 0 is returned rather than failing,
             * because only results {@code > 0} trigger the exception - confirm this is
             * intended.
             */
            @Override
            public int read() throws IOException {
                int ret = this.inputStream.read();
                if (ret > 0) {
                    throw new IOException("Input/output error");
                }
                return ret;
            }

            @Override
            public int read(byte[] b) throws IOException {
                int ret = this.inputStream.read(b);
                if (ret > 0) {
                    throw new IOException("Input/output error");
                }
                return ret;
            }

            /** Closes the wrapped stream so it is not leaked when the wrapper is closed. */
            @Override
            public void close() throws IOException {
                this.inputStream.close();
            }
        }
    }
}

