/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests how a region server handles abort and stop requests while
 * {@link StopBlockingRegionObserver} is installed as a coprocessor. The
 * observer rejects stop requests (by throwing from its
 * {@code preStopRegionServer} hook) until a test explicitly allows them.
 */
@Category(value={RegionServerTests.class, MediumTests.class})
public class TestRegionServerAbort {
    private static final byte[] FAMILY_BYTES = Bytes.toBytes("f");
    private static final Log LOG = LogFactory.getLog(TestRegionServerAbort.class);
    private HBaseTestingUtility testUtil;
    private Configuration conf;
    private MiniDFSCluster dfsCluster;
    private MiniHBaseCluster cluster;

    /**
     * Configures the testing utility so that {@link StopBlockingRegionObserver}
     * is registered under both the region-server and the region coprocessor
     * keys, then starts ZooKeeper, DFS and HBase mini clusters.
     *
     * @throws Exception if any of the mini clusters fails to start
     */
    @Before
    public void setup() throws Exception {
        this.testUtil = new HBaseTestingUtility();
        this.conf = this.testUtil.getConfiguration();
        String observerClass = StopBlockingRegionObserver.class.getName();
        this.conf.set("hbase.coprocessor.regionserver.classes", observerClass);
        this.conf.set("hbase.coprocessor.region.classes", observerClass);
        this.conf.set("dfs.blocksize", Long.toString(102400L));
        this.conf.set("dfs.client.read.prefetch.size", Long.toString(102400L));
        this.testUtil.startMiniZKCluster();
        this.dfsCluster = this.testUtil.startMiniDFSCluster(2);
        this.cluster = this.testUtil.startMiniHBaseCluster(1, 2);
    }

    /**
     * Re-enables stopping on every region server's observer and shuts the mini
     * cluster down.
     *
     * @throws Exception if looking up a coprocessor or shutting down fails
     */
    @After
    public void tearDown() throws Exception {
        // setup() may have failed before the HBase cluster was created; in that
        // case there is nothing to unblock, but whatever did start must still
        // be shut down.
        if (this.cluster != null) {
            String observerClass = StopBlockingRegionObserver.class.getName();
            for (JVMClusterUtil.RegionServerThread t : this.cluster.getRegionServerThreads()) {
                HRegionServer rs = t.getRegionServer();
                RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
                StopBlockingRegionObserver cp =
                        (StopBlockingRegionObserver) cpHost.findCoprocessor(observerClass);
                // Lift the veto so the observer no longer throws from preStopRegionServer.
                cp.setStopAllowed(true);
            }
        }
        this.testUtil.shutdownMiniCluster();
    }

    /**
     * Sends a {@link Put} tagged with the
     * {@link StopBlockingRegionObserver#DO_ABORT} attribute, which makes the
     * observer's {@code prePut} hook call {@code abort} on the hosting region
     * server, and then asserts that the server hosting the table's first region
     * reports itself as both aborted and stopped.
     *
     * @throws Exception on any cluster or client failure
     */
    @Test
    public void testAbortFromRPC() throws Exception {
        TableName tableName = TableName.valueOf("testAbortFromRPC");
        Table table = this.testUtil.createTable(tableName, FAMILY_BYTES);
        try {
            this.testUtil.loadTable(table, FAMILY_BYTES);
            LOG.info("Wrote data");
            this.cluster.flushcache(tableName);
            LOG.info("Flushed table");

            Put put = new Put(new byte[]{0, 0, 0, 0});
            put.addColumn(FAMILY_BYTES, Bytes.toBytes("c"), new byte[0]);
            // The attribute's presence (not its value) is what prePut checks.
            put.setAttribute(StopBlockingRegionObserver.DO_ABORT, new byte[]{1});
            table.put(put);

            HRegion firstRegion = this.cluster.findRegionsForTable(tableName).get(0);
            Assert.assertNotNull(firstRegion);
            Assert.assertNotNull(firstRegion.getRegionServerServices());
            LOG.info("isAborted = " + firstRegion.getRegionServerServices().isAborted());
            Assert.assertTrue(firstRegion.getRegionServerServices().isAborted());
            LOG.info("isStopped = " + firstRegion.getRegionServerServices().isStopped());
            Assert.assertTrue(firstRegion.getRegionServerServices().isStopped());
        } finally {
            // Release the client-side table handle regardless of the outcome.
            table.close();
        }
    }

