/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;

public class TestPread {
    static final long seed = 3735928559L;
    static final int blockSize = 4096;
    static final int numBlocksPerFile = 12;
    static final int fileSize = 49152;
    boolean simulatedStorage;
    boolean isHedgedRead;

    @Before
    public void setup() {
        this.simulatedStorage = false;
        this.isHedgedRead = false;
    }

    private void writeFile(FileSystem fileSys, Path name) throws IOException {
        int replication = 3;
        FSDataOutputStream stm = fileSys.create(name, true, 4096, (short)replication, 4096L);
        stm.close();
        FSDataInputStream in = fileSys.open(name);
        byte[] buffer = new byte[49152];
        in.readFully(0L, buffer, 0, 0);
        IOException res = null;
        try {
            in.readFully(0L, buffer, 0, 1);
        }
        catch (IOException e) {
            res = e;
        }
        Assert.assertTrue((String)"Error reading beyond file boundary.", (res != null ? 1 : 0) != 0);
        in.close();
        if (!fileSys.delete(name, true)) {
            Assert.assertTrue((String)"Cannot delete file", (boolean)false);
        }
        DFSTestUtil.createFile(fileSys, name, 49152, 49152L, 4096L, (short)replication, 3735928559L);
    }

    private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
        for (int idx = 0; idx < actual.length; ++idx) {
            Assert.assertEquals((String)(message + " byte " + (from + idx) + " differs. expected " + expected[from + idx] + " actual " + actual[idx]), (long)actual[idx], (long)expected[from + idx]);
            actual[idx] = 0;
        }
    }

    private void doPread(FSDataInputStream stm, long position, byte[] buffer, int offset, int length) throws IOException {
        int nread = 0;
        long totalRead = 0L;
        DFSInputStream dfstm = null;
        if (stm.getWrappedStream() instanceof DFSInputStream) {
            dfstm = (DFSInputStream)stm.getWrappedStream();
            totalRead = dfstm.getReadStatistics().getTotalBytesRead();
        }
        while (nread < length) {
            int nbytes = stm.read(position + (long)nread, buffer, offset + nread, length - nread);
            Assert.assertTrue((String)"Error in pread", (nbytes > 0 ? 1 : 0) != 0);
            nread += nbytes;
        }
        if (dfstm != null) {
            if (this.isHedgedRead) {
                Assert.assertTrue((String)"Expected read statistic to be incremented", ((long)length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead ? 1 : 0) != 0);
            } else {
                Assert.assertEquals((String)"Expected read statistic to be incremented", (long)length, (long)(dfstm.getReadStatistics().getTotalBytesRead() - totalRead));
            }
        }
    }

    private void pReadFile(FileSystem fileSys, Path name) throws IOException {
        FSDataInputStream stm = fileSys.open(name);
        byte[] expected = new byte[49152];
        if (this.simulatedStorage) {
            assert (fileSys instanceof DistributedFileSystem);
            DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
            LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(name.toString(), 0L, 49152L);
            DFSTestUtil.fillExpectedBuf(lbs, expected);
        } else {
            Random rand = new Random(3735928559L);
            rand.nextBytes(expected);
        }
        byte[] actual = new byte[4096];
        stm.readFully(actual);
        this.checkAndEraseData(actual, 0, expected, "Read Sanity Test");
        actual = new byte[8192];
        this.doPread(stm, 0L, actual, 0, 8192);
        this.checkAndEraseData(actual, 0, expected, "Pread Test 1");
        actual = new byte[4096];
        stm.readFully(actual);
        this.checkAndEraseData(actual, 4096, expected, "Pread Test 2");
        stm.readFully(2048L, actual, 0, 4096);
        this.checkAndEraseData(actual, 2048, expected, "Pread Test 3");
        actual = new byte[8192];
        stm.readFully(2048L, actual);
        this.checkAndEraseData(actual, 2048, expected, "Pread Test 4");
        actual = new byte[8192];
        stm.readFully(38912L, actual);
        this.checkAndEraseData(actual, 38912, expected, "Pread Test 5");
        actual = new byte[4096];
        stm.readFully(actual);
        this.checkAndEraseData(actual, 8192, expected, "Pread Test 6");
        stm.close();
        stm = fileSys.open(name);
        stm.readFully(1L, actual, 0, 4096);
        stm.readFully(16384L, actual, 0, 4096);
        stm.readFully(28672L, actual, 0, 4096);
        actual = new byte[12288];
        stm.readFully(0L, actual, 0, 12288);
        this.checkAndEraseData(actual, 0, expected, "Pread Test 7");
        actual = new byte[32768];
        stm.readFully(12288L, actual, 0, 32768);
        this.checkAndEraseData(actual, 12288, expected, "Pread Test 8");
        stm.readFully(47104L, actual, 0, 2048);
        IOException res = null;
        try {
            stm.readFully(47104L, actual, 0, 4096);
        }
        catch (IOException e) {
            res = e;
        }
        Assert.assertTrue((String)"Error reading beyond file boundary.", (res != null ? 1 : 0) != 0);
        stm.close();
    }

    private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys, Path name) throws IOException {
        if (this.simulatedStorage) {
            return;
        }
        int numBlocks = 1;
        Assert.assertTrue((numBlocks <= 3 ? 1 : 0) != 0);
        byte[] expected = new byte[numBlocks * 4096];
        Random rand = new Random(3735928559L);
        rand.nextBytes(expected);
        byte[] actual = new byte[numBlocks * 4096];
        FSDataInputStream stm = fileSys.open(name);
        stm.readFully(0L, actual);
        this.checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Setup");
        Assert.assertTrue((boolean)cluster.restartDataNodes());
        cluster.waitActive();
        stm.readFully(0L, actual);
        this.checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
    }

    private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
        Assert.assertTrue((boolean)fileSys.exists(name));
        Assert.assertTrue((boolean)fileSys.delete(name, true));
        Assert.assertTrue((!fileSys.exists(name) ? 1 : 0) != 0);
    }

    private Callable<Void> getPReadFileCallable(final FileSystem fileSys, final Path file) {
        return new Callable<Void>(){

            @Override
            public Void call() throws IOException {
                TestPread.this.pReadFile(fileSys, file);
                return null;
            }
        };
    }

    @Test
    public void testPreadDFS() throws IOException {
        Configuration conf = new Configuration();
        this.dfsPreadTest(conf, false, true);
        this.dfsPreadTest(conf, true, true);
    }

    @Test
    public void testPreadDFSNoChecksum() throws IOException {
        Configuration conf = new Configuration();
        GenericTestUtils.setLogLevel((Logger)DataTransferProtocol.LOG, (Level)Level.ALL);
        this.dfsPreadTest(conf, false, false);
        this.dfsPreadTest(conf, true, false);
    }

    @Test
    public void testHedgedPreadDFSBasic() throws IOException {
        this.isHedgedRead = true;
        Configuration conf = new Configuration();
        conf.setInt("dfs.client.hedged.read.threadpool.size", 5);
        conf.setLong("dfs.client.hedged.read.threshold.millis", 1L);
        this.dfsPreadTest(conf, false, true);
        this.dfsPreadTest(conf, true, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHedgedReadLoopTooManyTimes() throws IOException {
        Configuration conf = new Configuration();
        int numHedgedReadPoolThreads = 5;
        int hedgedReadTimeoutMillis = 50;
        conf.setInt("dfs.client.hedged.read.threadpool.size", numHedgedReadPoolThreads);
        conf.setLong("dfs.client.hedged.read.threshold.millis", 50L);
        conf.setInt("dfs.client.retry.window.base", 0);
        DFSClientFaultInjector.set((DFSClientFaultInjector)((DFSClientFaultInjector)Mockito.mock(DFSClientFaultInjector.class)));
        DFSClientFaultInjector injector = DFSClientFaultInjector.get();
        int sleepMs = 100;
        ((DFSClientFaultInjector)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                Thread.sleep(150L);
                if (DFSClientFaultInjector.exceptionNum.compareAndSet(0L, 1L)) {
                    System.out.println("-------------- throw Checksum Exception");
                    throw new ChecksumException("ChecksumException test", 100L);
                }
                return null;
            }
        }).when((Object)injector)).fetchFromDatanodeException();
        ((DFSClientFaultInjector)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                Thread.sleep(200L);
                return null;
            }
        }).when((Object)injector)).readFromDatanodeDelay();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
        DistributedFileSystem fileSys = cluster.getFileSystem();
        DFSClient dfsClient = fileSys.getClient();
        FSDataOutputStream output = null;
        DFSInputStream input = null;
        String filename = "/hedgedReadMaxOut.dat";
        try {
            Path file = new Path(filename);
            output = fileSys.create(file, (short)2);
            byte[] data = new byte[65536];
            output.write(data);
            output.flush();
            output.write(data);
            output.flush();
            output.write(data);
            output.flush();
            output.close();
            byte[] buffer = new byte[65536];
            input = dfsClient.open(filename);
            input.read(0L, buffer, 0, 1024);
            input.close();
            Assert.assertEquals((long)3L, (long)input.getHedgedReadOpsLoopNumForTesting());
        }
        catch (BlockMissingException e) {
            try {
                Assert.assertTrue((boolean)false);
            }
            catch (Throwable throwable) {
                Mockito.reset((Object[])new DFSClientFaultInjector[]{injector});
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{input});
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{output});
                fileSys.close();
                cluster.shutdown();
                throw throwable;
            }
            Mockito.reset((Object[])new DFSClientFaultInjector[]{injector});
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{input});
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{output});
            fileSys.close();
            cluster.shutdown();
        }
        Mockito.reset((Object[])new DFSClientFaultInjector[]{injector});
        IOUtils.cleanup(null, (Closeable[])new Closeable[]{input});
        IOUtils.cleanup(null, (Closeable[])new Closeable[]{output});
        fileSys.close();
        cluster.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMaxOutHedgedReadPool() throws IOException, InterruptedException, ExecutionException {
        this.isHedgedRead = true;
        Configuration conf = new Configuration();
        int numHedgedReadPoolThreads = 5;
        int initialHedgedReadTimeoutMillis = 50000;
        int fixedSleepIntervalMillis = 50;
        conf.setInt("dfs.client.hedged.read.threadpool.size", numHedgedReadPoolThreads);
        conf.setLong("dfs.client.hedged.read.threshold.millis", 50000L);
        DFSClientFaultInjector.set((DFSClientFaultInjector)((DFSClientFaultInjector)Mockito.mock(DFSClientFaultInjector.class)));
        DFSClientFaultInjector injector = DFSClientFaultInjector.get();
        ((DFSClientFaultInjector)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                Thread.sleep(50L);
                return null;
            }
        }).when((Object)injector)).startFetchFromDatanode();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
        DistributedFileSystem fileSys = cluster.getFileSystem();
        DFSClient dfsClient = fileSys.getClient();
        DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
        metrics.hedgedReadOps.set(0L);
        metrics.hedgedReadOpsWin.set(0L);
        metrics.hedgedReadOpsInCurThread.set(0L);
        try {
            int i;
            Path file1 = new Path("hedgedReadMaxOut.dat");
            this.writeFile((FileSystem)fileSys, file1);
            this.pReadFile((FileSystem)fileSys, file1);
            Assert.assertTrue((metrics.getHedgedReadOps() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((metrics.getHedgedReadOpsInCurThread() == 0L ? 1 : 0) != 0);
            Configuration conf2 = new Configuration(cluster.getConfiguration(0));
            conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
            conf2.setLong("dfs.client.hedged.read.threshold.millis", 50L);
            fileSys.close();
            fileSys = (DistributedFileSystem)FileSystem.get((URI)cluster.getURI(0), (Configuration)conf2);
            metrics = fileSys.getClient().getHedgedReadMetrics();
            this.pReadFile((FileSystem)fileSys, file1);
            Assert.assertTrue((metrics.getHedgedReadOps() > 0L ? 1 : 0) != 0);
            Assert.assertTrue((metrics.getHedgedReadOpsInCurThread() == 0L ? 1 : 0) != 0);
            int factor = 10;
            int numHedgedReads = numHedgedReadPoolThreads * factor;
            long initialReadOpsValue = metrics.getHedgedReadOps();
            ExecutorService executor = Executors.newFixedThreadPool(numHedgedReads);
            ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
            for (i = 0; i < numHedgedReads; ++i) {
                futures.add(executor.submit(this.getPReadFileCallable((FileSystem)fileSys, file1)));
            }
            for (i = 0; i < numHedgedReads; ++i) {
                ((Future)futures.get(i)).get();
            }
            Assert.assertTrue((metrics.getHedgedReadOps() > initialReadOpsValue ? 1 : 0) != 0);
            Assert.assertTrue((metrics.getHedgedReadOpsInCurThread() > 0L ? 1 : 0) != 0);
            this.cleanupFile((FileSystem)fileSys, file1);
            executor.shutdown();
        }
        catch (Throwable throwable) {
            fileSys.close();
            cluster.shutdown();
            Mockito.reset((Object[])new DFSClientFaultInjector[]{injector});
            throw throwable;
        }
        fileSys.close();
        cluster.shutdown();
        Mockito.reset((Object[])new DFSClientFaultInjector[]{injector});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum) throws IOException {
        conf.setLong("dfs.blocksize", 4096L);
        conf.setLong("dfs.client.read.prefetch.size", 4096L);
        conf.setInt("dfs.client.retry.window.base", 0);
        if (this.simulatedStorage) {
            SimulatedFSDataset.setFactory(conf);
        }
        if (disableTransferTo) {
            conf.setBoolean("dfs.datanode.transferTo.allowed", false);
        }
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
        DistributedFileSystem fileSys = cluster.getFileSystem();
        fileSys.setVerifyChecksum(verifyChecksum);
        try {
            Path file1 = new Path("/preadtest.dat");
            this.writeFile((FileSystem)fileSys, file1);
            this.pReadFile((FileSystem)fileSys, file1);
            this.datanodeRestartTest(cluster, (FileSystem)fileSys, file1);
            this.cleanupFile((FileSystem)fileSys, file1);
        }
        finally {
            fileSys.close();
            cluster.shutdown();
        }
    }

    @Test
    public void testPreadDFSSimulated() throws IOException {
        this.simulatedStorage = true;
        this.testPreadDFS();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPreadLocalFS() throws IOException {
        HdfsConfiguration conf = new HdfsConfiguration();
        try (LocalFileSystem fileSys = FileSystem.getLocal((Configuration)conf);){
            Path file1 = new Path("build/test/data", "preadtest.dat");
            this.writeFile((FileSystem)fileSys, file1);
            this.pReadFile((FileSystem)fileSys, file1);
            this.cleanupFile((FileSystem)fileSys, file1);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTruncateWhileReading() throws Exception {
        Path path = new Path("/testfile");
        int blockSize = 512;
        Configuration conf = new Configuration();
        conf.setLong("dfs.client.read.prefetch.size", 512L);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        try {
            DistributedFileSystem fs = cluster.getFileSystem();
            FSDataOutputStream dos = fs.create(path, true, 512, (short)1, 512L);
            dos.write(new byte[1536]);
            dos.close();
            final FSDataInputStream dis = fs.open(path);
            while (!fs.truncate(path, 10L)) {
                Thread.sleep(10L);
            }
            ExecutorService executor = Executors.newFixedThreadPool(1);
            Future<Void> future = executor.submit(new Callable<Void>(){

                @Override
                public Void call() throws IOException {
                    dis.readFully(512L, new byte[4]);
                    return null;
                }
            });
            try {
                future.get(4L, TimeUnit.SECONDS);
                Assert.fail();
            }
            catch (ExecutionException ee) {
                Assert.assertTrue((String)ee.toString(), (boolean)(ee.getCause() instanceof EOFException));
            }
            finally {
                future.cancel(true);
                executor.shutdown();
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        new TestPread().testPreadDFS();
    }
}

