/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.base.Supplier;
import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import org.slf4j.Logger;
import org.slf4j.event.Level;

@RunWith(value=Parameterized.class)
public class TestEditLogTailer {
    // Async-edit-logging flag for the current parameterized run; assigned by the
    // constructor and read by getConf().
    private static boolean useAsyncEditLog;
    // Prefix of every directory path produced by getDirPath(int).
    private static final String DIR_PREFIX = "/dir";
    // NOTE(review): presumably the total number of directories the tailing tests
    // create on the active NameNode — confirm against the test bodies.
    private static final int DIRS_TO_MAKE = 20;
    // NOTE(review): presumably a sleep interval in milliseconds — confirm.
    static final long SLEEP_TIME = 1000L;
    // NOTE(review): presumably a timeout in milliseconds — confirm.
    static final long NN_LAG_TIMEOUT = 10000L;

    /**
     * Supplies the constructor arguments for each parameterized run: one run with
     * {@code Boolean.FALSE} followed by one run with {@code Boolean.TRUE}.
     *
     * @return a list holding one single-element argument array per run
     */
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        List<Object[]> runs = new ArrayList<Object[]>();
        for (Boolean asyncFlag : new Boolean[]{Boolean.FALSE, Boolean.TRUE}) {
            runs.add(new Object[]{asyncFlag});
        }
        return runs;
    }

    /**
     * Records the async-edit-logging flag for the current parameterized run in the
     * static {@code useAsyncEditLog} field.
     *
     * @param async the flag value for this run; it is unboxed on assignment, so a
     *              {@code null} argument raises a {@link NullPointerException}
     */
    public TestEditLogTailer(Boolean async) {
        TestEditLogTailer.useAsyncEditLog = async.booleanValue();
    }

    /**
     * Builds a fresh {@link HdfsConfiguration} whose
     * {@code dfs.namenode.edits.asynclogging} entry is set to the current value of
     * {@code useAsyncEditLog}.
     *
     * @return the new configuration
     */
    private static Configuration getConf() {
        Configuration base = new HdfsConfiguration();
        base.setBoolean("dfs.namenode.edits.asynclogging", TestEditLogTailer.useAsyncEditLog);
        return base;
    }

    /**
     * Verifies that the standby NameNode applies edits written by the active one.
     *
     * <p>{@code DIRS_TO_MAKE} directories are created on the active NameNode in two
     * equal batches. After each batch the test waits for the standby to catch up,
     * checks that the active's last written txid equals the standby's last applied
     * txid plus one, and checks that every directory of the batch is reported as a
     * directory by the standby.
     *
     * @throws IOException if the cluster or a namespace operation fails
     * @throws InterruptedException if a wait is interrupted
     * @throws ServiceFailedException if the HA state transition fails
     */
    @Test
    public void testTailer() throws IOException, InterruptedException, ServiceFailedException {
        Configuration conf = getConf();
        conf.setInt("dfs.ha.tail-edits.period", 0);
        conf.setInt("dfs.ha.tail-edits.namenode-retries", 100);
        conf.setLong("dfs.ha.tail-edits.max-txns-per-lock", 3L);
        HAUtil.setAllowStandbyReads(conf, true);

        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(0)
                .build();
        try {
            // Everything after build() lives inside the try so that a failure while
            // waiting for the cluster or transitioning it still shuts it down.
            cluster.waitActive();
            cluster.transitionToActive(0);
            NameNode nn1 = cluster.getNameNode(0);
            NameNode nn2 = cluster.getNameNode(1);

            int batchSize = DIRS_TO_MAKE / 2;
            for (int batchStart = 0; batchStart < DIRS_TO_MAKE; batchStart += batchSize) {
                int batchEnd = batchStart + batchSize;

                // Write one batch of directories through the active NameNode.
                for (int i = batchStart; i < batchEnd; ++i) {
                    NameNodeAdapter.mkdirs(nn1, getDirPath(i),
                            new PermissionStatus("test", "test", new FsPermission((short) 0755)), true);
                }

                HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
                Assert.assertEquals("Inconsistent number of applied txns on Standby",
                        nn1.getNamesystem().getEditLog().getLastWrittenTxId(),
                        nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1L);

                // The same batch must now be visible through the standby.
                for (int i = batchStart; i < batchEnd; ++i) {
                    Assert.assertTrue(
                            NameNodeAdapter.getFileInfo(nn2, getDirPath(i), false, false, false).isDirectory());
                }
            }
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Checks the sequence of sleep durations requested by an {@link EditLogTailer}
     * whose {@code doTailEdits()} reports zero edits for its first
     * {@code zeroEditCount} calls and one edit afterwards.
     *
     * <p>The tailer runs against mocked namesystem objects with a tail period of
     * 1 ms and a backoff maximum of 10 ms. The overridden {@code sleep} records the
     * requested durations instead of sleeping, and the first
     * {@code zeroEditCount + 1} recorded values must be 2, 4, 8, 10, 10, 1.
     * NOTE(review): that sequence is consistent with the sleep doubling after each
     * empty tail, being capped at the backoff maximum, and resetting to the base
     * period once edits are found — the logic itself lives in EditLogTailer;
     * confirm there.
     *
     * @throws Exception if waiting for the recorded sleeps times out or is interrupted
     */
    @Test
    public void testTailerBackoff() throws Exception {
        Configuration conf = new Configuration();
        NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE);
        conf.setTimeDuration("dfs.ha.tail-edits.period", 1L, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("dfs.ha.tail-edits.period.backoff-max", 10L, TimeUnit.MILLISECONDS);

        FSNamesystem mockNamesystem = Mockito.mock(FSNamesystem.class);
        FSImage mockImage = Mockito.mock(FSImage.class);
        NNStorage mockStorage = Mockito.mock(NNStorage.class);
        Mockito.when(mockNamesystem.getFSImage()).thenReturn(mockImage);
        Mockito.when(mockImage.getStorage()).thenReturn(mockStorage);

        final ConcurrentLinkedQueue<Long> sleepDurations = new ConcurrentLinkedQueue<Long>();
        final int zeroEditCount = 5;
        final AtomicInteger tailEditsCallCount = new AtomicInteger(0);
        EditLogTailer tailer = new EditLogTailer(mockNamesystem, conf) {

            @Override
            void sleep(long sleepTimeMs) {
                // Record instead of sleeping; keep only the first zeroEditCount + 1
                // requests so the queue is stable once the test has what it needs.
                if (sleepDurations.size() <= zeroEditCount) {
                    sleepDurations.add(sleepTimeMs);
                }
            }

            @Override
            public long doTailEdits() {
                // Report "no edits" for the first zeroEditCount calls, then one edit.
                return tailEditsCallCount.getAndIncrement() < zeroEditCount ? 0L : 1L;
            }
        };

        tailer.start();
        try {
            GenericTestUtils.waitFor(() -> sleepDurations.size() > zeroEditCount, 50L, 10000L);
        } finally {
            tailer.stop();
        }

        List<Long> expectedDurations = Arrays.asList(2L, 4L, 8L, 10L, 10L, 1L);
        Assert.assertEquals(expectedDurations, new ArrayList<Long>(sleepDurations));
    }

    /**
     * Runs the shared log-roll scenario with the NameNode at index 0 as the active.
     *
     * @throws Exception propagated from {@code testStandbyTriggersLogRolls(int)}
     */
    @Test
    public void testNN0TriggersLogRolls() throws Exception {
        final int activeNn = 0;
        testStandbyTriggersLogRolls(activeNn);
    }

    /**
     * Runs the shared log-roll scenario with the NameNode at index 1 as the active.
     *
     * @throws Exception propagated from {@code testStandbyTriggersLogRolls(int)}
     */
    @Test
    public void testNN1TriggersLogRolls() throws Exception {
        final int activeNn = 1;
        testStandbyTriggersLogRolls(activeNn);
    }

    /**
     * Runs the shared log-roll scenario with the NameNode at index 2 as the active.
     *
     * @throws Exception propagated from {@code testStandbyTriggersLogRolls(int)}
     */
    @Test
    public void testNN2TriggersLogRolls() throws Exception {
        final int activeNn = 2;
        testStandbyTriggersLogRolls(activeNn);
    }

    /**
     * Starts a three-NameNode HA cluster, makes the NameNode at {@code activeIndex}
     * active, and waits for an edit log segment starting at txid 3 to appear in the
     * shared edits directory.
     *
     * <p>Cluster start-up is attempted up to five times; a {@link BindException}
     * leads to another attempt, each of which picks a fresh random base port in
     * {@code createMiniDFSCluster}. If no attempt succeeds the test fails.
     *
     * @param activeIndex index of the NameNode to transition to active
     * @throws Exception if start-up fails for another reason or the wait times out
     */
    private static void testStandbyTriggersLogRolls(int activeIndex) throws Exception {
        Configuration conf = getConf();
        conf.setInt("dfs.ha.log-roll.period", 1);
        conf.setInt("dfs.ha.tail-edits.period", 1);
        conf.setInt("dfs.ha.tail-edits.namenode-retries", 100);

        MiniDFSCluster cluster = null;
        for (int attempt = 1; attempt <= 5; attempt++) {
            try {
                cluster = createMiniDFSCluster(conf, 3);
                break;
            } catch (BindException portInUse) {
                // Port clash: fall through and try again with new random ports.
            }
        }
        Assert.assertNotNull("failed to start mini cluster.", cluster);

        try {
            cluster.transitionToActive(activeIndex);
            waitForLogRollInSharedDir(cluster, 3L);
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Checks that no log roll shows up in the shared edits directory while all
     * three NameNodes are standby, and that one does show up after NameNode 0 is
     * made active.
     *
     * <p>The first wait is expected to end in a {@link TimeoutException}; if it
     * completes normally the test fails.
     *
     * @throws Exception if the cluster fails or the final wait times out
     */
    @Test
    public void testTriggersLogRollsForAllStandbyNN() throws Exception {
        Configuration conf = getConf();
        conf.setInt("dfs.ha.log-roll.period", 1);
        conf.setInt("dfs.ha.tail-edits.period", 1);
        conf.setInt("dfs.ha.tail-edits.namenode-retries", 100);

        MiniDFSCluster cluster = null;
        try {
            cluster = createMiniDFSCluster(conf, 3);
            for (int nnIndex = 0; nnIndex <= 2; nnIndex++) {
                cluster.transitionToStandby(nnIndex);
            }

            boolean rolledWhileAllStandby = true;
            try {
                waitForLogRollInSharedDir(cluster, 3L);
            } catch (TimeoutException expected) {
                // Nobody is active, so the roll must not have happened.
                rolledWhileAllStandby = false;
            }
            Assert.assertFalse(
                    "After all NN become Standby state, Standby NN should roll log, but it will be failed",
                    rolledWhileAllStandby);

            cluster.transitionToActive(0);
            waitForLogRollInSharedDir(cluster, 3L);
        } finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Builds a test directory path by appending {@code suffix} to {@code DIR_PREFIX}.
     *
     * @param suffix number appended to the prefix
     * @return the resulting path, e.g. {@code /dir7} for a suffix of 7
     */
    private static String getDirPath(int suffix) {
        return DIR_PREFIX.concat(String.valueOf(suffix));
    }

    /**
     * Polls every 100 ms, for at most 10 seconds, until the {@code current}
     * sub-directory of the shared edits directory contains either the in-progress
     * edits file starting at {@code startTxId} or the finalized edits file covering
     * {@code startTxId} to {@code startTxId + 1}.
     *
     * @param cluster cluster whose shared edits directory (for NameNodes 0 to 2) is inspected
     * @param startTxId first transaction id of the expected segment
     * @throws Exception if neither file appears in time or the wait is interrupted
     */
    private static void waitForLogRollInSharedDir(MiniDFSCluster cluster, long startTxId) throws Exception {
        URI sharedEdits = cluster.getSharedEditsDir(0, 2);
        File currentDir = new File(sharedEdits.getPath(), "current");
        final File inProgressSegment =
                new File(currentDir, NNStorage.getInProgressEditsFileName(startTxId));
        final File finalizedSegment =
                new File(currentDir, NNStorage.getFinalizedEditsFileName(startTxId, startTxId + 1L));
        GenericTestUtils.waitFor(
                () -> inProgressSegment.exists() || finalizedSegment.exists(), 100L, 10000L);
    }

    /**
     * Checks that {@code triggerActiveLogRoll()} on the standby's tailer returns
     * without the stubbed NameNode proxy having run to completion.
     *
     * <p>The proxy returned by the spied tailer sleeps for 30 seconds and only then
     * increments {@code flag}. The roll-edits timeout is configured to 5
     * (NOTE(review): presumably seconds — confirm against the config key), so the
     * flag is expected to still be 0 when the trigger call returns, and the whole
     * test must finish within its 20 second JUnit timeout.
     *
     * @throws IOException if the cluster fails to start or transition
     */
    @Test(timeout = 20000L)
    public void testRollEditTimeoutForActiveNN() throws IOException {
        Configuration conf = getConf();
        conf.setInt("dfs.ha.tail-edits.rolledits.timeout", 5);
        conf.setInt("dfs.ha.tail-edits.period", 1);
        conf.setInt("dfs.ha.tail-edits.namenode-retries", 100);
        HAUtil.setAllowStandbyReads(conf, true);

        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(0)
                .build();
        try {
            // Kept inside the try so the cluster is shut down even when start-up
            // or the transition to active fails.
            cluster.waitActive();
            cluster.transitionToActive(0);

            EditLogTailer tailer = Mockito.spy(cluster.getNamesystem(1).getEditLogTailer());
            final AtomicInteger flag = new AtomicInteger(0);
            Mockito.when(tailer.getNameNodeProxy()).thenReturn(new Callable<Void>() {

                @Override
                public Void call() throws Exception {
                    // Outlasts the configured roll-edits timeout on purpose.
                    Thread.sleep(30000L);
                    Assert.assertTrue(Thread.currentThread().isInterrupted());
                    flag.addAndGet(1);
                    return null;
                }
            });

            tailer.triggerActiveLogRoll();
            Assert.assertEquals(0L, flag.get());
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Checks how often the NameNode proxy's {@code doWork()} is invoked when every
     * invocation throws an {@link IOException}.
     *
     * <p>In a three-NameNode cluster with NameNode 0 active, the tailer of
     * NameNode 1 is spied so that its proxy always fails; after
     * {@code triggerActiveLogRoll()} the work method must have been invoked exactly
     * six times. NOTE(review): six is presumably "two remote NameNodes times the
     * default retry count" — confirm against EditLogTailer.
     *
     * @throws IOException if the cluster fails to start or transition
     */
    @Test
    public void testRollEditLogIOExceptionForRemoteNN() throws IOException {
        Configuration conf = getConf();
        conf.setInt("dfs.ha.log-roll.period", 1);
        conf.setInt("dfs.ha.tail-edits.period", 1);

        MiniDFSCluster cluster = null;
        try {
            cluster = createMiniDFSCluster(conf, 3);
            cluster.transitionToActive(0);

            EditLogTailer tailer = Mockito.spy(cluster.getNamesystem(1).getEditLogTailer());
            final AtomicInteger invokedTimes = new AtomicInteger(0);

            // MultipleNameNodeProxy is an inner class of EditLogTailer, so the
            // anonymous subclass is created against the spied tailer instance.
            Mockito.when(tailer.getNameNodeProxy()).thenReturn(
                    tailer.new MultipleNameNodeProxy<Void>() {

                        @Override
                        protected Void doWork() throws IOException {
                            invokedTimes.getAndIncrement();
                            throw new IOException("It is an IO Exception.");
                        }
                    });

            tailer.triggerActiveLogRoll();
            Assert.assertEquals(6L, invokedTimes.get());
        } finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Checks log-roll timing when in-progress edit tailing is enabled.
     *
     * <p>With a log-roll period of 5 and a tail period of 1, a randomly chosen
     * NameNode of a two-node cluster is made active. The test writes the first
     * half of {@code DIRS_TO_MAKE} directories, waits for the standby's last
     * applied txid to reach the active's last written txid, and then writes the
     * second half. The active's current segment txid must then stay unchanged for
     * {@code noLogRollWaitTime} seconds (the check is expected to time out) and
     * must change within a further {@code logRollWaitTime} seconds.
     *
     * @throws Exception if the cluster fails, or a wait times out unexpectedly
     */
    @Test
    public void testStandbyTriggersLogRollsWhenTailInProgressEdits() throws Exception {
        // Wait budgets, in seconds, for the three polling phases below.
        final int standbyCatchupWaitTime = 2;
        final int noLogRollWaitTime = 2;
        final int logRollWaitTime = 3;

        Configuration conf = getConf();
        conf.setInt("dfs.ha.log-roll.period", 5);
        conf.setInt("dfs.ha.tail-edits.period", 1);
        conf.setBoolean("dfs.ha.tail-edits.in-progress", true);

        MiniDFSCluster cluster = createMiniDFSCluster(conf, 2);
        if (cluster == null) {
            Assert.fail("failed to start mini cluster.");
        }
        try {
            int activeIndex = new Random().nextBoolean() ? 1 : 0;
            int standbyIndex = 1 - activeIndex;
            cluster.transitionToActive(activeIndex);
            NameNode active = cluster.getNameNode(activeIndex);
            NameNode standby = cluster.getNameNode(standbyIndex);

            long origTxId = active.getNamesystem().getFSImage().getEditLog().getCurSegmentTxId();
            for (int i = 0; i < DIRS_TO_MAKE / 2; ++i) {
                NameNodeAdapter.mkdirs(active, getDirPath(i),
                        new PermissionStatus("test", "test", new FsPermission((short) 0755)), true);
            }

            long activeTxId = active.getNamesystem().getFSImage().getEditLog().getLastWrittenTxId();
            waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId, standbyCatchupWaitTime);

            for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; ++i) {
                NameNodeAdapter.mkdirs(active, getDirPath(i),
                        new PermissionStatus("test", "test", new FsPermission((short) 0755)), true);
            }

            // The segment must not roll yet, so this check has to time out.
            boolean exceptionThrown = false;
            try {
                checkForLogRoll(active, origTxId, noLogRollWaitTime);
            } catch (TimeoutException e) {
                exceptionThrown = true;
            }
            Assert.assertTrue(exceptionThrown);

            // Given more time, the roll has to show up.
            checkForLogRoll(active, origTxId, logRollWaitTime);
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Polls every 100 ms until the standby's last applied transaction id is at
     * least {@code activeTxId}.
     *
     * @param standby NameNode whose FSImage is inspected
     * @param activeTxId transaction id the standby has to reach
     * @param maxWaitSec maximum time to wait, in seconds
     * @throws Exception if the standby does not catch up in time or the wait is interrupted
     */
    private static void waitForStandbyToCatchUpWithInProgressEdits(final NameNode standby, final long activeTxId, int maxWaitSec) throws Exception {
        long timeoutMs = maxWaitSec * 1000;
        GenericTestUtils.waitFor(
                () -> standby.getNamesystem().getFSImage().getLastAppliedTxId() >= activeTxId,
                100L, timeoutMs);
    }

    /**
     * Polls every 100 ms until the active NameNode's current edit log segment
     * starts at a transaction id different from {@code origTxId}.
     *
     * @param active NameNode whose edit log is inspected
     * @param origTxId segment start transaction id observed before the expected roll
     * @param maxWaitSec maximum time to wait, in seconds
     * @throws Exception if the segment txid does not change in time or the wait is interrupted
     */
    private static void checkForLogRoll(final NameNode active, final long origTxId, int maxWaitSec) throws Exception {
        long timeoutMs = maxWaitSec * 1000;
        GenericTestUtils.waitFor(
                () -> active.getNamesystem().getFSImage().getEditLog().getCurSegmentTxId() != origTxId,
                100L, timeoutMs);
    }

    /**
     * Builds an HA mini cluster with {@code nnCount} NameNodes and no DataNodes.
     *
     * <p>The topology's base port is a random even number between 10060 and 12058
     * inclusive, chosen anew on every call.
     *
     * @param conf configuration handed to the cluster builder
     * @param nnCount number of NameNodes in the HA topology
     * @return the built cluster
     * @throws IOException if the cluster cannot be built
     */
    private static MiniDFSCluster createMiniDFSCluster(Configuration conf, int nnCount) throws IOException {
        Random portPicker = new Random();
        int basePort = 10060 + 2 * portPicker.nextInt(1000);
        return new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology(nnCount, basePort))
                .numDataNodes(0)
                .build();
    }

    // Raise the edit log, image and tailer loggers to DEBUG for every test in this class.
    static {
        GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.DEBUG);
        GenericTestUtils.setLogLevel(FSImage.LOG, Level.DEBUG);
        GenericTestUtils.setLogLevel(EditLogTailer.LOG, Level.DEBUG);
    }
}

