/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSTestBase;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.io.asyncfs.ProtobufDecoder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={MiscTests.class, MediumTests.class})
public class TestFanOutOneBlockAsyncDFSOutputHang
extends AsyncFSTestBase {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutputHang.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutputHang.class);
    private static DistributedFileSystem FS;
    private static EventLoopGroup EVENT_LOOP_GROUP;
    private static Class<? extends Channel> CHANNEL_CLASS;
    private static FanOutOneBlockAsyncDFSOutput OUT;
    @Rule
    public TestName name = new TestName();

    @BeforeClass
    public static void setUp() throws Exception {
        TestFanOutOneBlockAsyncDFSOutputHang.startMiniDFSCluster(2);
        FS = CLUSTER.getFileSystem();
        EVENT_LOOP_GROUP = new NioEventLoopGroup();
        CHANNEL_CLASS = NioSocketChannel.class;
        Path f = new Path("/testHang");
        EventLoop eventLoop = EVENT_LOOP_GROUP.next();
        OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem)FS, (Path)f, (boolean)true, (boolean)false, (short)2, (long)FS.getDefaultBlockSize(), (EventLoopGroup)eventLoop, CHANNEL_CLASS);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (OUT != null) {
            OUT.recoverAndClose(null);
        }
        if (EVENT_LOOP_GROUP != null) {
            EVENT_LOOP_GROUP.shutdownGracefully().sync().get();
        }
        TestFanOutOneBlockAsyncDFSOutputHang.shutdownMiniDFSCluster();
    }

    @Test
    public void testFlushHangWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception {
        final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2);
        List channels = OUT.getDatanodeList();
        Channel dn1Channel = (Channel)channels.get(0);
        ArrayList protobufDecoderNames = new ArrayList();
        dn1Channel.pipeline().forEach(entry -> {
            if (ProtobufDecoder.class.isInstance(entry.getValue())) {
                protobufDecoderNames.add(entry.getKey());
            }
        });
        Assert.assertTrue((protobufDecoderNames.size() == 1 ? 1 : 0) != 0);
        dn1Channel.pipeline().addAfter((String)protobufDecoderNames.get(0), "dn1AckReceivedHandler", (ChannelHandler)new ChannelInboundHandlerAdapter(){

            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                super.channelRead(ctx, msg);
                dn1AckReceivedCyclicBarrier.await();
            }
        });
        Channel dn2Channel = (Channel)channels.get(1);
        dn2Channel.pipeline().addFirst(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (!(msg instanceof ByteBuf)) {
                    ctx.fireChannelRead(msg);
                } else {
                    ((ByteBuf)msg).release();
                }
            }
        }});
        byte[] b = new byte[10];
        ThreadLocalRandom.current().nextBytes(b);
        OUT.write(b, 0, b.length);
        CompletableFuture future = OUT.flush(false);
        dn1AckReceivedCyclicBarrier.await();
        dn1Channel.close().get();
        try {
            future.get();
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((e != null ? 1 : 0) != 0);
            LOG.info("expected exception caught when get future", (Throwable)e);
        }
        channels.forEach(ch -> {
            try {
                ch.closeFuture().get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }
}

