/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.zookeeper;

import co.paralleluniverse.galaxy.cluster.DistributedTreeUtil;
import co.paralleluniverse.galaxy.core.AbstractCluster;
import co.paralleluniverse.galaxy.core.RefAllocator;
import co.paralleluniverse.galaxy.core.RefAllocatorSupport;
import co.paralleluniverse.galaxy.core.RootLocker;
import co.paralleluniverse.galaxy.zookeeper.ZooKeeperDistributedTree;
import com.google.common.base.Throwables;
import java.beans.ConstructorProperties;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster implementation backed by ZooKeeper, accessed through a Curator client.
 *
 * <p>In addition to what {@link AbstractCluster} provides, this class:
 * <ul>
 *   <li>registers this node as an ephemeral sequential znode under
 *       {@code /co.paralleluniverse.galaxy/node_names} and uses the created znode's name as the node name;</li>
 *   <li>implements {@link RootLocker} with {@link InterProcessMutex} locks under
 *       {@code /co.paralleluniverse.galaxy/root_locks};</li>
 *   <li>implements {@link RefAllocator} on top of a {@link DistributedAtomicLong} stored at
 *       {@code /co.paralleluniverse.galaxy/ref_counter}.</li>
 * </ul>
 */
public class ZooKeeperCluster
extends AbstractCluster
implements RootLocker,
RefAllocator {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCluster.class);
    /** Smallest value the shared ref-id counter is initialized to (2^32). */
    private static final long INITIAL_REF_ID = 0x100000000L;
    /** Parent znode of the per-root lock paths. */
    private static final String ROOT_LOCKS = "/co.paralleluniverse.galaxy/root_locks";
    /** Znode holding the shared ref-id counter. */
    private static final String REF_COUNTER = "/co.paralleluniverse.galaxy/ref_counter";
    /** Parent znode under which every node registers its ephemeral name. */
    private static final String NODE_NAMES = "/co.paralleluniverse.galaxy/node_names";
    /** Delay between polls/retries against the shared counter. */
    private static final long COUNTER_POLL_INTERVAL_MS = 500L;

    private final String zkConnectString;
    private int sessionTimeoutMs = 15000;
    private int connectionTimeoutMs = 10000;
    private RetryPolicy retryPolicy = new ExponentialBackoffRetry(20, 20);
    /** Curator client; created in {@link #init()}, so it is {@code null} until then. */
    private CuratorFramework client;
    private String myNodeName;
    private final RefAllocatorSupport refAllocatorSupport = new RefAllocatorSupport();
    /** Single-threaded executor on which all counter waiting and ref allocation runs. */
    private final ExecutorService refAllocationExecutor = Executors.newFixedThreadPool(1);
    private DistributedAtomicLong refIdCounter;
    /** Set to {@code true} by the background task once the counter has reached {@link #INITIAL_REF_ID}. */
    private volatile boolean counterReady;

    /**
     * Creates the cluster component. No connection to ZooKeeper is made here; that happens in {@link #init()}.
     *
     * @param name            component name, passed to {@link AbstractCluster}
     * @param nodeId          this node's id, passed to {@link AbstractCluster}
     * @param zkConnectString ZooKeeper connect string handed to the Curator client builder
     * @throws Exception declared for compatibility with existing callers
     */
    @ConstructorProperties(value={"name", "nodeId", "zkConnectString"})
    public ZooKeeperCluster(String name, short nodeId, String zkConnectString) throws Exception {
        super(name, nodeId);
        this.zkConnectString = zkConnectString;
    }

    /**
     * Sets the Curator connection timeout. Only permitted during initialization.
     *
     * @param connectionTimeoutMs connection timeout in milliseconds
     */
    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        assertDuringInitialization();
        this.connectionTimeoutMs = connectionTimeoutMs;
    }

    /**
     * Sets the retry policy used by the Curator client and by the ref-id counter.
     * Only permitted during initialization.
     *
     * @param retryPolicy the retry policy
     */
    public void setRetryPolicy(RetryPolicy retryPolicy) {
        assertDuringInitialization();
        this.retryPolicy = retryPolicy;
    }

    /**
     * Sets the ZooKeeper session timeout. Only permitted during initialization.
     *
     * @param sessionTimeoutMs session timeout in milliseconds
     */
    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        assertDuringInitialization();
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /**
     * Starts the Curator client, registers this node's name, initializes the ref-id counter and
     * installs a {@link ZooKeeperDistributedTree} as the control tree.
     *
     * @throws Exception if any ZooKeeper operation fails
     */
    @Override
    protected void init() throws Exception {
        // NOTE(review): super.init() is invoked both here and again at the end of this method, exactly
        // as in the original code. Whether AbstractCluster needs both calls cannot be told from this
        // file - confirm before removing either one.
        super.init();

        client = CuratorFrameworkFactory.builder()
                .connectString(zkConnectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .defaultData(new byte[0])
                .build();
        client.start();

        try {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(NODE_NAMES);
        }
        catch (KeeperException.NodeExistsException ignored) {
            // The parent znode already exists (e.g. created by another node); nothing to do.
        }

        String createdPath = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(NODE_NAMES + "/node-");
        myNodeName = DistributedTreeUtil.child(createdPath);
        LOG.info("Node name is {}, id is {}", myNodeName, myId);
        setName(myNodeName);

        initRefIdCounter();

        ZooKeeperDistributedTree tree = new ZooKeeperDistributedTree(client);
        setControlTree(tree);
        super.init();
    }

    /**
     * Creates the shared ref-id counter and schedules a background task that waits until the counter
     * has reached {@link #INITIAL_REF_ID}, then marks the counter ready and notifies listeners.
     *
     * <p>If {@code hasServer()} is {@code false}, this node itself tries to set the counter to
     * {@link #INITIAL_REF_ID} before the waiting task is submitted.
     *
     * @throws Exception if the initial increment throws
     * @throws RuntimeException if the initial increment does not succeed
     */
    private void initRefIdCounter() throws Exception {
        refIdCounter = new DistributedAtomicLong(client, REF_COUNTER, retryPolicy);

        // NOTE(review): presumably this increment makes sure the counter znode exists - confirm.
        AtomicValue<Long> initial = refIdCounter.increment();
        if (!initial.succeeded()) {
            throw new RuntimeException("Error initializing refIdCounter");
        }
        if (!hasServer()) {
            setCounter(INITIAL_REF_ID);
        }

        refAllocationExecutor.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                LOG.info("Waiting for id counter to be set...");
                try {
                    AtomicValue<Long> av;
                    while (true) {
                        av = refIdCounter.get();
                        if (av.succeeded()) {
                            if (av.postValue() >= INITIAL_REF_ID) {
                                break;
                            }
                        } else {
                            LOG.info("Failed to read counter");
                        }
                        Thread.sleep(COUNTER_POLL_INTERVAL_MS);
                    }
                    LOG.info("Id counter set: {}", av.postValue());
                    // Publish readiness before firing so listeners added from now on are notified
                    // directly by addRefAllocationsListener.
                    counterReady = true;
                    refAllocatorSupport.fireCounterReady();
                    return null;
                }
                catch (InterruptedException e) {
                    // Raised when shutdown() calls shutdownNow(); keep the interrupt status visible.
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                }
                catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        });
    }

    /**
     * Shuts down the superclass, cancels pending ref-allocation work and closes the Curator client.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        refAllocationExecutor.shutdownNow();
        // client is only assigned in init(); guard so that shutting down an uninitialized
        // (or partially initialized) instance does not fail with a NullPointerException.
        if (client != null) {
            client.close();
        }
    }

    /**
     * Tells whether the given node record describes this node, by comparing node names.
     *
     * @param node the node record to test
     * @return {@code true} if {@code node}'s name equals this node's registered name
     */
    @Override
    protected boolean isMe(AbstractCluster.NodeInfoImpl node) {
        return myNodeName.equals(node.getName());
    }

    /**
     * @return the Curator client used by this cluster ({@code null} before {@link #init()})
     */
    @Override
    public Object getUnderlyingResource() {
        return client;
    }

    /**
     * Acquires the distributed lock for the given root id, blocking until it is held.
     *
     * @param id the root id
     * @return the acquired {@link InterProcessMutex}; pass it to {@link #unlockRoot(Object)} to release
     * @throws RuntimeException wrapping any failure to acquire the lock
     */
    @Override
    public Object lockRoot(int id) {
        try {
            InterProcessMutex mutex = new InterProcessMutex(client, ROOT_LOCKS + "/" + id);
            mutex.acquire();
            return mutex;
        }
        catch (Exception ex) {
            throw Throwables.propagate(ex);
        }
    }

    /**
     * Releases a lock previously returned by {@link #lockRoot(int)}.
     *
     * @param lock the object returned by {@link #lockRoot(int)}
     * @throws RuntimeException wrapping any failure to release the lock (including a wrong lock type)
     */
    @Override
    public void unlockRoot(Object lock) {
        try {
            InterProcessMutex mutex = (InterProcessMutex)lock;
            mutex.release();
        }
        catch (Exception ex) {
            throw Throwables.propagate(ex);
        }
    }

    /**
     * Registers a listener. If the counter is already ready, the listener's {@code counterReady()}
     * is invoked immediately on the calling thread.
     *
     * @param listener the listener to add
     */
    @Override
    public void addRefAllocationsListener(RefAllocator.RefAllocationsListener listener) {
        refAllocatorSupport.addRefAllocationsListener(listener);
        if (counterReady) {
            listener.counterReady();
        }
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeRefAllocationsListener(RefAllocator.RefAllocationsListener listener) {
        // Fixed: this previously called addRefAllocationsListener, registering the listener again
        // instead of removing it.
        refAllocatorSupport.removeRefAllocationsListener(listener);
    }

    /**
     * @return the currently registered ref-allocation listeners
     */
    @Override
    public Collection<RefAllocator.RefAllocationsListener> getRefAllocationsListeners() {
        return refAllocatorSupport.getRefAllocationListeners();
    }

    /**
     * Tries to set the shared counter to {@code max(initialValue, INITIAL_REF_ID)} with a
     * compare-and-set loop, retrying every {@value #COUNTER_POLL_INTERVAL_MS} ms.
     *
     * @param initialValue the requested starting value
     * @return {@code true} if this call set the counter; {@code false} if a failed attempt reported
     *         a post-value that is already at least the target
     * @throws RuntimeException wrapping any failure from the counter or an interrupt while waiting
     */
    private boolean setCounter(long initialValue) {
        final long target = Math.max(initialValue, INITIAL_REF_ID);
        LOG.info("Setting ref counter to {}", target);
        try {
            long expected = 0L;
            while (true) {
                AtomicValue<Long> av = refIdCounter.compareAndSet(expected, target);
                if (av.succeeded()) {
                    assert av.postValue() == target;
                    LOG.info("Set id counter to {}", target);
                    return true;
                }
                if (av.postValue() >= target) {
                    // NOTE(review): logs the requested target, not the value actually observed.
                    LOG.info("Id counter set by someone else to {}", target);
                    return false;
                }
                // Retry using the value the failed attempt observed as the new expectation.
                expected = av.preValue();
                Thread.sleep(COUNTER_POLL_INTERVAL_MS);
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt status for the caller before wrapping.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Asynchronously reserves {@code count} consecutive ref ids by adding {@code count} to the shared
     * counter. On success listeners are told the range start (the counter's previous value) and
     * {@code count}; failures are logged and not propagated.
     *
     * @param count number of ids to reserve
     */
    @Override
    public void allocateRefs(final int count) {
        refAllocationExecutor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    LOG.info("Allocating {} IDs", count);
                    AtomicValue<Long> av = refIdCounter.add(Long.valueOf(count));
                    if (av.succeeded()) {
                        refAllocatorSupport.fireRefsAllocated(av.preValue(), count);
                    } else {
                        LOG.error("Allocating ref IDs has failed!");
                    }
                }
                catch (Exception e) {
                    LOG.error("Allocating ref IDs has failed!", e);
                }
            }
        });
    }
}

