/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.meta;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TextFormat;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.util.SubTreeCache;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkLedgerUnderreplicationManager
implements LedgerUnderreplicationManager {
    static final Logger LOG = LoggerFactory.getLogger(ZkLedgerUnderreplicationManager.class);
    static final String LAYOUT = "BASIC";
    static final int LAYOUT_VERSION = 1;
    private static final byte[] LOCK_DATA = ZkLedgerUnderreplicationManager.getLockData();
    private final Map<Long, Lock> heldLocks = new ConcurrentHashMap<Long, Lock>();
    private final Pattern idExtractionPattern;
    private final String basePath;
    private final String urLedgerPath;
    private final String urLockPath;
    private final String layoutZNode;
    private final AbstractConfiguration conf;
    private final String lostBookieRecoveryDelayZnode;
    private final ZooKeeper zkc;
    private final SubTreeCache subTreeCache;

    public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, final ZooKeeper zkc) throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        this.conf = conf;
        this.basePath = ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath());
        this.layoutZNode = this.basePath + '/' + "LAYOUT";
        this.urLedgerPath = this.basePath + "/ledgers";
        this.urLockPath = this.basePath + '/' + "locks";
        this.lostBookieRecoveryDelayZnode = this.basePath + '/' + "lostBookieRecoveryDelay";
        this.idExtractionPattern = Pattern.compile("urL(\\d+)$");
        this.zkc = zkc;
        this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider(){

            @Override
            public List<String> getChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {
                return zkc.getChildren(path, watcher);
            }
        });
        this.checkLayout();
    }

    public static String getBasePath(String rootPath) {
        return String.format("%s/%s", rootPath, "underreplication");
    }

    public static String getUrLockPath(String rootPath) {
        return String.format("%s/%s", ZkLedgerUnderreplicationManager.getBasePath(rootPath), "locks");
    }

    public static byte[] getLockData() {
        DataFormats.LockDataFormat.Builder lockDataBuilder = DataFormats.LockDataFormat.newBuilder();
        try {
            lockDataBuilder.setBookieId(DNS.getDefaultHost("default"));
        }
        catch (UnknownHostException unknownHostException) {
            // empty catch block
        }
        return TextFormat.printToString((MessageOrBuilder)lockDataBuilder.build()).getBytes(Charsets.UTF_8);
    }

    private void checkLayout() throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
        if (this.zkc.exists(this.basePath, false) == null) {
            try {
                this.zkc.create(this.basePath, new byte[0], zkAcls, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
        while (this.zkc.exists(this.layoutZNode, false) == null) {
            DataFormats.LedgerRereplicationLayoutFormat.Builder builder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
            builder.setType(LAYOUT).setVersion(1);
            try {
                this.zkc.create(this.layoutZNode, TextFormat.printToString((MessageOrBuilder)builder.build()).getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nne) {}
        }
        byte[] layoutData = this.zkc.getData(this.layoutZNode, false, null);
        DataFormats.LedgerRereplicationLayoutFormat.Builder builder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
        try {
            TextFormat.merge((CharSequence)new String(layoutData, Charsets.UTF_8), (Message.Builder)builder);
            DataFormats.LedgerRereplicationLayoutFormat layout = builder.build();
            if (!layout.getType().equals(LAYOUT) || layout.getVersion() != 1) {
                throw new ReplicationException.CompatibilityException("Incompatible layout found (BASIC:1)");
            }
        }
        catch (TextFormat.ParseException pe) {
            throw new ReplicationException.CompatibilityException("Invalid data found", pe);
        }
        if (this.zkc.exists(this.urLedgerPath, false) == null) {
            try {
                this.zkc.create(this.urLedgerPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
        if (this.zkc.exists(this.urLockPath, false) == null) {
            try {
                this.zkc.create(this.urLockPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
    }

    private long getLedgerId(String path) throws NumberFormatException {
        Matcher m = this.idExtractionPattern.matcher(path);
        if (m.find()) {
            return Long.parseLong(m.group(1));
        }
        throw new NumberFormatException("Couldn't find ledgerid in path");
    }

    public static String getParentZnodePath(String base, long ledgerId) {
        String subdir1 = String.format("%04x", ledgerId >> 48 & 0xFFFFL);
        String subdir2 = String.format("%04x", ledgerId >> 32 & 0xFFFFL);
        String subdir3 = String.format("%04x", ledgerId >> 16 & 0xFFFFL);
        String subdir4 = String.format("%04x", ledgerId & 0xFFFFL);
        return String.format("%s/%s/%s/%s/%s", base, subdir1, subdir2, subdir3, subdir4);
    }

    public static String getUrLedgerZnode(String base, long ledgerId) {
        return String.format("%s/urL%010d", ZkLedgerUnderreplicationManager.getParentZnodePath(base, ledgerId), ledgerId);
    }

    public static String getUrLedgerLockZnode(String base, long ledgerId) {
        return String.format("%s/urL%010d", base, ledgerId);
    }

    private String getUrLedgerZnode(long ledgerId) {
        return ZkLedgerUnderreplicationManager.getUrLedgerZnode(this.urLedgerPath, ledgerId);
    }

    @VisibleForTesting
    public DataFormats.UnderreplicatedLedgerFormat getLedgerUnreplicationInfo(long ledgerId) throws KeeperException, TextFormat.ParseException, InterruptedException {
        String znode = this.getUrLedgerZnode(ledgerId);
        DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
        byte[] data = this.zkc.getData(znode, false, null);
        TextFormat.merge((CharSequence)new String(data, Charsets.UTF_8), (Message.Builder)builder);
        return builder.build();
    }

    @Override
    public void markLedgerUnderreplicated(long ledgerId, String missingReplica) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", (Object)ledgerId, (Object)missingReplica);
        }
        try {
            List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
            String znode = this.getUrLedgerZnode(ledgerId);
            while (true) {
                DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
                try {
                    builder.addReplica(missingReplica);
                    ZkUtils.createFullPathOptimistic(this.zkc, znode, TextFormat.printToString((MessageOrBuilder)builder.build()).getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
                }
                catch (KeeperException.NodeExistsException nee) {
                    Stat s = this.zkc.exists(znode, false);
                    if (s == null) continue;
                    try {
                        byte[] bytes = this.zkc.getData(znode, false, s);
                        builder.clear();
                        TextFormat.merge((CharSequence)new String(bytes, Charsets.UTF_8), (Message.Builder)builder);
                        DataFormats.UnderreplicatedLedgerFormat data = builder.build();
                        if (data.getReplicaList().contains((Object)missingReplica)) {
                            return;
                        }
                        builder.addReplica(missingReplica);
                        this.zkc.setData(znode, TextFormat.printToString((MessageOrBuilder)builder.build()).getBytes(Charsets.UTF_8), s.getVersion());
                    }
                    catch (KeeperException.NoNodeException nne) {
                        continue;
                    }
                    catch (KeeperException.BadVersionException bve) {
                        continue;
                    }
                    catch (TextFormat.ParseException pe) {
                        throw new ReplicationException.UnavailableException("Invalid data found", pe);
                    }
                }
                break;
            }
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public void markLedgerReplicated(long ledgerId) throws ReplicationException.UnavailableException {
        block12: {
            if (LOG.isDebugEnabled()) {
                LOG.debug("markLedgerReplicated(ledgerId={})", (Object)ledgerId);
            }
            try {
                Lock l = this.heldLocks.get(ledgerId);
                if (l == null) break block12;
                this.zkc.delete(this.getUrLedgerZnode(ledgerId), l.getLedgerZNodeVersion());
                try {
                    String[] parts = this.getUrLedgerZnode(ledgerId).split("/");
                    for (int i = 1; i <= 4; ++i) {
                        Object[] p = Arrays.copyOf(parts, parts.length - i);
                        String path = Joiner.on((String)"/").join(p);
                        Stat s = this.zkc.exists(path, null);
                        if (s == null) continue;
                        this.zkc.delete(path, s.getVersion());
                    }
                }
                catch (KeeperException.NotEmptyException notEmptyException) {
                    // empty catch block
                }
            }
            catch (KeeperException.NoNodeException l) {
            }
            catch (KeeperException.BadVersionException l) {
            }
            catch (KeeperException ke) {
                LOG.error("Error deleting underreplicated ledger znode", (Throwable)ke);
                throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
            }
            finally {
                this.releaseUnderreplicatedLedger(ledgerId);
            }
        }
    }

    @Override
    public Iterator<Long> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
        final LinkedList<String> queue = new LinkedList<String>();
        queue.add(this.urLedgerPath);
        return new Iterator<Long>(){
            final Queue<Long> curBatch = new LinkedList<Long>();

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

            @Override
            public boolean hasNext() {
                if (this.curBatch.size() > 0) {
                    return true;
                }
                while (queue.size() > 0 && this.curBatch.size() == 0) {
                    String parent = (String)queue.remove();
                    try {
                        for (String c : ZkLedgerUnderreplicationManager.this.zkc.getChildren(parent, false)) {
                            try {
                                String child = parent + "/" + c;
                                if (c.startsWith("urL")) {
                                    long ledgerId = ZkLedgerUnderreplicationManager.this.getLedgerId(child);
                                    if (predicate != null && !predicate.test(ZkLedgerUnderreplicationManager.this.getLedgerUnreplicationInfo(ledgerId).getReplicaList())) continue;
                                    this.curBatch.add(ledgerId);
                                    continue;
                                }
                                queue.add(child);
                            }
                            catch (KeeperException.NoNodeException noNodeException) {}
                        }
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                    catch (KeeperException.NoNodeException ie) {
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading list", e);
                    }
                }
                return this.curBatch.size() > 0;
            }

            @Override
            public Long next() {
                assert (this.curBatch.size() > 0);
                return this.curBatch.remove();
            }
        };
    }

    private long getLedgerToRereplicateFromHierarchy(String parent, long depth) throws KeeperException, InterruptedException {
        List<String> children;
        if (depth == 4L) {
            List<String> children2;
            try {
                children2 = this.subTreeCache.getChildren(parent);
            }
            catch (KeeperException.NoNodeException nne) {
                return -1L;
            }
            Collections.shuffle(children2);
            List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
            while (children2.size() > 0) {
                String tryChild = children2.get(0);
                try {
                    List<String> locks = this.subTreeCache.getChildren(this.urLockPath);
                    if (locks.contains(tryChild)) {
                        children2.remove(tryChild);
                        continue;
                    }
                    Stat stat = this.zkc.exists(parent + "/" + tryChild, false);
                    if (stat == null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{}/{} doesn't exist", (Object)parent, (Object)tryChild);
                        }
                        children2.remove(tryChild);
                        continue;
                    }
                    String lockPath = this.urLockPath + "/" + tryChild;
                    long ledgerId = this.getLedgerId(tryChild);
                    this.zkc.create(lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
                    this.heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion()));
                    return ledgerId;
                }
                catch (KeeperException.NodeExistsException nee) {
                    children2.remove(tryChild);
                }
                catch (NumberFormatException nfe) {
                    children2.remove(tryChild);
                }
            }
            return -1L;
        }
        try {
            children = this.subTreeCache.getChildren(parent);
        }
        catch (KeeperException.NoNodeException nne) {
            return -1L;
        }
        Collections.shuffle(children);
        while (children.size() > 0) {
            String tryChild = children.get(0);
            String tryPath = parent + "/" + tryChild;
            long ledger = this.getLedgerToRereplicateFromHierarchy(tryPath, depth + 1L);
            if (ledger != -1L) {
                return ledger;
            }
            children.remove(tryChild);
        }
        return -1L;
    }

    @Override
    public long pollLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("pollLedgerToRereplicate()");
        }
        try {
            return this.getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public long getLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("getLedgerToRereplicate()");
        }
        while (true) {
            final CountDownLatch changedLatch = new CountDownLatch(1);
            Watcher w = new Watcher(){

                public void process(WatchedEvent e) {
                    if (e.getType() == Watcher.Event.EventType.NodeChildrenChanged || e.getType() == Watcher.Event.EventType.NodeDeleted || e.getType() == Watcher.Event.EventType.NodeCreated || e.getState() == Watcher.Event.KeeperState.Expired || e.getState() == Watcher.Event.KeeperState.Disconnected) {
                        changedLatch.countDown();
                    }
                }
            };
            try {
                SubTreeCache.WatchGuard wg = this.subTreeCache.registerWatcherWithGuard(w);
                Throwable throwable = null;
                try {
                    this.waitIfLedgerReplicationDisabled();
                    long ledger = this.getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
                    if (ledger != -1L) {
                        long l = ledger;
                        return l;
                    }
                    changedLatch.await();
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (wg == null) continue;
                    if (throwable != null) {
                        try {
                            wg.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    wg.close();
                }
            }
            catch (KeeperException ke) {
                throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
            }
        }
    }

    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        ReplicationEnableCb cb = new ReplicationEnableCb();
        if (!this.isLedgerReplicationEnabled()) {
            this.notifyLedgerReplicationEnabled(cb);
            cb.await();
        }
    }

    @Override
    public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("releaseLedger(ledgerId={})", (Object)ledgerId);
        }
        try {
            Lock l = this.heldLocks.remove(ledgerId);
            if (l != null) {
                this.zkc.delete(l.getLockZNode(), -1);
            }
        }
        catch (KeeperException.NoNodeException l) {
        }
        catch (KeeperException ke) {
            LOG.error("Error deleting underreplicated ledger lock", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    @Override
    public void close() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("close()");
        }
        try {
            for (Map.Entry<Long, Lock> e : this.heldLocks.entrySet()) {
                this.zkc.delete(e.getValue().getLockZNode(), -1);
            }
        }
        catch (KeeperException.NoNodeException noNodeException) {
        }
        catch (KeeperException ke) {
            LOG.error("Error deleting underreplicated ledger lock", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    @Override
    public void disableLedgerReplication() throws ReplicationException.UnavailableException {
        List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
        if (LOG.isDebugEnabled()) {
            LOG.debug("disableLedegerReplication()");
        }
        try {
            String znode = this.basePath + '/' + "disable";
            this.zkc.create(znode, "".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
            LOG.info("Auto ledger re-replication is disabled!");
        }
        catch (KeeperException.NodeExistsException ke) {
            LOG.warn("AutoRecovery is already disabled!", (Throwable)ke);
            throw new ReplicationException.UnavailableException("AutoRecovery is already disabled!", ke);
        }
        catch (KeeperException ke) {
            LOG.error("Exception while stopping auto ledger re-replication", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Exception while stopping auto ledger re-replication", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while stopping auto ledger re-replication", ie);
        }
    }

    @Override
    public void enableLedgerReplication() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("enableLedegerReplication()");
        }
        try {
            this.zkc.delete(this.basePath + '/' + "disable", -1);
            LOG.info("Resuming automatic ledger re-replication");
        }
        catch (KeeperException.NoNodeException ke) {
            LOG.warn("AutoRecovery is already enabled!", (Throwable)ke);
            throw new ReplicationException.UnavailableException("AutoRecovery is already enabled!", ke);
        }
        catch (KeeperException ke) {
            LOG.error("Exception while resuming ledger replication", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Exception while resuming auto ledger re-replication", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while resuming auto ledger re-replication", ie);
        }
    }

    @Override
    public boolean isLedgerReplicationEnabled() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("isLedgerReplicationEnabled()");
        }
        try {
            return null == this.zkc.exists(this.basePath + '/' + "disable", false);
        }
        catch (KeeperException ke) {
            LOG.error("Error while checking the state of ledger re-replication", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public void notifyLedgerReplicationEnabled(final BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("notifyLedgerReplicationEnabled()");
        }
        Watcher w = new Watcher(){

            public void process(WatchedEvent e) {
                if (e.getType() == Watcher.Event.EventType.NodeDeleted) {
                    cb.operationComplete(0, null);
                }
            }
        };
        try {
            if (null == this.zkc.exists(this.basePath + '/' + "disable", w)) {
                cb.operationComplete(0, null);
                return;
            }
        }
        catch (KeeperException ke) {
            LOG.error("Error while checking the state of ledger re-replication", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    public static boolean isLedgerBeingReplicated(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId) throws KeeperException, InterruptedException {
        return zkc.exists(ZkLedgerUnderreplicationManager.getUrLedgerLockZnode(ZkLedgerUnderreplicationManager.getUrLockPath(zkLedgersRootPath), ledgerId), false) != null;
    }

    public static void acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId, List<ACL> zkAcls) throws KeeperException, InterruptedException {
        ZkUtils.createFullPathOptimistic(zkc, ZkLedgerUnderreplicationManager.getUrLedgerLockZnode(ZkLedgerUnderreplicationManager.getUrLockPath(zkLedgersRootPath), ledgerId), LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
    }

    public static void releaseUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId) throws InterruptedException, KeeperException {
        if (ZkLedgerUnderreplicationManager.isLedgerBeingReplicated(zkc, zkLedgersRootPath, ledgerId)) {
            zkc.delete(ZkLedgerUnderreplicationManager.getUrLedgerLockZnode(ZkLedgerUnderreplicationManager.getUrLockPath(zkLedgersRootPath), ledgerId), -1);
        }
    }

    @Override
    public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws ReplicationException.UnavailableException {
        LOG.debug("initializeLostBookieRecoveryDelay()");
        try {
            this.zkc.create(this.lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(Charsets.UTF_8), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        catch (KeeperException.NodeExistsException ke) {
            LOG.info("lostBookieRecoveryDelay Znode is already present, so using existing lostBookieRecoveryDelay Znode value");
            return false;
        }
        catch (KeeperException ke) {
            LOG.error("Error while initializing LostBookieRecoveryDelay", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        return true;
    }

    @Override
    public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws ReplicationException.UnavailableException {
        LOG.debug("setLostBookieRecoveryDelay()");
        try {
            if (this.zkc.exists(this.lostBookieRecoveryDelayZnode, false) != null) {
                this.zkc.setData(this.lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(Charsets.UTF_8), -1);
            } else {
                this.zkc.create(this.lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(Charsets.UTF_8), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }
        catch (KeeperException ke) {
            LOG.error("Error while setting LostBookieRecoveryDelay ", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableException {
        LOG.debug("getLostBookieRecoveryDelay()");
        try {
            byte[] data = this.zkc.getData(this.lostBookieRecoveryDelayZnode, false, null);
            return Integer.parseInt(new String(data, Charsets.UTF_8));
        }
        catch (KeeperException ke) {
            LOG.error("Error while getting LostBookieRecoveryDelay ", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public void notifyLostBookieRecoveryDelayChanged(final BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws ReplicationException.UnavailableException {
        LOG.debug("notifyLostBookieRecoveryDelayChanged()");
        Watcher w = new Watcher(){

            public void process(WatchedEvent e) {
                if (e.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    cb.operationComplete(0, null);
                }
            }
        };
        try {
            if (null == this.zkc.exists(this.lostBookieRecoveryDelayZnode, w)) {
                cb.operationComplete(0, null);
                return;
            }
        }
        catch (KeeperException ke) {
            LOG.error("Error while checking the state of lostBookieRecoveryDelay", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    private static class Lock {
        private final String lockZNode;
        private final int ledgerZNodeVersion;

        Lock(String lockZNode, int ledgerZNodeVersion) {
            this.lockZNode = lockZNode;
            this.ledgerZNodeVersion = ledgerZNodeVersion;
        }

        String getLockZNode() {
            return this.lockZNode;
        }

        int getLedgerZNodeVersion() {
            return this.ledgerZNodeVersion;
        }
    }
}

