/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={RegionServerTests.class, MediumTests.class})
public class TestAsyncFSWALRollStuck {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);
    private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
    private static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();
    private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
    private static ScheduledExecutorService EXECUTOR;
    private static BlockingQueue<CompletableFuture<Long>> FUTURES;
    private static AtomicInteger SYNC_COUNT;
    private static CountDownLatch ARRIVE;
    private static CountDownLatch RESUME;
    private static TableName TN;
    private static RegionInfo RI;
    private static MultiVersionConcurrencyControl MVCC;
    private static AsyncFSWAL WAL;
    private static ExecutorService ROLL_EXEC;

    @BeforeClass
    public static void setUp() throws Exception {
        Configuration conf = UTIL.getConfiguration();
        conf.setClass("hbase.regionserver.wal.async.writer.impl", TestAsyncWriter.class, AsyncFSWALProvider.AsyncWriter.class);
        conf.setLong("hbase.wal.batch.size", 1L);
        TN = TableName.valueOf((String)"test");
        RI = RegionInfoBuilder.newBuilder((TableName)TN).build();
        MVCC = new MultiVersionConcurrencyControl();
        EXECUTOR = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());
        Path rootDir = UTIL.getDataTestDir();
        ROLL_EXEC = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
        WALActionsListener listener = new WALActionsListener(){

            public void logRollRequested(WALActionsListener.RollRequestReason reason) {
                ROLL_EXEC.execute(() -> {
                    try {
                        WAL.rollWriter();
                    }
                    catch (Exception e) {
                        LOG.warn("failed to roll writer", (Throwable)e);
                    }
                });
            }
        };
        WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), null, rootDir, "log", "oldlog", conf, Arrays.asList(listener), true, null, null, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS, StreamSlowMonitor.create((Configuration)conf, (String)"monitor"));
        WAL.init();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        EXECUTOR.shutdownNow();
        ROLL_EXEC.shutdownNow();
        Closeables.close((Closeable)WAL, (boolean)true);
        UTIL.cleanupTestDir();
    }

    @Test
    public void testRoll() throws Exception {
        byte[] row = Bytes.toBytes((String)"family");
        WALEdit edit = new WALEdit();
        edit.add(CellBuilderFactory.create((CellBuilderType)CellBuilderType.SHALLOW_COPY).setFamily(row).setQualifier(row).setRow(row).setValue(row).setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Cell.Type.Put).build());
        WALKeyImpl key1 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
        WAL.appendData(RI, key1, edit);
        WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1L, MVCC);
        long txid = WAL.appendData(RI, key2, edit);
        UTIL.waitFor(10000L, () -> FUTURES.size() == 2);
        ((CompletableFuture)FUTURES.poll()).completeExceptionally(new IOException("inject error"));
        ((CompletableFuture)FUTURES.poll()).completeExceptionally(new IOException("inject error"));
        ARRIVE.await();
        EXECUTOR.schedule(() -> RESUME.countDown(), 1L, TimeUnit.SECONDS);
        WAL.rollWriter();
        WAL.sync(txid);
    }

    static {
        FUTURES = new ArrayBlockingQueue<CompletableFuture<Long>>(3);
        SYNC_COUNT = new AtomicInteger(0);
        ARRIVE = new CountDownLatch(1);
        RESUME = new CountDownLatch(1);
    }

    public static final class TestAsyncWriter
    extends AsyncProtobufLogWriter {
        public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
            super(eventLoopGroup, channelClass);
        }

        public CompletableFuture<Long> sync(boolean forceSync) {
            int count = SYNC_COUNT.incrementAndGet();
            if (count < 3) {
                CompletableFuture<Long> f = new CompletableFuture<Long>();
                FUTURES.offer(f);
                return f;
            }
            if (count == 3) {
                ARRIVE.countDown();
                try {
                    RESUME.await();
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return super.sync(forceSync);
            }
            return super.sync(forceSync);
        }
    }
}

