/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={MasterTests.class, MediumTests.class})
public class TestSplitLogManager {
    private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
    private final ServerManager sm = (ServerManager)Mockito.mock(ServerManager.class);
    private ZKWatcher zkw;
    private DummyMasterServices master;
    private SplitLogManager slm;
    private Configuration conf;
    private int to;
    private static HBaseTestingUtility TEST_UTIL;

    @Before
    public void setup() throws Exception {
        TEST_UTIL = new HBaseTestingUtility();
        TEST_UTIL.startMiniZKCluster();
        this.conf = TEST_UTIL.getConfiguration();
        this.zkw = new ZKWatcher(this.conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
        this.master = new DummyMasterServices(this.zkw, this.conf);
        ZKUtil.deleteChildrenRecursively((ZKWatcher)this.zkw, (String)this.zkw.znodePaths.baseZNode);
        ZKUtil.createAndFailSilent((ZKWatcher)this.zkw, (String)this.zkw.znodePaths.baseZNode);
        Assert.assertTrue((ZKUtil.checkExists((ZKWatcher)this.zkw, (String)this.zkw.znodePaths.baseZNode) != -1 ? 1 : 0) != 0);
        LOG.debug(this.zkw.znodePaths.baseZNode + " created");
        ZKUtil.createAndFailSilent((ZKWatcher)this.zkw, (String)this.zkw.znodePaths.splitLogZNode);
        Assert.assertTrue((ZKUtil.checkExists((ZKWatcher)this.zkw, (String)this.zkw.znodePaths.splitLogZNode) != -1 ? 1 : 0) != 0);
        LOG.debug(this.zkw.znodePaths.splitLogZNode + " created");
        SplitLogCounters.resetCounters();
        Mockito.when((Object)this.sm.isServerOnline((ServerName)Mockito.any())).thenReturn((Object)true);
        this.to = 12000;
        this.conf.setInt("hbase.splitlog.manager.timeout", this.to);
        this.conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * this.to);
        this.conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
        this.to += 1600;
    }

    @After
    public void teardown() throws IOException, KeeperException {
        this.master.stop("");
        if (this.slm != null) {
            this.slm.stop();
        }
        TEST_UTIL.shutdownMiniZKCluster();
    }

