/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Daemon;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link FanOutOneBlockAsyncDFSOutput}, run against a mini DFS cluster that is started
 * once per class via {@code startMiniDFSCluster(3)} and shared by all test methods.
 */
@Category({MiscTests.class, MediumTests.class})
public class TestFanOutOneBlockAsyncDFSOutput {
    private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

    /**
     * Value written to {@code dfs.client.socket-timeout} before the cluster starts;
     * {@link #testHeartbeat()} idles for twice this long.
     */
    private static final int READ_TIMEOUT_MS = 2000;

    /** Replication factor requested for every output created by this class. */
    private static final short REPLICATION = 3;

    private static DistributedFileSystem FS;
    private static EventLoopGroup EVENT_LOOP_GROUP;
    private static Class<? extends Channel> CHANNEL_CLASS;

    @Rule
    public TestName name = new TestName();

    /**
     * Configures the client socket timeout, starts the mini DFS cluster and creates the NIO event
     * loop group used by all tests.
     */
    @BeforeClass
    public static void setUp() throws Exception {
        TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", READ_TIMEOUT_MS);
        TEST_UTIL.startMiniDFSCluster(3);
        FS = TEST_UTIL.getDFSCluster().getFileSystem();
        EVENT_LOOP_GROUP = new NioEventLoopGroup();
        CHANNEL_CLASS = NioSocketChannel.class;
    }

    /**
     * Shuts down the event loop group (if it was created) and the mini DFS cluster.
     */
    @AfterClass
    public static void tearDown() throws IOException, InterruptedException {
        try {
            if (EVENT_LOOP_GROUP != null) {
                EVENT_LOOP_GROUP.shutdownGracefully().sync();
            }
        } finally {
            // Always stop the cluster, even when the event loop shutdown fails or is interrupted.
            TEST_UTIL.shutdownMiniDFSCluster();
        }
    }