    /**
     * Submits two {@code abort} calls for the same region server to a
     * two-thread pool, waits up to ten seconds for the server to report
     * stopped, and asserts that the observer's {@code preStopRegionServer}
     * hook was counted exactly once.
     */
    @Test
    public void testMultiAbort() {
        Assert.assertTrue(this.cluster.getRegionServerThreads().size() > 0);
        JVMClusterUtil.RegionServerThread t = this.cluster.getRegionServerThreads().get(0);
        Assert.assertTrue(t.isAlive());
        final HRegionServer rs = t.getRegionServer();
        Assert.assertFalse(rs.isAborted());
        RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
        StopBlockingRegionObserver cp = (StopBlockingRegionObserver) cpHost.findCoprocessor(
                StopBlockingRegionObserver.class.getName());
        // Let the stop hook pass instead of throwing.
        cp.setStopAllowed(true);

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    rs.abort("Abort 1");
                }
            });
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    rs.abort("Abort 2");
                }
            });
            long testTimeoutMs = 10000L;
            Waiter.waitFor(this.cluster.getConf(), testTimeoutMs, new Waiter.Predicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return rs.isStopped();
                }
            });
            // Two aborts were requested, but the stop hook must have run only once.
            Assert.assertEquals(1L, cp.getNumAbortsRequested());
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Requests a stop of the first region server through the admin API while
     * the observer still disallows stopping, and asserts that the server is
     * neither aborted nor stopped afterwards.
     *
     * @throws Exception on any cluster or client failure
     */
    @Test
    public void testStopOverrideFromCoprocessor() throws Exception {
        HBaseAdmin admin = this.testUtil.getHBaseAdmin();
        HRegionServer regionserver = this.cluster.getRegionServer(0);
        admin.stopRegionServer(regionserver.getServerName().getHostAndPort());
        Assert.assertFalse(this.cluster.getRegionServer(0).isAborted());
        Assert.assertFalse(this.cluster.getRegionServer(0).isStopped());
    }

    /**
     * Coprocessor used by the tests above. As a region observer it aborts the
     * hosting region server when it sees a {@link Put} carrying the
     * {@link #DO_ABORT} attribute. As a region-server observer it counts
     * invocations of {@code preStopRegionServer} and throws from that hook
     * unless stopping has been enabled via {@link #setStopAllowed(boolean)}.
     * All other {@link RegionServerObserver} hooks are no-ops.
     */
    public static class StopBlockingRegionObserver
    extends BaseRegionObserver
    implements RegionServerObserver {
        /** Name of the {@link Put} attribute whose presence triggers an abort in {@code prePut}. */
        public static final String DO_ABORT = "DO_ABORT";

        // Written by the test thread; volatile so that a hook running on another
        // thread observes the latest value.
        private volatile boolean stopAllowed;
        private final AtomicInteger abortCount = new AtomicInteger();

        /**
         * Aborts the hosting region server if the put carries the
         * {@link #DO_ABORT} attribute; otherwise does nothing.
         */
        @Override
        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {
            if (put.getAttribute(DO_ABORT) != null) {
                HRegionServer rs = (HRegionServer) c.getEnvironment().getRegionServerServices();
                LOG.info("Triggering abort for regionserver " + rs.getServerName());
                rs.abort("Aborting for test");
            }
        }

        /**
         * Counts the invocation and rejects it unless stopping is allowed.
         *
         * @throws IOException with message "Stop not allowed" while stopping is disallowed
         */
        @Override
        public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env) throws IOException {
            // Count first: rejected requests are included in the total as well.
            this.abortCount.incrementAndGet();
            if (!this.stopAllowed) {
                throw new IOException("Stop not allowed");
            }
        }

        /**
         * @return how many times {@code preStopRegionServer} has been invoked on this instance
         */
        public int getNumAbortsRequested() {
            return this.abortCount.get();
        }

        @Override
        public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA, Region regionB) throws IOException {
        }

        @Override
        public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA, Region regionB, Region mergedRegion) throws IOException {
        }

        @Override
        public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException {
        }

        @Override
        public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA, Region regionB, Region mergedRegion) throws IOException {
        }

        @Override
        public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA, Region regionB) throws IOException {
        }

        @Override
        public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA, Region regionB) throws IOException {
        }

        @Override
        public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
        }

        @Override
        public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
        }

        /**
         * Leaves the replication endpoint untouched.
         *
         * @return the {@code endpoint} that was passed in
         */
        @Override
        public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
            // Hand the endpoint back rather than null so this observer never
            // replaces an endpoint with nothing.
            return endpoint;
        }

        @Override
        public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx, List<AdminProtos.WALEntry> entries, CellScanner cells) throws IOException {
        }

        @Override
        public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx, List<AdminProtos.WALEntry> entries, CellScanner cells) throws IOException {
        }

        /**
         * @param allowed {@code true} to let {@code preStopRegionServer} pass,
         *                {@code false} to make it throw
         */
        public void setStopAllowed(boolean allowed) {
            this.stopAllowed = allowed;
        }

        /**
         * @return whether {@code preStopRegionServer} currently lets stop requests through
         */
        public boolean isStopAllowed() {
            return this.stopAllowed;
        }
    }
}

