/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ha;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ActiveStandbyElector
implements AsyncCallback.StatCallback,
AsyncCallback.StringCallback {
    /** Name of the lock znode under the working directory; it is created as an ephemeral node when joining the election. */
    @VisibleForTesting
    protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
    /** Name of the persistent "breadcrumb" znode under the working directory, written by the node that becomes active. */
    @VisibleForTesting
    protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";
    public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
    // NOTE(review): not referenced anywhere in this file; the retry loops compare against a literal 3,
    // presumably because the constant was inlined before decompilation.
    private static final int NUM_RETRIES = 3;
    // Current election state as last reported to the application callback.
    private State state = State.INIT;
    // Retries used so far for the asynchronous lock-node create / stat operations.
    private int createRetryCount = 0;
    private int statRetryCount = 0;
    // Current ZooKeeper client; set to null while the connection is terminated.
    private ZooKeeper zkClient;
    // Connection state tracked from session watch events and from terminateConnection().
    private ConnectionState zkConnectionState = ConnectionState.TERMINATED;
    private final ActiveStandbyElectorCallback appClient;
    private final String zkHostPort;
    private final int zkSessionTimeout;
    private final List<ACL> zkAcl;
    // Private copy of the application data written into the lock and breadcrumb znodes.
    private byte[] appData;
    private final String zkLockFilePath;
    private final String zkBreadCrumbPath;
    private final String znodeWorkingDir;
    // Held by reJoinElection() while it re-establishes the session; tests can hold it to block that.
    private Lock sessionReestablishLockForTests = new ReentrantLock();
    // Set when an election is joined and cleared by quitElection(); only read by an assertion in becomeActive().
    private boolean wantToBeInElection;

    /**
     * Creates an elector and immediately opens a ZooKeeper connection.
     *
     * @param zookeeperHostPorts      ZooKeeper connect string; must not be null
     * @param zookeeperSessionTimeout session timeout passed to the ZooKeeper client; must be positive
     * @param parentZnodeName         znode under which the lock and breadcrumb nodes live; must not be null
     * @param acl                     ACL applied to znodes created by this elector; must not be null
     * @param app                     callback notified of state changes; must not be null
     * @throws IOException                    if the ZooKeeper client cannot be created
     * @throws HadoopIllegalArgumentException if any argument is null or the timeout is not positive
     */
    public ActiveStandbyElector(String zookeeperHostPorts, int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException, HadoopIllegalArgumentException {
        boolean missingArgument = app == null || acl == null || parentZnodeName == null || zookeeperHostPorts == null;
        if (missingArgument || zookeeperSessionTimeout <= 0) {
            throw new HadoopIllegalArgumentException("Invalid argument");
        }
        zkHostPort = zookeeperHostPorts;
        zkSessionTimeout = zookeeperSessionTimeout;
        zkAcl = acl;
        appClient = app;
        znodeWorkingDir = parentZnodeName;
        zkLockFilePath = parentZnodeName + "/" + LOCK_FILENAME;
        zkBreadCrumbPath = parentZnodeName + "/" + BREADCRUMB_FILENAME;
        // All configuration fields are assigned before the connection is opened, since it reads them.
        createConnection();
    }

    /**
     * Joins the election, using a private copy of {@code data} as this node's application data.
     *
     * @param data application data to store in the lock znode; must not be null
     * @throws HadoopIllegalArgumentException if {@code data} is null
     */
    public synchronized void joinElection(byte[] data) throws HadoopIllegalArgumentException {
        LOG.debug("Attempting active election");
        if (data == null) {
            throw new HadoopIllegalArgumentException("data cannot be null");
        }
        // Defensive copy so later changes to the caller's array do not affect the elector.
        appData = data.clone();
        joinElectionInternal();
    }

    /**
     * Checks synchronously whether the working-directory znode exists.
     *
     * @return true if the znode exists
     * @throws IOException           if ZooKeeper reports an error
     * @throws InterruptedException  if the ZooKeeper call is interrupted
     * @throws IllegalStateException if there is currently no ZooKeeper client
     */
    public synchronized boolean parentZNodeExists() throws IOException, InterruptedException {
        Preconditions.checkState(zkClient != null);
        try {
            Stat parentStat = zkClient.exists(znodeWorkingDir, false);
            return parentStat != null;
        } catch (KeeperException e) {
            throw new IOException("Couldn't determine existence of znode '" + znodeWorkingDir + "'", e);
        }
    }

    /**
     * Creates the working-directory znode and every missing ancestor as persistent, empty nodes.
     * Path prefixes that already exist are left untouched.
     *
     * @throws IOException              if a prefix cannot be created for a reason other than "node exists"
     * @throws InterruptedException     if a ZooKeeper call is interrupted
     * @throws IllegalArgumentException if the working directory does not start with "/"
     */
    public synchronized void ensureParentZNode() throws IOException, InterruptedException {
        String[] components = znodeWorkingDir.split("/");
        // An absolute path splits into an empty first component followed by the node names.
        boolean absolute = components.length >= 1 && "".equals(components[0]);
        Preconditions.checkArgument(absolute, "Invalid path: %s", znodeWorkingDir);
        StringBuilder prefix = new StringBuilder();
        for (int i = 1; i < components.length; ++i) {
            prefix.append("/").append(components[i]);
            String prefixPath = prefix.toString();
            LOG.debug("Ensuring existence of " + prefixPath);
            try {
                createWithRetries(prefixPath, new byte[0], zkAcl, CreateMode.PERSISTENT);
            } catch (KeeperException e) {
                // An already-existing prefix is fine; anything else is a failure.
                if (!isNodeExists(e.code())) {
                    throw new IOException("Couldn't create " + prefixPath, e);
                }
            }
        }
        LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
    }

    /**
     * Recursively deletes the working-directory znode, retrying on retriable ZooKeeper errors.
     *
     * @throws IOException          if the deletion ultimately fails
     * @throws InterruptedException if a ZooKeeper call is interrupted
     */
    public synchronized void clearParentZNode() throws IOException, InterruptedException {
        LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
        ZKAction<Void> recursiveDelete = new ZKAction<Void>() {
            @Override
            public Void run() throws KeeperException, InterruptedException {
                ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
                return null;
            }
        };
        try {
            zkDoWithRetries(recursiveDelete);
        } catch (KeeperException e) {
            throw new IOException("Couldn't clear parent znode " + znodeWorkingDir, e);
        }
        LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
    }

    /**
     * Leaves the election: resets the state to INIT and terminates the ZooKeeper connection.
     *
     * @param needFence when false and this node is currently active, the elector first tries to
     *                  delete its own breadcrumb znode; when true the breadcrumb is left in place
     */
    public synchronized void quitElection(boolean needFence) {
        LOG.info("Yielding from election");
        boolean leavingCleanly = !needFence;
        if (leavingCleanly && state == State.ACTIVE) {
            tryDeleteOwnBreadCrumbNode();
        }
        reset();
        wantToBeInElection = false;
    }

    /**
     * Reads the data stored in the lock znode, opening a ZooKeeper connection first if none exists.
     *
     * @return the bytes stored in the lock znode
     * @throws ActiveNotFoundException if the lock znode does not exist
     * @throws KeeperException         for any other ZooKeeper error
     * @throws InterruptedException    if the ZooKeeper call is interrupted
     * @throws IOException             if a new connection cannot be created
     */
    public synchronized byte[] getActiveData() throws ActiveNotFoundException, KeeperException, InterruptedException, IOException {
        try {
            if (zkClient == null) {
                createConnection();
            }
            return zkClient.getData(zkLockFilePath, false, new Stat());
        } catch (KeeperException e) {
            if (isNodeDoesNotExist(e.code())) {
                throw new ActiveNotFoundException();
            }
            throw e;
        }
    }

    /**
     * Callback for the asynchronous creation of the lock znode.
     *
     * <p>Results belonging to a client that is no longer current are ignored. On success this node
     * becomes active and starts monitoring the lock node. If the node already exists, this node
     * becomes standby (unless the create was a retry) and monitors the lock node. Retriable errors
     * are retried up to three times; a session-expired error is only logged; anything else is
     * reported as a fatal error.
     *
     * @param rc   ZooKeeper result code
     * @param path path of the node the create was issued for
     * @param ctx  the ZooKeeper client that issued the create
     * @param name name of the created node (unused)
     */
    @Override
    public synchronized void processResult(int rc, String path, Object ctx, String name) {
        if (isStaleClient(ctx)) {
            return;
        }
        LOG.debug("CreateNode result: " + rc + " for path: " + path + " connectionState: " + zkConnectionState);
        KeeperException.Code code = KeeperException.Code.get(rc);
        if (isSuccess(code)) {
            becomeActive();
            // becomeActive() can fail and re-join the election, which replaces the client and, if
            // reconnecting fails, leaves no client at all. Monitoring in that case would dereference
            // a null client; the re-join has already issued its own create, whose callback will
            // start monitoring. So only monitor when this callback's client is still current.
            if (!isStaleClient(ctx)) {
                monitorActiveStatus();
            }
            return;
        }
        if (isNodeExists(code)) {
            // On a retried create the node may be our own, so the stat callback decides the role.
            if (createRetryCount == 0) {
                becomeStandby();
            }
            monitorActiveStatus();
            return;
        }
        String errorMessage = "Received create error from Zookeeper. code:" + code.toString() + " for path " + path;
        LOG.debug(errorMessage);
        if (shouldRetry(code)) {
            if (createRetryCount < 3) {
                LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
                ++createRetryCount;
                createLockNodeAsync();
                return;
            }
            errorMessage = errorMessage + ". Not retrying further znode create connection errors.";
        } else if (isSessionExpired(code)) {
            LOG.warn("Lock acquisition failed because session was lost");
            return;
        }
        fatalError(errorMessage);
    }

    /**
     * Callback for the asynchronous stat (exists) call on the lock znode.
     *
     * <p>Results belonging to a client that is no longer current are ignored. If the node exists,
     * this node becomes active when the node's ephemeral owner is the current session and standby
     * otherwise. If the node is missing, the elector enters neutral mode and tries to create it.
     * Retriable errors are retried up to three times; other errors are fatal.
     *
     * @param rc   ZooKeeper result code
     * @param path path the stat was issued for
     * @param ctx  the ZooKeeper client that issued the stat
     * @param stat stat of the lock znode when the call succeeded
     */
    @Override
    public synchronized void processResult(int rc, String path, Object ctx, Stat stat) {
        if (isStaleClient(ctx)) {
            return;
        }
        LOG.debug("StatNode result: " + rc + " for path: " + path + " connectionState: " + zkConnectionState);
        KeeperException.Code code = KeeperException.Code.get(rc);
        if (isSuccess(code)) {
            boolean ownedByThisSession = stat.getEphemeralOwner() == zkClient.getSessionId();
            if (ownedByThisSession) {
                becomeActive();
            } else {
                becomeStandby();
            }
            return;
        }
        if (isNodeDoesNotExist(code)) {
            enterNeutralMode();
            joinElectionInternal();
            return;
        }
        String errorMessage = "Received stat error from Zookeeper. code:" + code.toString();
        LOG.debug(errorMessage);
        if (shouldRetry(code)) {
            boolean retriesLeft = statRetryCount < 3;
            if (retriesLeft) {
                ++statRetryCount;
                monitorLockNodeAsync();
                return;
            }
            errorMessage = errorMessage + ". Not retrying further znode monitoring connection errors.";
        }
        fatalError(errorMessage);
    }

    /**
     * Handles a watch event delivered for the given ZooKeeper client.
     *
     * <p>Events from a client that is no longer the current one are ignored. Events of type
     * {@code None} carry session state changes (connected, disconnected, expired); all other
     * events are expected to carry a path and are treated as changes to the watched lock znode.
     *
     * @param zk    the client the watcher was registered with
     * @param event the event to process
     */
    synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
        Watcher.Event.EventType eventType = event.getType();
        if (this.isStaleClient(zk)) {
            return;
        }
        LOG.debug((Object)("Watcher event type: " + eventType + " with state:" + event.getState() + " for path:" + event.getPath() + " connectionState: " + (Object)((Object)this.zkConnectionState)));
        // Session-level events: no znode involved, only the connection state changed.
        if (eventType == Watcher.Event.EventType.None) {
            switch (event.getState()) {
                case SyncConnected: {
                    LOG.info((Object)"Session connected.");
                    ConnectionState prevConnectionState = this.zkConnectionState;
                    this.zkConnectionState = ConnectionState.CONNECTED;
                    // Only a reconnect after a disconnect re-checks the lock node.
                    if (prevConnectionState != ConnectionState.DISCONNECTED) break;
                    this.monitorActiveStatus();
                    break;
                }
                case Disconnected: {
                    LOG.info((Object)"Session disconnected. Entering neutral mode...");
                    this.zkConnectionState = ConnectionState.DISCONNECTED;
                    this.enterNeutralMode();
                    break;
                }
                case Expired: {
                    // The old session is gone: drop the client and join again with a fresh one.
                    LOG.info((Object)"Session expired. Entering neutral mode and rejoining...");
                    this.enterNeutralMode();
                    this.reJoinElection();
                    break;
                }
                default: {
                    this.fatalError("Unexpected Zookeeper watch event state: " + event.getState());
                }
            }
            return;
        }
        // Node-level events: something happened to a watched znode.
        String path = event.getPath();
        if (path != null) {
            switch (eventType) {
                case NodeDeleted: {
                    // The lock node disappeared: step down first if we held it, then try to take it.
                    if (this.state == State.ACTIVE) {
                        this.enterNeutralMode();
                    }
                    this.joinElectionInternal();
                    break;
                }
                case NodeDataChanged: {
                    this.monitorActiveStatus();
                    break;
                }
                default: {
                    // Any other node event just re-checks the lock node.
                    LOG.debug((Object)("Unexpected node event: " + eventType + " for path: " + path));
                    this.monitorActiveStatus();
                }
            }
            return;
        }
        // A non-session event without a path is not expected.
        this.fatalError("Unexpected watch error from Zookeeper");
    }

    /**
     * Creates a new ZooKeeper client and registers a watcher that forwards its events, together
     * with a reference to the client, to this elector.
     *
     * @return the new client
     * @throws IOException if the client cannot be created
     */
    protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
        // The client is created without a watcher because the watcher needs the client instance.
        ZooKeeper client = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
        Watcher forwardingWatcher = new WatcherWithClientRef(client);
        client.register(forwardingWatcher);
        return client;
    }

    /**
     * Resets the elector (state INIT, connection terminated) and then reports the error to the
     * application callback.
     *
     * @param errorMessage description passed to {@code notifyFatalError}
     */
    private void fatalError(String errorMessage) {
        reset();
        appClient.notifyFatalError(errorMessage);
    }

    /** Starts a fresh round of lock-node monitoring: clears the stat retry counter and issues the stat call. */
    private void monitorActiveStatus() {
        LOG.debug("Monitoring active leader");
        statRetryCount = 0;
        monitorLockNodeAsync();
    }

    /**
     * Issues the asynchronous create of the lock znode, re-establishing the ZooKeeper session
     * first when there is no client. Reports a fatal error if the session cannot be re-established.
     */
    private void joinElectionInternal() {
        boolean connected = zkClient != null || reEstablishSession();
        if (!connected) {
            fatalError("Failed to reEstablish connection with ZooKeeper");
            return;
        }
        createRetryCount = 0;
        wantToBeInElection = true;
        createLockNodeAsync();
    }

    /**
     * Drops the current ZooKeeper connection and joins the election again with a new one.
     * The whole sequence runs while holding {@code sessionReestablishLockForTests}, so a test
     * holding that lock delays the re-join until it releases it.
     */
    private void reJoinElection() {
        LOG.info("Trying to re-establish ZK session");
        sessionReestablishLockForTests.lock();
        try {
            terminateConnection();
            joinElectionInternal();
        } finally {
            sessionReestablishLockForTests.unlock();
        }
    }

    /** Acquires the lock that {@code reJoinElection()} needs, blocking session re-establishment until it is released. */
    @VisibleForTesting
    void preventSessionReestablishmentForTests() {
        sessionReestablishLockForTests.lock();
    }

    /** Releases the lock taken by {@code preventSessionReestablishmentForTests()}. */
    @VisibleForTesting
    void allowSessionReestablishmentForTests() {
        sessionReestablishLockForTests.unlock();
    }

    /**
     * Returns the session id of the current ZooKeeper client, for tests.
     *
     * <p>Synchronized like the other accessors of {@code zkClient}, so the client cannot be
     * swapped or cleared while it is being read.
     *
     * @return the current session id, or -1 when there is no client (for example after the
     *         connection was terminated)
     */
    @VisibleForTesting
    synchronized long getZKSessionIdForTests() {
        if (zkClient == null) {
            return -1L;
        }
        return zkClient.getSessionId();
    }

    /** Returns the elector's current {@link State}, for tests. */
    @VisibleForTesting
    synchronized State getStateForTests() {
        return state;
    }

    /**
     * Tries to create a new ZooKeeper connection, making up to 3 attempts (the same value as
     * {@code NUM_RETRIES}) and sleeping five seconds after each failed attempt.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt status is restored so callers
     * can observe it; the remaining attempts are still made, without further delay.
     *
     * @return true if a connection was created, false if every attempt failed
     */
    private boolean reEstablishSession() {
        for (int attempt = 0; attempt < 3; ++attempt) {
            LOG.debug("Establishing zookeeper connection");
            try {
                createConnection();
                return true;
            } catch (IOException e) {
                LOG.warn(e);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e1) {
                    LOG.warn(e1);
                    // Do not swallow the interrupt: restore the flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
        return false;
    }

    /**
     * Replaces the current client with a newly created one.
     *
     * @throws IOException if the new client cannot be created
     */
    private void createConnection() throws IOException {
        zkClient = getNewZooKeeper();
    }

    /**
     * Closes the current ZooKeeper client, if any, and marks the connection as terminated.
     *
     * <p>The client field is cleared before closing, so callbacks still arriving from the old
     * client are recognized as stale. If closing is interrupted, the interrupt status is restored
     * and termination still completes.
     */
    private void terminateConnection() {
        if (zkClient == null) {
            return;
        }
        LOG.debug("Terminating ZK connection");
        ZooKeeper tempZk = zkClient;
        zkClient = null;
        try {
            tempZk.close();
        } catch (InterruptedException e) {
            LOG.warn(e);
            // Do not swallow the interrupt: restore the flag for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        zkConnectionState = ConnectionState.TERMINATED;
    }

    /** Returns the elector to the INIT state and terminates the ZooKeeper connection. */
    private void reset() {
        state = State.INIT;
        terminateConnection();
    }

    /**
     * Transitions to ACTIVE unless already active: fences any previous active, writes the
     * breadcrumb znode, and then notifies the application. If fencing or writing the breadcrumb
     * fails, the failure is logged and the election is re-joined instead.
     */
    private void becomeActive() {
        assert wantToBeInElection;
        if (state == State.ACTIVE) {
            return;
        }
        try {
            Stat oldBreadcrumbStat = fenceOldActive();
            writeBreadCrumbNode(oldBreadcrumbStat);
        } catch (Exception e) {
            LOG.warn("Exception handling the winning of election", e);
            reJoinElection();
            return;
        }
        LOG.debug("Becoming active");
        state = State.ACTIVE;
        appClient.becomeActive();
    }

    /**
     * Writes this node's application data to the breadcrumb znode.
     *
     * @param oldBreadcrumbStat stat of the existing breadcrumb, or null if there is none; when
     *                          null the node is created as persistent, otherwise its data is
     *                          replaced using the stat's version
     * @throws KeeperException      if ZooKeeper reports an error after retries
     * @throws InterruptedException if a ZooKeeper call is interrupted
     */
    private void writeBreadCrumbNode(Stat oldBreadcrumbStat) throws KeeperException, InterruptedException {
        LOG.info("Writing znode " + zkBreadCrumbPath + " to indicate that the local node is the most recent active...");
        if (oldBreadcrumbStat != null) {
            setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
            return;
        }
        createWithRetries(zkBreadCrumbPath, appData, zkAcl, CreateMode.PERSISTENT);
    }

    /**
     * Best-effort removal of the breadcrumb znode written by this node. The breadcrumb is only
     * deleted when its data equals this node's application data; any failure is logged and
     * otherwise ignored.
     */
    private void tryDeleteOwnBreadCrumbNode() {
        assert state == State.ACTIVE;
        LOG.info("Deleting bread-crumb of active node...");
        Stat crumbStat = new Stat();
        try {
            byte[] crumbData = zkClient.getData(zkBreadCrumbPath, false, crumbStat);
            boolean isOurs = Arrays.equals(crumbData, appData);
            if (!isOurs) {
                throw new IllegalStateException("We thought we were active, but in fact the active znode had the wrong data: " + StringUtils.byteToHexString(crumbData) + " (stat=" + crumbStat + ")");
            }
            // Delete conditionally on the version just read.
            deleteWithRetries(zkBreadCrumbPath, crumbStat.getVersion());
        } catch (Exception e) {
            LOG.warn("Unable to delete our own bread-crumb of being active at " + zkBreadCrumbPath + ": " + e.getLocalizedMessage() + ". " + "Expecting to be fenced by the next active.");
        }
    }

    /**
     * Reads the breadcrumb znode and, if it holds data different from this node's own, asks the
     * application to fence the node that data identifies.
     *
     * @return the stat of the existing breadcrumb znode, or null if there is no breadcrumb
     * @throws InterruptedException if a ZooKeeper call is interrupted
     * @throws KeeperException      if reading fails for a reason other than "no node"
     */
    private Stat fenceOldActive() throws InterruptedException, KeeperException {
        final Stat stat = new Stat();
        LOG.info("Checking for any old active which needs to be fenced...");
        ZKAction<byte[]> readBreadCrumb = new ZKAction<byte[]>() {
            @Override
            public byte[] run() throws KeeperException, InterruptedException {
                return zkClient.getData(zkBreadCrumbPath, false, stat);
            }
        };
        byte[] data;
        try {
            data = zkDoWithRetries(readBreadCrumb);
        } catch (KeeperException ke) {
            if (!isNodeDoesNotExist(ke.code())) {
                throw ke;
            }
            LOG.info("No old node to fence");
            return null;
        }
        LOG.info("Old node exists: " + StringUtils.byteToHexString(data));
        if (Arrays.equals(data, appData)) {
            LOG.info("But old node has our own data, so don't need to fence it.");
        } else {
            appClient.fenceOldActive(data);
        }
        return stat;
    }

    /** Transitions to STANDBY and notifies the application; does nothing if already standby. */
    private void becomeStandby() {
        if (state == State.STANDBY) {
            return;
        }
        LOG.debug("Becoming standby");
        state = State.STANDBY;
        appClient.becomeStandby();
    }

    /** Transitions to NEUTRAL and notifies the application; does nothing if already neutral. */
    private void enterNeutralMode() {
        if (state == State.NEUTRAL) {
            return;
        }
        LOG.debug("Entering neutral mode");
        state = State.NEUTRAL;
        appClient.enterNeutralMode();
    }

    /**
     * Asynchronously creates the ephemeral lock znode holding the application data. This elector
     * is the callback, and the issuing client is passed as context so stale results can be detected.
     */
    private void createLockNodeAsync() {
        AsyncCallback.StringCallback callback = this;
        zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, callback, zkClient);
    }

    /**
     * Asynchronously stats the lock znode and sets a watch on it. This elector is the callback,
     * and the issuing client is passed as context so stale results can be detected.
     */
    private void monitorLockNodeAsync() {
        Watcher lockWatcher = new WatcherWithClientRef(zkClient);
        AsyncCallback.StatCallback callback = this;
        zkClient.exists(zkLockFilePath, lockWatcher, callback, zkClient);
    }

    /**
     * Synchronously creates a znode, retrying on retriable ZooKeeper errors.
     *
     * @param path path of the znode to create
     * @param data initial data
     * @param acl  ACL for the new znode
     * @param mode create mode
     * @return the value returned by the ZooKeeper create call
     * @throws InterruptedException if the call is interrupted
     * @throws KeeperException      if the create ultimately fails
     */
    private String createWithRetries(final String path, final byte[] data, final List<ACL> acl, final CreateMode mode) throws InterruptedException, KeeperException {
        ZKAction<String> createNode = new ZKAction<String>() {
            @Override
            public String run() throws KeeperException, InterruptedException {
                return zkClient.create(path, data, acl, mode);
            }
        };
        return zkDoWithRetries(createNode);
    }

    /**
     * Synchronously sets a znode's data, retrying on retriable ZooKeeper errors.
     *
     * @param path    path of the znode
     * @param data    new data
     * @param version version passed to the ZooKeeper setData call
     * @return the stat returned by the ZooKeeper setData call
     * @throws InterruptedException if the call is interrupted
     * @throws KeeperException      if the update ultimately fails
     */
    private Stat setDataWithRetries(final String path, final byte[] data, final int version) throws InterruptedException, KeeperException {
        ZKAction<Stat> writeData = new ZKAction<Stat>() {
            @Override
            public Stat run() throws KeeperException, InterruptedException {
                return zkClient.setData(path, data, version);
            }
        };
        return zkDoWithRetries(writeData);
    }

    /**
     * Synchronously deletes a znode, retrying on retriable ZooKeeper errors.
     *
     * @param path    path of the znode
     * @param version version passed to the ZooKeeper delete call
     * @throws KeeperException      if the delete ultimately fails
     * @throws InterruptedException if the call is interrupted
     */
    private void deleteWithRetries(final String path, final int version) throws KeeperException, InterruptedException {
        ZKAction<Void> removeNode = new ZKAction<Void>() {
            @Override
            public Void run() throws KeeperException, InterruptedException {
                zkClient.delete(path, version);
                return null;
            }
        };
        zkDoWithRetries(removeNode);
    }

    /**
     * Runs a ZooKeeper action, re-running it when it fails with a retriable error. The action is
     * attempted at most 3 times in total (the same value as {@code NUM_RETRIES}).
     *
     * @param action the action to run
     * @param <T>    result type of the action
     * @return the action's result
     * @throws KeeperException      the last error if it is not retriable or the attempts are used up
     * @throws InterruptedException if the action is interrupted
     */
    private static <T> T zkDoWithRetries(ZKAction<T> action) throws KeeperException, InterruptedException {
        int retry = 0;
        // The loop only exits by returning the result or rethrowing the error, so nothing may
        // follow the try statement inside it (any such statement would be unreachable).
        while (true) {
            try {
                return action.run();
            } catch (KeeperException ke) {
                if (shouldRetry(ke.code()) && ++retry < 3) {
                    continue;
                }
                throw ke;
            }
        }
    }

    /**
     * Tells whether a callback or watch event belongs to a client other than the current one.
     * A stale client is logged with its session id.
     *
     * @param ctx the ZooKeeper client the callback or event belongs to; must not be null
     * @return true if {@code ctx} is not the current client
     */
    private synchronized boolean isStaleClient(Object ctx) {
        Preconditions.checkNotNull(ctx);
        ZooKeeper callbackClient = (ZooKeeper) ctx;
        if (zkClient == callbackClient) {
            return false;
        }
        LOG.warn("Ignoring stale result from old client with sessionId " + String.format("0x%08x", callbackClient.getSessionId()));
        return true;
    }

    /** Tells whether {@code code} is {@code OK}. */
    private static boolean isSuccess(KeeperException.Code code) {
        return KeeperException.Code.OK == code;
    }

    /** Tells whether {@code code} is {@code NODEEXISTS}. */
    private static boolean isNodeExists(KeeperException.Code code) {
        return KeeperException.Code.NODEEXISTS == code;
    }

    /** Tells whether {@code code} is {@code NONODE}. */
    private static boolean isNodeDoesNotExist(KeeperException.Code code) {
        return KeeperException.Code.NONODE == code;
    }

    /** Tells whether {@code code} is {@code SESSIONEXPIRED}. */
    private static boolean isSessionExpired(KeeperException.Code code) {
        return KeeperException.Code.SESSIONEXPIRED == code;
    }

    /**
     * Tells whether an operation that failed with {@code code} should be retried. Only
     * {@code CONNECTIONLOSS} and {@code OPERATIONTIMEOUT} are retriable.
     */
    private static boolean shouldRetry(KeeperException.Code code) {
        switch (code) {
            case CONNECTIONLOSS:
            case OPERATIONTIMEOUT:
                return true;
            default:
                return false;
        }
    }

    private final class WatcherWithClientRef
    implements Watcher {
        private final ZooKeeper zk;

        private WatcherWithClientRef(ZooKeeper zk) {
            this.zk = zk;
        }

        public void process(WatchedEvent event) {
            ActiveStandbyElector.this.processWatchEvent(this.zk, event);
        }
    }

    /**
     * A single ZooKeeper operation that can be run, and re-run, by {@code zkDoWithRetries}.
     *
     * @param <T> result type of the operation
     */
    private static interface ZKAction<T> {
        T run() throws KeeperException, InterruptedException;
    }

    /** Thrown by {@code getActiveData()} when the lock znode does not exist. */
    public static class ActiveNotFoundException extends Exception {
        private static final long serialVersionUID = 3505396722342846462L;
    }

    /**
     * Election state of this elector. INIT is the initial state and the state after a reset;
     * the other values are entered through becomeActive, becomeStandby and enterNeutralMode.
     */
    static enum State {
        INIT, ACTIVE, STANDBY, NEUTRAL
    }

    /**
     * Connection state tracked by the elector: CONNECTED and DISCONNECTED follow session watch
     * events, TERMINATED is the initial value and is set again when the connection is closed.
     */
    private static enum ConnectionState {
        DISCONNECTED, CONNECTED, TERMINATED
    }

    /** Callbacks through which the elector tells the application about state changes and errors. */
    public static interface ActiveStandbyElectorCallback {
        /** Called when the elector transitions to the active state, after fencing and writing the breadcrumb. */
        void becomeActive();

        /** Called when the elector transitions to the standby state. */
        void becomeStandby();

        /** Called when the elector transitions to the neutral state. */
        void enterNeutralMode();

        /**
         * Called after the elector has reset itself because of an unrecoverable error.
         *
         * @param errorMessage description of the error
         */
        void notifyFatalError(String errorMessage);

        /**
         * Called before becoming active when the breadcrumb znode holds data different from this
         * node's own application data.
         *
         * @param oldActiveData the data read from the breadcrumb znode
         */
        void fenceOldActive(byte[] oldActiveData);
    }
}