    /**
     * Creates an output for {@code f} on the next event loop of the shared group, with
     * overwrite enabled, parent creation disabled and replication {@link #REPLICATION}.
     */
    private static FanOutOneBlockAsyncDFSOutput newOutput(Path f, long blockSize) throws IOException {
        EventLoop eventLoop = EVENT_LOOP_GROUP.next();
        return FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, REPLICATION, blockSize,
            eventLoop, CHANNEL_CLASS);
    }

    /** Same as {@link #newOutput(Path, long)} using the file system's default block size. */
    private static FanOutOneBlockAsyncDFSOutput newOutput(Path f) throws IOException {
        return newOutput(f, FS.getDefaultBlockSize());
    }

    /**
     * Blocks until an output with replication 3 can be created and closed without an
     * {@link IOException}, retrying every 100 ms. Used after a datanode has been restarted.
     */
    private void ensureAllDatanodeAlive() throws InterruptedException {
        Path probe = new Path("/ensureDatanodeAlive");
        for (;;) {
            try {
                FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, probe,
                    true, true, REPLICATION, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
                out.close();
                return;
            } catch (IOException e) {
                // Record why the probe failed instead of swallowing it, then retry.
                LOG.debug("probe output could not be created or closed, retrying", e);
                Thread.sleep(100L);
            }
        }
    }

    /**
     * Writes ten 10-byte chunks of seeded pseudo-random data to {@code out}, flushing twice after
     * each chunk, checks that every flush reports the cumulative length written so far, closes the
     * output and then reads {@code f} back from {@code fs} to verify length and content.
     *
     * @param fs  file system used to stat and re-read the file
     * @param f   path the output was created for
     * @param out output to write to; closed by this method once all flushes have been verified
     */
    static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out) throws IOException, InterruptedException, ExecutionException {
        final int rounds = 10;
        final long seed = 12345L;
        ArrayList<CompletableFuture<Long>> futures = new ArrayList<>(2 * rounds);
        byte[] b = new byte[10];
        Random rand = new Random(seed);
        for (int i = 0; i < rounds; ++i) {
            rand.nextBytes(b);
            out.write(b);
            // Two flushes per write: both are expected to report the same length below.
            futures.add(out.flush(false));
            futures.add(out.flush(false));
        }
        for (int i = 0; i < rounds; ++i) {
            long expectedLen = (long) (i + 1) * b.length;
            Assert.assertEquals(expectedLen, futures.get(2 * i).join().longValue());
            Assert.assertEquals(expectedLen, futures.get(2 * i + 1).join().longValue());
        }
        out.close();
        Assert.assertEquals((long) (b.length * rounds), fs.getFileStatus(f).getLen());
        byte[] actual = new byte[b.length];
        // Re-seed so the generator replays exactly the chunks that were written.
        rand.setSeed(seed);
        try (FSDataInputStream in = fs.open(f)) {
            for (int i = 0; i < rounds; ++i) {
                in.readFully(actual);
                rand.nextBytes(b);
                Assert.assertArrayEquals(b, actual);
            }
            // Nothing may follow the last chunk.
            Assert.assertEquals(-1L, (long) in.read());
        }
    }

    /** Plain write/flush/close round trip through {@link #writeAndVerify}. */
    @Test
    public void test() throws IOException, InterruptedException, ExecutionException {
        Path f = new Path("/" + this.name.getMethodName());
        FanOutOneBlockAsyncDFSOutput out = newOutput(f);
        writeAndVerify(FS, f, out);
    }

    /**
     * Flushes one chunk successfully, restarts datanode 0, expects the next flush to fail with an
     * {@link ExecutionException}, then calls {@code recoverAndClose} and verifies that the file
     * holds exactly the first chunk.
     */
    @Test
    public void testRecover() throws IOException, InterruptedException, ExecutionException {
        Path f = new Path("/" + this.name.getMethodName());
        FanOutOneBlockAsyncDFSOutput out = newOutput(f);
        byte[] b = new byte[10];
        ThreadLocalRandom.current().nextBytes(b);
        out.write(b, 0, b.length);
        out.flush(false).get();
        TEST_UTIL.getDFSCluster().restartDataNode(0);
        try {
            out.write(b, 0, b.length);
            try {
                out.flush(false).get();
                Assert.fail("flush should fail");
            } catch (ExecutionException e) {
                LOG.info("expected exception caught", e);
            }
            out.recoverAndClose(null);
            // Only the first, successfully flushed chunk is expected to remain.
            Assert.assertEquals((long) b.length, FS.getFileStatus(f).getLen());
            byte[] actual = new byte[b.length];
            try (FSDataInputStream in = FS.open(f)) {
                in.readFully(actual);
            }
            Assert.assertArrayEquals(b, actual);
        } finally {
            // Leave the cluster usable for whichever test runs next.
            this.ensureAllDatanodeAlive();
        }
    }

    /**
     * Creates an output, idles for twice {@link #READ_TIMEOUT_MS}, then runs
     * {@link #writeAndVerify} to check the output is still usable after the idle period.
     */
    @Test
    public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
        Path f = new Path("/" + this.name.getMethodName());
        FanOutOneBlockAsyncDFSOutput out = newOutput(f);
        Thread.sleep(READ_TIMEOUT_MS * 2);
        writeAndVerify(FS, f, out);
    }

    /**
     * Creating an output below a parent directory that does not exist, with parent creation
     * disabled, must fail with a {@link RemoteException} wrapping a {@link FileNotFoundException}.
     */
    @Test
    public void testCreateParentFailed() throws IOException {
        Path f = new Path("/" + this.name.getMethodName() + "/test");
        try {
            newOutput(f);
            Assert.fail("should fail with parent does not exist");
        } catch (RemoteException e) {
            LOG.info("expected exception caught", e);
            Assert.assertThat(e.unwrapRemoteException(), CoreMatchers.instanceOf(FileNotFoundException.class));
        }
    }

    /**
     * Shuts down datanode 0, expects output creation to fail with an {@link IOException}, and then
     * checks through reflection that the {@code dataXceiverServer} of every datanode reports zero
     * peers. Datanode 0 is restarted afterwards.
     */
    @Test
    public void testConnectToDatanodeFailed() throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InterruptedException, NoSuchFieldException {
        // Neither the field nor getNumPeers is reachable without setAccessible, hence reflection.
        Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
        xceiverServerDaemonField.setAccessible(true);
        Class<?> xceiverServerClass = Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
        Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
        numPeersMethod.setAccessible(true);
        TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
        try {
            Path f = new Path("/test");
            try {
                newOutput(f);
                Assert.fail("should fail with connection error");
            } catch (IOException e) {
                LOG.info("expected exception caught", e);
            }
            for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
                Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
                Assert.assertEquals((Object) 0, numPeersMethod.invoke(daemon.getRunnable()));
            }
        } finally {
            TEST_UTIL.getDFSCluster().restartDataNode(0);
            this.ensureAllDatanodeAlive();
        }
    }

    /**
     * Writes a single 50 MiB chunk to an output created with a 1 GiB block size, flushes twice and
     * verifies the flushed length, the file length and the file content.
     */
    @Test
    public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
        Path f = new Path("/" + this.name.getMethodName());
        FanOutOneBlockAsyncDFSOutput out = newOutput(f, 1024L * 1024 * 1024);
        byte[] b = new byte[50 * 1024 * 1024];
        ThreadLocalRandom.current().nextBytes(b);
        out.write(b);
        // The first flush result is deliberately not awaited; only the second one is checked.
        out.flush(false);
        Assert.assertEquals((long) b.length, out.flush(false).get().longValue());
        out.close();
        Assert.assertEquals((long) b.length, FS.getFileStatus(f).getLen());
        byte[] actual = new byte[b.length];
        try (FSDataInputStream in = FS.open(f)) {
            in.readFully(actual);
        }
        Assert.assertArrayEquals(b, actual);
    }
}