    private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems) throws Exception {
        Expr e = new Expr(){

            @Override
            public long eval() {
                return ctr.sum();
            }
        };
        this.waitForCounter(e, oldval, newval, timems);
    }

    private void waitForCounter(final Expr e, final long oldval, long newval, long timems) throws Exception {
        TEST_UTIL.waitFor(timems, 10L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return e.eval() != oldval;
            }
        });
        Assert.assertEquals((long)newval, (long)e.eval());
    }

    private SplitLogManager.Task findOrCreateOrphanTask(String path) {
        return this.slm.tasks.computeIfAbsent(path, k -> {
            LOG.info("creating orphan task " + k);
            SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
            return new SplitLogManager.Task();
        });
    }

    private String submitTaskAndWait(SplitLogManager.TaskBatch batch, String name) throws KeeperException, InterruptedException {
        String tasknode = ZKSplitLog.getEncodedNodeName((ZKWatcher)this.zkw, (String)name);
        TestMasterAddressTracker.NodeCreationListener listener = new TestMasterAddressTracker.NodeCreationListener(this.zkw, tasknode);
        this.zkw.registerListener((ZKListener)listener);
        ZKUtil.watchAndCheckExists((ZKWatcher)this.zkw, (String)tasknode);
        this.slm.enqueueSplitTask(name, batch);
        Assert.assertEquals((long)1L, (long)batch.installed);
        Assert.assertTrue((this.findOrCreateOrphanTask((String)tasknode).batch == batch ? 1 : 0) != 0);
        Assert.assertEquals((long)1L, (long)SplitLogCounters.tot_mgr_node_create_queued.sum());
        LOG.debug("waiting for task node creation");
        listener.waitForCreation();
        LOG.debug("task created");
        return tasknode;
    }

    @Test(timeout=180000L)
    public void testTaskCreation() throws Exception {
        LOG.info("TestTaskCreation - test the creation of a task in zk");
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        String tasknode = this.submitTaskAndWait(batch, "foo/1");
        byte[] data = ZKUtil.getData((ZKWatcher)this.zkw, (String)tasknode);
        SplitLogTask slt = SplitLogTask.parseFrom((byte[])data);
        LOG.info("Task node created " + slt.toString());
        Assert.assertTrue((boolean)slt.isUnassigned(this.master.getServerName()));
    }

    @Test(timeout=180000L)
    public void testOrphanTaskAcquisition() throws Exception {
        LOG.info("TestOrphanTaskAcquisition");
        String tasknode = ZKSplitLog.getEncodedNodeName((ZKWatcher)this.zkw, (String)"orphan/test/slash");
        SplitLogTask.Owned slt = new SplitLogTask.Owned(this.master.getServerName());
        this.zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        this.waitForCounter(SplitLogCounters.tot_mgr_orphan_task_acquired, 0L, 1L, (long)(this.to / 2));
        SplitLogManager.Task task = this.findOrCreateOrphanTask(tasknode);
        Assert.assertTrue((boolean)task.isOrphan());
        this.waitForCounter(SplitLogCounters.tot_mgr_heartbeat, 0L, 1L, (long)(this.to / 2));
        Assert.assertFalse((boolean)task.isUnassigned());
        long curt = System.currentTimeMillis();
        Assert.assertTrue((task.last_update <= curt && task.last_update > curt - 1000L ? 1 : 0) != 0);
        LOG.info("waiting for manager to resubmit the orphan task");
        this.waitForCounter(SplitLogCounters.tot_mgr_resubmit, 0L, 1L, (long)(this.to + this.to / 2));
        Assert.assertTrue((boolean)task.isUnassigned());
        this.waitForCounter(SplitLogCounters.tot_mgr_rescan, 0L, 1L, (long)(this.to + this.to / 2));
    }

    @Test(timeout=180000L)
    public void testUnassignedOrphan() throws Exception {
        LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at startup");
        String tasknode = ZKSplitLog.getEncodedNodeName((ZKWatcher)this.zkw, (String)"orphan/test/slash");
        SplitLogTask.Unassigned slt = new SplitLogTask.Unassigned(this.master.getServerName());
        this.zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        int version = ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode);
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        this.waitForCounter(SplitLogCounters.tot_mgr_orphan_task_acquired, 0L, 1L, (long)(this.to / 2));
        SplitLogManager.Task task = this.findOrCreateOrphanTask(tasknode);
        Assert.assertTrue((boolean)task.isOrphan());
        Assert.assertTrue((boolean)task.isUnassigned());
        this.waitForCounter(SplitLogCounters.tot_mgr_rescan, 0L, 1L, (long)(this.to / 2));
        SplitLogManager.Task task2 = this.findOrCreateOrphanTask(tasknode);
        Assert.assertTrue((task == task2 ? 1 : 0) != 0);
        LOG.debug("task = " + task);
        Assert.assertEquals((long)1L, (long)SplitLogCounters.tot_mgr_resubmit.sum());
        Assert.assertEquals((long)1L, (long)task.incarnation.get());
        Assert.assertEquals((long)0L, (long)task.unforcedResubmits.get());
        Assert.assertTrue((boolean)task.isOrphan());
        Assert.assertTrue((boolean)task.isUnassigned());
        Assert.assertTrue((ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode) > version ? 1 : 0) != 0);
    }

    @Test(timeout=180000L)
    public void testMultipleResubmits() throws Exception {
        LOG.info("TestMultipleResbmits - no indefinite resubmissions");
        this.conf.setInt("hbase.splitlog.max.resubmit", 2);
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        String tasknode = this.submitTaskAndWait(batch, "foo/1");
        int version = ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode);
        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
        ServerName worker2 = ServerName.valueOf((String)"worker2,1,1");
        ServerName worker3 = ServerName.valueOf((String)"worker3,1,1");
        SplitLogTask.Owned slt = new SplitLogTask.Owned(worker1);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        this.waitForCounter(SplitLogCounters.tot_mgr_heartbeat, 0L, 1L, (long)(this.to / 2));
        this.waitForCounter(SplitLogCounters.tot_mgr_resubmit, 0L, 1L, (long)(this.to + this.to / 2));
        int version1 = ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode);
        Assert.assertTrue((version1 > version ? 1 : 0) != 0);
        slt = new SplitLogTask.Owned(worker2);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        this.waitForCounter(SplitLogCounters.tot_mgr_heartbeat, 1L, 2L, (long)(this.to / 2));
        this.waitForCounter(SplitLogCounters.tot_mgr_resubmit, 1L, 2L, (long)(this.to + this.to / 2));
        int version2 = ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode);
        Assert.assertTrue((version2 > version1 ? 1 : 0) != 0);
        slt = new SplitLogTask.Owned(worker3);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        this.waitForCounter(SplitLogCounters.tot_mgr_heartbeat, 2L, 3L, (long)(this.to / 2));
        this.waitForCounter(SplitLogCounters.tot_mgr_resubmit_threshold_reached, 0L, 1L, (long)(this.to + this.to / 2));
        Thread.sleep(this.to + this.to / 2);
        Assert.assertEquals((long)2L, (long)(SplitLogCounters.tot_mgr_resubmit.sum() - SplitLogCounters.tot_mgr_resubmit_force.sum()));
    }

    @Test(timeout=180000L)
    public void testRescanCleanup() throws Exception {
        LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        String tasknode = this.submitTaskAndWait(batch, "foo/1");
        int version = ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode);
        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
        SplitLogTask.Owned slt = new SplitLogTask.Owned(worker1);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        this.waitForCounter(SplitLogCounters.tot_mgr_heartbeat, 0L, 1L, (long)(this.to / 2));
        this.waitForCounter(new Expr(){

            @Override
            public long eval() {
                return SplitLogCounters.tot_mgr_resubmit.sum() + SplitLogCounters.tot_mgr_resubmit_failed.sum();
            }
        }, 0L, 1L, 300000L);
        Assert.assertEquals((String)"Could not run test. Lost ZK connection?", (long)0L, (long)SplitLogCounters.tot_mgr_resubmit_failed.sum());
        int version1 = ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode);
        Assert.assertTrue((version1 > version ? 1 : 0) != 0);
        byte[] taskstate = ZKUtil.getData((ZKWatcher)this.zkw, (String)tasknode);
        slt = SplitLogTask.parseFrom((byte[])taskstate);
        Assert.assertTrue((boolean)slt.isUnassigned(this.master.getServerName()));
        this.waitForCounter(SplitLogCounters.tot_mgr_rescan_deleted, 0L, 1L, (long)(this.to / 2));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=180000L)
    public void testTaskDone() throws Exception {
        LOG.info("TestTaskDone - cleanup task node once in DONE state");
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        String tasknode = this.submitTaskAndWait(batch, "foo/1");
        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
        SplitLogTask.Done slt = new SplitLogTask.Done(worker1);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        SplitLogManager.TaskBatch taskBatch = batch;
        synchronized (taskBatch) {
            while (batch.installed != batch.done) {
                batch.wait();
            }
        }
        this.waitForCounter(SplitLogCounters.tot_mgr_task_deleted, 0L, 1L, (long)(this.to / 2));
        Assert.assertTrue((ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode) == -1 ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=180000L)
    public void testTaskErr() throws Exception {
        LOG.info("TestTaskErr - cleanup task node once in ERR state");
        this.conf.setInt("hbase.splitlog.max.resubmit", 0);
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        String tasknode = this.submitTaskAndWait(batch, "foo/1");
        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
        SplitLogTask.Err slt = new SplitLogTask.Err(worker1);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        SplitLogManager.TaskBatch taskBatch = batch;
        synchronized (taskBatch) {
            while (batch.installed != batch.error) {
                batch.wait();
            }
        }
        this.waitForCounter(SplitLogCounters.tot_mgr_task_deleted, 0L, 1L, (long)(this.to / 2));
        Assert.assertTrue((ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode) == -1 ? 1 : 0) != 0);
        this.conf.setInt("hbase.splitlog.max.resubmit", 3);
    }

    @Test(timeout=180000L)
    public void testTaskResigned() throws Exception {
        LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
        Assert.assertEquals((long)SplitLogCounters.tot_mgr_resubmit.sum(), (long)0L);
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        Assert.assertEquals((long)SplitLogCounters.tot_mgr_resubmit.sum(), (long)0L);
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        String tasknode = this.submitTaskAndWait(batch, "foo/1");
        Assert.assertEquals((long)SplitLogCounters.tot_mgr_resubmit.sum(), (long)0L);
        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
        Assert.assertEquals((long)SplitLogCounters.tot_mgr_resubmit.sum(), (long)0L);
        SplitLogTask.Resigned slt = new SplitLogTask.Resigned(worker1);
        Assert.assertEquals((long)SplitLogCounters.tot_mgr_resubmit.sum(), (long)0L);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode);
        if (SplitLogCounters.tot_mgr_resubmit.sum() == 0L) {
            this.waitForCounter(SplitLogCounters.tot_mgr_resubmit, 0L, 1L, (long)(this.to / 2));
        }
        Assert.assertEquals((long)SplitLogCounters.tot_mgr_resubmit.sum(), (long)1L);
        byte[] taskstate = ZKUtil.getData((ZKWatcher)this.zkw, (String)tasknode);
        slt = SplitLogTask.parseFrom((byte[])taskstate);
        Assert.assertTrue((boolean)slt.isUnassigned(this.master.getServerName()));
    }

    @Test(timeout=180000L)
    public void testUnassignedTimeout() throws Exception {
        LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then resubmit");
        String tasknode1 = ZKSplitLog.getEncodedNodeName((ZKWatcher)this.zkw, (String)"orphan/1");
        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
        SplitLogTask.Owned slt = new SplitLogTask.Owned(worker1);
        this.zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        this.waitForCounter(SplitLogCounters.tot_mgr_orphan_task_acquired, 0L, 1L, (long)(this.to / 2));
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        this.submitTaskAndWait(batch, "foo/1");
        for (int i = 0; i < 3 * this.to / 100; ++i) {
            Thread.sleep(100L);
            ServerName worker2 = ServerName.valueOf((String)"worker1,1,1");
            slt = new SplitLogTask.Owned(worker2);
            ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode1, (byte[])slt.toByteArray());
        }
        LOG.info("waiting for manager to resubmit the orphan task");
        this.waitForCounter(SplitLogCounters.tot_mgr_resubmit, 0L, 1L, (long)(this.to + this.to / 2));
        this.waitForCounter(SplitLogCounters.tot_mgr_resubmit_unassigned, 0L, 1L, (long)(2 * this.to + this.to / 2));
    }

    @Test(timeout=180000L)
    public void testDeadWorker() throws Exception {
        int version1;
        LOG.info("testDeadWorker");
        this.conf.setLong("hbase.splitlog.max.resubmit", 0L);
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        String tasknode = this.submitTaskAndWait(batch, "foo/1");
        int version = ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode);
        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
        SplitLogTask.Owned slt = new SplitLogTask.Owned(worker1);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        if (SplitLogCounters.tot_mgr_heartbeat.sum() == 0L) {
            this.waitForCounter(SplitLogCounters.tot_mgr_heartbeat, 0L, 1L, (long)(this.to / 2));
        }
        this.slm.handleDeadWorker(worker1);
        if (SplitLogCounters.tot_mgr_resubmit.sum() == 0L) {
            this.waitForCounter(SplitLogCounters.tot_mgr_resubmit, 0L, 1L, (long)(this.to + this.to / 2));
        }
        if (SplitLogCounters.tot_mgr_resubmit_dead_server_task.sum() == 0L) {
            this.waitForCounter(SplitLogCounters.tot_mgr_resubmit_dead_server_task, 0L, 1L, (long)(this.to + this.to / 2));
        }
        Assert.assertTrue(((version1 = ZKUtil.checkExists((ZKWatcher)this.zkw, (String)tasknode)) > version ? 1 : 0) != 0);
        byte[] taskstate = ZKUtil.getData((ZKWatcher)this.zkw, (String)tasknode);
        slt = SplitLogTask.parseFrom((byte[])taskstate);
        Assert.assertTrue((boolean)slt.isUnassigned(this.master.getServerName()));
    }

    @Test(timeout=180000L)
    public void testWorkerCrash() throws Exception {
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        SplitLogManager.TaskBatch batch = new SplitLogManager.TaskBatch();
        String tasknode = this.submitTaskAndWait(batch, "foo/1");
        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
        SplitLogTask.Owned slt = new SplitLogTask.Owned(worker1);
        ZKUtil.setData((ZKWatcher)this.zkw, (String)tasknode, (byte[])slt.toByteArray());
        if (SplitLogCounters.tot_mgr_heartbeat.sum() == 0L) {
            this.waitForCounter(SplitLogCounters.tot_mgr_heartbeat, 0L, 1L, (long)(this.to / 2));
        }
        Assert.assertEquals((long)0L, (long)SplitLogCounters.tot_mgr_resubmit.sum());
        Mockito.when((Object)this.sm.isServerOnline(worker1)).thenReturn((Object)false);
        Thread.sleep(1300L);
        Assert.assertEquals((long)1L, (long)SplitLogCounters.tot_mgr_resubmit.sum());
    }

    @Test(timeout=180000L)
    public void testEmptyLogDir() throws Exception {
        LOG.info("testEmptyLogDir");
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        FileSystem fs = TEST_UTIL.getTestFileSystem();
        Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(), "WALs"), ServerName.valueOf((String)"emptyLogDir", (int)1, (long)1L).toString());
        fs.mkdirs(emptyLogDirPath);
        this.slm.splitLogDistributed(emptyLogDirPath);
        Assert.assertFalse((boolean)fs.exists(emptyLogDirPath));
    }

    @Test(timeout=60000L)
    public void testLogFilesAreArchived() throws Exception {
        LOG.info("testLogFilesAreArchived");
        this.slm = new SplitLogManager((MasterServices)this.master, this.conf);
        FileSystem fs = TEST_UTIL.getTestFileSystem();
        Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
        this.conf.set("hbase.rootdir", dir.toString());
        String serverName = ServerName.valueOf((String)"foo", (int)1, (long)1L).toString();
        Path logDirPath = new Path(new Path(dir, "WALs"), serverName);
        fs.mkdirs(logDirPath);
        String logFile = new Path(logDirPath, UUID.randomUUID().toString()).toString();
        fs.create(new Path(logDirPath, logFile)).close();
        new Thread(){

            @Override
            public void run() {
                boolean done = false;
                while (!done) {
                    for (Map.Entry entry : TestSplitLogManager.this.slm.getTasks().entrySet()) {
                        ServerName worker1 = ServerName.valueOf((String)"worker1,1,1");
                        SplitLogTask.Done slt = new SplitLogTask.Done(worker1);
                        boolean encounteredZKException = false;
                        try {
                            ZKUtil.setData((ZKWatcher)TestSplitLogManager.this.zkw, (String)((String)entry.getKey()), (byte[])slt.toByteArray());
                        }
                        catch (KeeperException e) {
                            LOG.warn(e.toString(), (Throwable)e);
                            encounteredZKException = true;
                        }
                        if (encounteredZKException) continue;
                        done = true;
                    }
                }
            }
        }.start();
        this.slm.splitLogDistributed(logDirPath);
        Assert.assertFalse((boolean)fs.exists(logDirPath));
    }

    private static interface Expr {
        public long eval();
    }

    class DummyMasterServices
    extends MockNoopMasterServices {
        private ZKWatcher zkw;
        private CoordinatedStateManager cm;

        public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
            super(conf);
            this.zkw = zkw;
            this.cm = new ZkCoordinatedStateManager((Server)this);
        }

        @Override
        public ZKWatcher getZooKeeper() {
            return this.zkw;
        }

        @Override
        public CoordinatedStateManager getCoordinatedStateManager() {
            return this.cm;
        }

        @Override
        public ServerManager getServerManager() {
            return TestSplitLogManager.this.sm;
        }
    }
}

