/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.distributedlog.BKAsyncLogWriter;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.LogWriter;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.util.DLUtils;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestBKDistributedLogNamespace
extends TestDistributedLogBase {
    // JUnit rule exposing the running test's method name; the tests use it to build a per-test URI path.
    @Rule
    public TestName runtime = new TestName();
    static final Logger LOG = LoggerFactory.getLogger(TestBKDistributedLogNamespace.class);
    // Base configuration shared by the tests: lock timeout of 10, ledger allocator pool enabled and named "test".
    // Tests that need different settings copy it into a fresh DistributedLogConfiguration via addConfiguration.
    protected static DistributedLogConfiguration conf = new DistributedLogConfiguration().setLockTimeout(10L).setEnableLedgerAllocatorPool(true).setLedgerAllocatorPoolName("test");
    // ZooKeeper client built in setup() and closed in teardown().
    private ZooKeeperClient zooKeeperClient;

    /**
     * Builds the ZooKeeper client used by the tests, targeting the DLM URI for path "/".
     *
     * @throws Exception if the URI or the client cannot be created
     */
    @Override
    @Before
    public void setup() throws Exception {
        URI rootUri = this.createDLMURI("/");
        this.zooKeeperClient = TestZooKeeperClientBuilder.newBuilder()
                .uri(rootUri)
                .build();
    }

    /**
     * Closes the ZooKeeper client created in {@link #setup()}.
     *
     * <p>JUnit runs {@code @After} methods even when {@code @Before} threw, so the client may
     * still be {@code null} here; guarding avoids a NullPointerException that would obscure the
     * original setup failure.
     *
     * @throws Exception if closing the client fails
     */
    @Override
    @After
    public void teardown() throws Exception {
        if (this.zooKeeperClient != null) {
            this.zooKeeperClient.close();
        }
    }

    /**
     * Runs the create-log-path check with a log name that starts with a leading slash.
     */
    @Test(timeout=60000L)
    public void testCreateLogPath0() throws Exception {
        final String logName = "/create/log/path/" + this.runtime.getMethodName();
        this.createLogPathTest(logName);
    }

    /**
     * Runs the create-log-path check with a log name that has no leading slash.
     */
    @Test(timeout=60000L)
    public void testCreateLogPath1() throws Exception {
        final String logName = "create/log/path/" + this.runtime.getMethodName();
        this.createLogPathTest(logName);
    }

    /**
     * Opens {@code logName} in a namespace configured with
     * {@code createStreamIfNotExists = false} and expects writing to it to fail with an
     * {@link IOException}.
     *
     * <p>The log manager and the namespace are released in {@code finally} blocks so they are
     * not leaked when the assertion fails or an unexpected exception is thrown.
     *
     * @param logName name of the log to open
     * @throws Exception on any unexpected failure
     */
    private void createLogPathTest(String logName) throws Exception {
        URI uri = this.createDLMURI("/" + this.runtime.getMethodName());
        this.ensureURICreated(this.zooKeeperClient.get(), uri);
        DistributedLogConfiguration newConf = new DistributedLogConfiguration();
        newConf.addConfiguration(conf);
        newConf.setCreateStreamIfNotExists(false);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(newConf).uri(uri).build();
        try {
            DistributedLogManager dlm = namespace.openLog(logName);
            try {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                writer.write(DLMTestUtil.getLogRecordInstance(1L));
                writer.commit();
                Assert.fail("Should fail to write data if stream doesn't exist.");
            }
            catch (IOException expected) {
                // Expected: stream auto-creation is disabled in newConf.
            }
            finally {
                dlm.close();
            }
        }
        finally {
            namespace.close();
        }
    }

    /**
     * With {@code createStreamIfNotExists = false}, writing to a stream that was never created
     * must fail with an {@link IOException}; after an explicit {@code createLog} the same stream
     * must accept a write.
     *
     * <p>All log managers, the writer and the namespace are closed in {@code finally} blocks so a
     * failing assertion does not leak them.
     */
    @Test(timeout=60000L)
    public void testCreateIfNotExists() throws Exception {
        URI uri = this.createDLMURI("/" + this.runtime.getMethodName());
        this.ensureURICreated(this.zooKeeperClient.get(), uri);
        DistributedLogConfiguration newConf = new DistributedLogConfiguration();
        newConf.addConfiguration(conf);
        newConf.setCreateStreamIfNotExists(false);
        String streamName = "test-stream";
        Namespace namespace = NamespaceBuilder.newBuilder().conf(newConf).uri(uri).build();
        try {
            DistributedLogManager dlm = namespace.openLog(streamName);
            try {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                writer.write(DLMTestUtil.getLogRecordInstance(1L));
                Assert.fail("Should fail to write data if stream doesn't exist.");
            }
            catch (IOException expected) {
                // Expected: the stream has not been created yet.
            }
            finally {
                dlm.close();
            }
            // Once the stream is created explicitly, writing must succeed.
            namespace.createLog(streamName);
            DistributedLogManager newDLM = namespace.openLog(streamName);
            try {
                LogWriter newWriter = newDLM.startLogSegmentNonPartitioned();
                try {
                    newWriter.write(DLMTestUtil.getLogRecordInstance(1L));
                }
                finally {
                    newWriter.close();
                }
            }
            finally {
                newDLM.close();
            }
        }
        finally {
            namespace.close();
        }
    }

    /**
     * Checks stream-name validation: names starting with a dot, names containing a slash, and
     * names containing a NUL or another control character must be rejected with
     * {@link InvalidStreamNameException}, while ordinary names can be opened and written to.
     * Finally verifies that only the two valid streams are listed by the namespace.
     */
    @Test(timeout=60000L)
    public void testInvalidStreamName() throws Exception {
        Assert.assertFalse(DLUtils.isReservedStreamName("test"));
        Assert.assertTrue(DLUtils.isReservedStreamName(".test"));
        URI uri = this.createDLMURI("/" + this.runtime.getMethodName());
        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        try {
            TestBKDistributedLogNamespace.assertOpenLogRejected(namespace, ".test1");

            DistributedLogManager dlm = namespace.openLog("test1");
            LogWriter writer = dlm.startLogSegmentNonPartitioned();
            writer.write(DLMTestUtil.getLogRecordInstance(1L));
            writer.close();
            dlm.close();

            TestBKDistributedLogNamespace.assertOpenLogRejected(namespace, ".test2");
            TestBKDistributedLogNamespace.assertOpenLogRejected(namespace, "/ test2");

            // Six-character name whose first character is NUL.
            String leadingNul = new String(new char[]{'\u0000', 'a', 'a', 'a', 'a', 'a'});
            TestBKDistributedLogNamespace.assertOpenLogRejected(namespace, leadingNul);

            // Six-character name with control character 0x10 at index 3.
            String embeddedControl = new String(new char[]{'a', 'a', 'a', '\u0010', 'a', 'a'});
            TestBKDistributedLogNamespace.assertOpenLogRejected(namespace, embeddedControl);

            DistributedLogManager newDLM = namespace.openLog("test_2-3");
            LogWriter newWriter = newDLM.startLogSegmentNonPartitioned();
            newWriter.write(DLMTestUtil.getLogRecordInstance(1L));
            newWriter.close();
            newDLM.close();

            Iterator<String> streamIter = namespace.getLogs();
            HashSet<String> streamSet = Sets.newHashSet(streamIter);
            Assert.assertEquals(2L, streamSet.size());
            Assert.assertTrue(streamSet.contains("test1"));
            Assert.assertTrue(streamSet.contains("test_2-3"));
        }
        finally {
            // Release the namespace even when an assertion above fails.
            namespace.close();
        }
    }

    /**
     * Asserts that {@code namespace.openLog(streamName)} throws
     * {@link InvalidStreamNameException}; fails the test if the call returns normally.
     *
     * @param namespace namespace to open the log in
     * @param streamName stream name expected to be rejected
     * @throws Exception any other exception raised by {@code openLog}
     */
    private static void assertOpenLogRejected(Namespace namespace, String streamName) throws Exception {
        try {
            namespace.openLog(streamName);
            Assert.fail("Should fail to create invalid stream " + streamName);
        }
        catch (InvalidStreamNameException expected) {
            // Expected: the name is not a valid stream name.
        }
    }

    /**
     * Registers a {@link NamespaceListener} and checks that it is notified once on registration
     * and once after each of two {@code createLog} calls, with the stream set growing by one on
     * every notification.
     *
     * <p>The listener records a failure whenever the size of the notified set does not equal the
     * number of earlier notifications, and releases one latch per notification so the test can
     * sequence the log creations.
     */
    @Test(timeout=60000L)
    public void testNamespaceListener() throws Exception {
        URI uri = this.createDLMURI("/" + this.runtime.getMethodName());
        this.zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        try {
            final CountDownLatch[] latches = new CountDownLatch[3];
            for (int i = 0; i < latches.length; ++i) {
                latches[i] = new CountDownLatch(1);
            }
            final AtomicInteger numUpdates = new AtomicInteger(0);
            final AtomicInteger numFailures = new AtomicInteger(0);
            // Typed as a collection of stream names so the result can be inspected without casts.
            final AtomicReference<Collection<String>> receivedStreams = new AtomicReference<Collection<String>>(null);
            namespace.registerNamespaceListener(new NamespaceListener(){

                @Override
                public void onStreamsChanged(Iterator<String> streams) {
                    HashSet<String> streamSet = Sets.newHashSet(streams);
                    int updates = numUpdates.incrementAndGet();
                    // The n-th notification is expected to carry n - 1 streams.
                    if (streamSet.size() != updates - 1) {
                        numFailures.incrementAndGet();
                    }
                    receivedStreams.set(streamSet);
                    latches[updates - 1].countDown();
                }
            });
            latches[0].await();
            namespace.createLog("test1");
            latches[1].await();
            namespace.createLog("test2");
            latches[2].await();
            Assert.assertEquals(0L, numFailures.get());
            Collection<String> lastNotified = receivedStreams.get();
            Assert.assertNotNull(lastNotified);
            HashSet<String> streamSet = new HashSet<String>(lastNotified);
            Assert.assertEquals(2L, lastNotified.size());
            Assert.assertEquals(2L, streamSet.size());
            Assert.assertTrue(streamSet.contains("test1"));
            Assert.assertTrue(streamSet.contains("test2"));
        }
        finally {
            // The namespace was previously never closed by this test.
            namespace.close();
        }
    }

    /**
     * Opens {@code streamName} under {@code dlNamespace} using a configuration with
     * {@code createStreamIfNotExists = true} and ZK ACL id {@code un}, then writes ten records.
     *
     * <p>Callers in this class expect this method to throw in some scenarios, so the writer, log
     * manager and namespace are closed in {@code finally} blocks rather than only on the success
     * path.
     *
     * @param dlNamespace namespace path passed to {@code createDLMURI}
     * @param un value applied through {@code setZkAclId}
     * @param streamName name of the stream to open and write to
     * @throws Exception if opening the stream or writing fails
     */
    private void initDlogMeta(String dlNamespace, String un, String streamName) throws Exception {
        URI uri = this.createDLMURI(dlNamespace);
        DistributedLogConfiguration newConf = new DistributedLogConfiguration();
        newConf.addConfiguration(conf);
        newConf.setCreateStreamIfNotExists(true);
        newConf.setZkAclId(un);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(newConf).uri(uri).build();
        try {
            DistributedLogManager dlm = namespace.openLog(streamName);
            try {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                try {
                    for (int i = 0; i < 10; ++i) {
                        writer.write(DLMTestUtil.getLogRecordInstance(1L));
                    }
                }
                finally {
                    writer.close();
                }
            }
            finally {
                dlm.close();
            }
        }
        finally {
            namespace.close();
        }
    }

    /**
     * After stream metadata has been initialised with ACL id "test-un", a separately built
     * ZooKeeper client named "unpriv" must get {@code NoAuthException} both when creating a child
     * under the stream node and when setting the stream node's data.
     *
     * <p>The extra ZooKeeper client is closed in a {@code finally} block; it was previously
     * leaked.
     */
    @Test(timeout=60000L)
    public void testAclPermsZkAccessConflict() throws Exception {
        String namespace = "/" + this.runtime.getMethodName();
        this.initDlogMeta(namespace, "test-un", "test-stream");
        URI uri = this.createDLMURI(namespace);
        ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder().name("unpriv").uri(uri).build();
        try {
            try {
                zkc.get().create(uri.getPath() + "/test-stream/test-garbage", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                Assert.fail("write should have failed due to perms");
            }
            catch (KeeperException.NoAuthException ex) {
                LOG.info("caught exception trying to write with no perms", (Throwable)ex);
            }
            try {
                zkc.get().setData(uri.getPath() + "/test-stream", new byte[0], 0);
                Assert.fail("write should have failed due to perms");
            }
            catch (KeeperException.NoAuthException ex) {
                LOG.info("caught exception trying to write with no perms", (Throwable)ex);
            }
        }
        finally {
            zkc.close();
        }
    }

    /**
     * After stream metadata has been initialised with ACL id "test-un", a separately built
     * ZooKeeper client named "unpriv" must still be able to list the children of the stream node
     * and read its data without an exception.
     *
     * <p>The extra ZooKeeper client is closed in a {@code finally} block; it was previously
     * leaked.
     */
    @Test(timeout=60000L)
    public void testAclPermsZkAccessNoConflict() throws Exception {
        String namespace = "/" + this.runtime.getMethodName();
        this.initDlogMeta(namespace, "test-un", "test-stream");
        URI uri = this.createDLMURI(namespace);
        ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder().name("unpriv").uri(uri).build();
        try {
            zkc.get().getChildren(uri.getPath() + "/test-stream", false, new Stat());
            zkc.get().getData(uri.getPath() + "/test-stream", false, new Stat());
        }
        finally {
            zkc.close();
        }
    }

    /**
     * Initialises a stream with ACL id "test-un", then expects a second initialisation of the
     * same stream with ACL id "not-test-un" to fail with a {@link ZKException} whose keeper code
     * is {@code NOAUTH}. A final initialisation with the original ACL id must succeed.
     */
    @Test(timeout=60000L)
    public void testAclModifyPermsDlmConflict() throws Exception {
        String streamName = "test-stream";
        this.initDlogMeta("/" + this.runtime.getMethodName(), "test-un", streamName);
        try {
            this.initDlogMeta("/" + this.runtime.getMethodName(), "not-test-un", streamName);
            // AssertionError is not an Exception, so it is not swallowed by the catch-all below.
            Assert.fail("Write should have failed due to perms");
        }
        catch (ZKException ex) {
            LOG.info("Caught exception trying to write with no perms", (Throwable)ex);
            Assert.assertEquals(KeeperException.Code.NOAUTH, ex.getKeeperExceptionCode());
        }
        catch (Exception ex) {
            LOG.info("Caught wrong exception trying to write with no perms", (Throwable)ex);
            // Report the exception type this test actually expects (ZKException).
            Assert.fail("Wrong exception " + ex.getClass().getName() + " expected " + ZKException.class.getName());
        }
        this.initDlogMeta("/" + this.runtime.getMethodName(), "test-un", streamName);
    }

    /**
     * Initialises the same stream twice with the same ACL id ("test-un"); neither call may throw.
     */
    @Test(timeout=60000L)
    public void testAclModifyPermsDlmNoConflict() throws Exception {
        final String streamName = "test-stream";
        for (int attempt = 0; attempt < 2; ++attempt) {
            String dlNamespace = "/" + this.runtime.getMethodName();
            this.initDlogMeta(dlNamespace, "test-un", streamName);
        }
    }

    /**
     * Asserts that {@code BKNamespaceDriver.validateAndGetFullLedgerAllocatorPoolPath} rejects the
     * given configuration with an {@link IOException}.
     *
     * @param conf configuration expected to be rejected
     * @param uri namespace URI passed to the validator
     * @throws Exception any exception other than {@link IOException} raised by the validator
     */
    static void validateBadAllocatorConfiguration(DistributedLogConfiguration conf, URI uri) throws Exception {
        boolean rejected = false;
        try {
            BKNamespaceDriver.validateAndGetFullLedgerAllocatorPoolPath(conf, uri);
        }
        catch (IOException expected) {
            // The validator refused the configuration, which is what the caller wants.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Should throw exception when bad allocator configuration provided");
        }
    }

    /**
     * Feeds a series of ledger allocator pool paths to the validator, each with pool name "test",
     * and expects every one to be rejected; then checks that path ".test" with a {@code null}
     * pool name is rejected as well.
     */
    @Test(timeout=60000L)
    public void testValidateAndGetFullLedgerAllocatorPoolPath() throws Exception {
        DistributedLogConfiguration testConf = new DistributedLogConfiguration();
        testConf.setEnableLedgerAllocatorPool(true);
        URI uri = this.createDLMURI("/" + this.runtime.getMethodName());
        testConf.setLedgerAllocatorPoolName("test");

        // Pool paths that must be rejected while the pool name is "test".
        String[] rejectedPaths = new String[]{"test", ".", "..", "./", ".test/"};
        for (String poolPath : rejectedPaths) {
            testConf.setLedgerAllocatorPoolPath(poolPath);
            TestBKDistributedLogNamespace.validateBadAllocatorConfiguration(testConf, uri);
        }

        // Last case: pool path ".test" combined with a null pool name.
        testConf.setLedgerAllocatorPoolPath(".test");
        testConf.setLedgerAllocatorPoolName(null);
        TestBKDistributedLogNamespace.validateBadAllocatorConfiguration(testConf, uri);
    }

    /**
     * Exercises a namespace normally (create, write three records, read them back, delete), then
     * closes it and checks that every subsequent namespace operation throws
     * {@link AlreadyClosedException}.
     *
     * <p>The reader and log manager are closed once reading is done; they were previously leaked.
     * The transaction-id assertion passes the expected value first so failure messages are
     * reported the right way round.
     */
    @Test(timeout=60000L)
    public void testUseNamespaceAfterCloseShouldFailFast() throws Exception {
        URI uri = this.createDLMURI("/" + this.runtime.getMethodName());
        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        String logName = "test-stream";
        namespace.createLog(logName);
        Assert.assertTrue(namespace.logExists(logName));
        DistributedLogManager dlm = namespace.openLog(logName);
        try {
            BKAsyncLogWriter writer = (BKAsyncLogWriter)dlm.startAsyncLogSegmentNonPartitioned();
            for (long i = 0L; i < 3L; ++i) {
                LogRecord record = DLMTestUtil.getLargeLogRecordInstance(i);
                writer.write(record);
            }
            writer.closeAndComplete();
            LogReader reader = dlm.getInputStream(0L);
            try {
                for (long i = 0L; i < 3L; ++i) {
                    LogRecord record = reader.readNext(false);
                    // Fail with a clear message instead of a bare NullPointerException.
                    Assert.assertNotNull("No record returned for transaction id " + i, record);
                    Assert.assertEquals(i, record.getTransactionId());
                }
            }
            finally {
                reader.close();
            }
        }
        finally {
            dlm.close();
        }
        namespace.deleteLog(logName);
        Assert.assertFalse(namespace.logExists(logName));
        namespace.close();
        // From here on every namespace call is expected to fail fast.
        try {
            namespace.createLog(logName);
            Assert.fail("Should throw exception after namespace is closed");
        }
        catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.openLog(logName);
            Assert.fail("Should throw exception after namespace is closed");
        }
        catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.logExists(logName);
            Assert.fail("Should throw exception after namespace is closed");
        }
        catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.getLogs();
            Assert.fail("Should throw exception after namespace is closed");
        }
        catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.deleteLog(logName);
            Assert.fail("Should throw exception after namespace is closed");
        }
        catch (AlreadyClosedException expected) {
            // expected
        }
        try {
            namespace.createAccessControlManager();
            Assert.fail("Should throw exception after namespace is closed");
        }
        catch (AlreadyClosedException expected) {
            // expected
        }
    }
}

