/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal.zookeeper;

import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.net.InetAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LeaderElection
extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
    private final String guid = UUID.randomUUID().toString();
    private final ZKClient zkClient;
    private final String zkFolderPath;
    private final ElectionHandler handler;
    private ExecutorService executor;
    private String zkNodePath;
    private State state;
    private Cancellable watcherCancellable;

    public LeaderElection(ZKClient zkClient, String prefix, ElectionHandler handler) {
        this.zkClient = zkClient;
        this.zkFolderPath = prefix.startsWith("/") ? prefix : "/" + prefix;
        this.handler = handler;
    }

    protected void doStart() {
        LOG.info("Start leader election on {}{} with guid {}", new Object[]{this.zkClient.getConnectString(), this.zkFolderPath, this.guid});
        this.executor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory((String)("leader-election" + this.zkFolderPath.replace('/', '-'))));
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                LeaderElection.this.register();
                LeaderElection.this.watcherCancellable = LeaderElection.this.zkClient.addConnectionWatcher(LeaderElection.this.wrapWatcher(new ConnectionWatcher()));
            }
        });
        this.notifyStarted();
    }

    protected void doStop() {
        final SettableFuture completion = SettableFuture.create();
        Futures.addCallback((ListenableFuture)completion, (FutureCallback)new FutureCallback<String>(){

            public void onSuccess(String result) {
                try {
                    LeaderElection.this.notifyStopped();
                }
                finally {
                    LeaderElection.this.executor.shutdown();
                }
            }

            public void onFailure(Throwable t) {
                try {
                    LeaderElection.this.notifyFailed(t);
                }
                finally {
                    LeaderElection.this.executor.shutdown();
                }
            }
        }, (Executor)Threads.SAME_THREAD_EXECUTOR);
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                if (LeaderElection.this.watcherCancellable != null) {
                    LeaderElection.this.watcherCancellable.cancel();
                }
                if (LeaderElection.this.state != State.CANCELLED) {
                    if (LeaderElection.this.state == State.LEADER) {
                        LeaderElection.this.becomeFollower();
                    }
                    LeaderElection.this.state = State.CANCELLED;
                    LeaderElection.this.doDeleteNode((SettableFuture<String>)completion);
                }
            }
        });
    }

    private byte[] getNodeData() {
        String hostname;
        try {
            hostname = InetAddress.getLocalHost().getCanonicalHostName();
        }
        catch (Exception e) {
            LOG.warn("Failed to get local hostname.", (Throwable)e);
            hostname = "unknown";
        }
        return hostname.getBytes(Charsets.UTF_8);
    }

    private void register() {
        this.state = State.IN_PROGRESS;
        this.zkNodePath = null;
        final String path = String.format("%s/%s-", this.zkFolderPath, this.guid);
        LOG.debug("Registering for election {} with path {}", (Object)this.zkFolderPath, (Object)path);
        OperationFuture<String> createFuture = this.zkClient.create(path, this.getNodeData(), CreateMode.EPHEMERAL_SEQUENTIAL, true);
        Futures.addCallback(createFuture, (FutureCallback)new FutureCallback<String>(){

            public void onSuccess(String result) {
                LOG.debug("Created zk node {}", (Object)result);
                LeaderElection.this.zkNodePath = result;
                if (LeaderElection.this.state == State.CANCELLED) {
                    LeaderElection.this.deleteNode();
                } else {
                    LeaderElection.this.runElection();
                }
            }

            public void onFailure(Throwable t) {
                LOG.error("Got exception during node creation for folder {}", (Object)path, (Object)t);
                LeaderElection.this.runElection();
            }
        }, (Executor)this.executor);
    }

    private void runElection() {
        LOG.debug("Running election for {}", (Object)this.zkNodePath);
        OperationFuture<NodeChildren> childrenFuture = this.zkClient.getChildren(this.zkFolderPath);
        Futures.addCallback(childrenFuture, (FutureCallback)new FutureCallback<NodeChildren>(){

            public void onSuccess(NodeChildren result) {
                Optional nodeToWatch = LeaderElection.this.findNodeToWatch(result.getChildren());
                if (LeaderElection.this.state == State.CANCELLED) {
                    LeaderElection.this.deleteNode();
                    return;
                }
                if (nodeToWatch == null) {
                    LeaderElection.this.register();
                    return;
                }
                if (nodeToWatch.isPresent()) {
                    LeaderElection.this.watchNode(LeaderElection.this.zkFolderPath + "/" + (String)nodeToWatch.get(), new LowerNodeWatcher());
                } else {
                    LeaderElection.this.becomeLeader();
                }
            }

            public void onFailure(Throwable t) {
                LOG.warn("Got exception during children fetch for {}. Retry.", (Object)LeaderElection.this.zkFolderPath, (Object)t);
                if (LeaderElection.this.state == State.CANCELLED && LeaderElection.this.zkNodePath != null) {
                    LeaderElection.this.deleteNode();
                } else {
                    LeaderElection.this.runElection();
                }
            }
        }, (Executor)this.executor);
    }

    private void becomeLeader() {
        this.state = State.LEADER;
        LOG.debug("Become leader for {}.", (Object)this.zkNodePath);
        try {
            this.handler.leader();
        }
        catch (Throwable t) {
            LOG.warn("Exception thrown when calling leader() method. Withdraw from the leader election process.", t);
            this.stop();
        }
    }

    private void becomeFollower() {
        this.state = State.FOLLOWER;
        LOG.debug("Become follower for {}", (Object)this.zkNodePath);
        try {
            this.handler.follower();
        }
        catch (Throwable t) {
            LOG.warn("Exception thrown when calling follower() method. Withdraw from the leader election process.", t);
            this.stop();
        }
    }

    private void watchNode(final String nodePath, Watcher watcher) {
        OperationFuture<NodeData> watchFuture = this.zkClient.getData(nodePath, watcher);
        Futures.addCallback(watchFuture, (FutureCallback)new FutureCallback<NodeData>(){

            public void onSuccess(NodeData nodeData) {
                if (LeaderElection.this.state != State.CANCELLED) {
                    LeaderElection.this.becomeFollower();
                }
            }

            public void onFailure(Throwable t) {
                LOG.debug("Exception while setting watch on node {}. Retry.", (Object)nodePath, (Object)t);
                LeaderElection.this.runElection();
            }
        }, (Executor)this.executor);
    }

    private ListenableFuture<String> deleteNode() {
        SettableFuture completion = SettableFuture.create();
        this.doDeleteNode((SettableFuture<String>)completion);
        return completion;
    }

    private void doDeleteNode(final SettableFuture<String> completion) {
        if (this.zkNodePath == null) {
            completion.set(null);
            return;
        }
        try {
            Futures.addCallback(this.zkClient.delete(this.zkNodePath), (FutureCallback)new FutureCallback<String>(){

                public void onSuccess(String result) {
                    LOG.debug("Node deleted: {}", (Object)result);
                    completion.set((Object)result);
                }

                public void onFailure(Throwable t) {
                    LOG.warn("Fail to delete node: {}", (Object)LeaderElection.this.zkNodePath);
                    if (!(t instanceof KeeperException.NoNodeException)) {
                        LOG.debug("Retry delete node: {}", (Object)LeaderElection.this.zkNodePath);
                        LeaderElection.this.doDeleteNode((SettableFuture<String>)completion);
                    } else {
                        completion.setException(t);
                    }
                }
            }, (Executor)this.executor);
        }
        catch (Throwable t) {
            completion.setException(t);
        }
    }

    private Watcher wrapWatcher(final Watcher watcher) {
        return new Watcher(){

            public void process(final WatchedEvent event) {
                LeaderElection.this.executor.execute(new Runnable(){

                    @Override
                    public void run() {
                        watcher.process(event);
                    }
                });
            }
        };
    }

    private Optional<String> findNodeToWatch(List<String> nodes) {
        if (this.zkNodePath == null) {
            for (String node : nodes) {
                if (!node.startsWith(this.guid)) continue;
                this.zkNodePath = this.zkFolderPath + "/" + node;
                break;
            }
        }
        if (this.zkNodePath == null) {
            return null;
        }
        int currentId = Integer.parseInt(this.zkNodePath.substring(this.zkNodePath.indexOf(this.guid) + this.guid.length() + 1));
        String nodeToWatch = null;
        int maxOfMins = Integer.MIN_VALUE;
        for (String node : nodes) {
            int nodeId = Integer.parseInt(node.substring(this.guid.length() + 1));
            if (nodeId >= currentId || nodeId <= maxOfMins) continue;
            maxOfMins = nodeId;
            nodeToWatch = node;
        }
        return nodeToWatch == null ? Optional.absent() : Optional.of(nodeToWatch);
    }

    private class ConnectionWatcher
    implements Watcher {
        private boolean expired;
        private boolean disconnected;

        private ConnectionWatcher() {
        }

        public void process(WatchedEvent event) {
            switch (event.getState()) {
                case Disconnected: {
                    this.disconnected = true;
                    LOG.info("Disconnected from ZK: {} for {}", (Object)LeaderElection.this.zkClient.getConnectString(), (Object)LeaderElection.this.zkFolderPath);
                    if (LeaderElection.this.state != State.LEADER) break;
                    LOG.info("Stepping down as leader due to disconnect: {} for {}", (Object)LeaderElection.this.zkClient.getConnectString(), (Object)LeaderElection.this.zkFolderPath);
                    LeaderElection.this.becomeFollower();
                    break;
                }
                case SyncConnected: {
                    boolean runElection = this.disconnected && !this.expired && LeaderElection.this.state != State.IN_PROGRESS;
                    boolean runRegister = this.disconnected && this.expired && LeaderElection.this.state != State.IN_PROGRESS;
                    this.disconnected = false;
                    this.expired = false;
                    if (runElection) {
                        if (LeaderElection.this.state != State.CANCELLED) {
                            LeaderElection.this.state = State.IN_PROGRESS;
                        }
                        LOG.info("Connected to ZK, running election: {} for {}", (Object)LeaderElection.this.zkClient.getConnectString(), (Object)LeaderElection.this.zkFolderPath);
                        LeaderElection.this.runElection();
                        break;
                    }
                    if (!runRegister || LeaderElection.this.state == State.CANCELLED) break;
                    LOG.info("Connected to ZK, registering: {} for {}", (Object)LeaderElection.this.zkClient.getConnectString(), (Object)LeaderElection.this.zkFolderPath);
                    LeaderElection.this.register();
                    break;
                }
                case Expired: {
                    LOG.info("ZK session expired: {} for {}", (Object)LeaderElection.this.zkClient.getConnectString(), (Object)LeaderElection.this.zkFolderPath);
                    this.expired = true;
                }
            }
        }
    }

    private class LowerNodeWatcher
    implements Watcher {
        private LowerNodeWatcher() {
        }

        public void process(WatchedEvent event) {
            if (LeaderElection.this.state != State.CANCELLED && event.getType() == Watcher.Event.EventType.NodeDeleted) {
                LOG.debug("Lower node deleted {} for election {}.", (Object)event, (Object)LeaderElection.this.zkNodePath);
                LeaderElection.this.runElection();
            }
        }
    }

    private static enum State {
        IN_PROGRESS,
        LEADER,
        FOLLOWER,
        CANCELLED;

    }
}

