/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.master.zksyncer;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.CreateMode;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public abstract class ClientZKSyncer
extends ZKListener {
    private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class);
    private final Server server;
    private final ZKWatcher clientZkWatcher;
    private final Map<String, BlockingQueue<byte[]>> queues;

    public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
        super(watcher);
        this.server = server;
        this.clientZkWatcher = clientZkWatcher;
        this.queues = new HashMap<String, BlockingQueue<byte[]>>();
    }

    public void start() throws KeeperException {
        LOG.debug("Starting " + this.getClass().getSimpleName());
        this.watcher.registerListener(this);
        ZKUtil.createWithParents(this.clientZkWatcher, this.watcher.getZNodePaths().baseZNode);
        Collection<String> nodes = this.getNodesToWatch();
        LOG.debug("Znodes to watch: " + nodes);
        for (String node : nodes) {
            ArrayBlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(1);
            this.queues.put(node, queue);
            ClientZkUpdater updater = new ClientZkUpdater(node, queue);
            updater.setDaemon(true);
            updater.start();
            this.watchAndCheckExists(node);
        }
    }

    private void watchAndCheckExists(String node) {
        try {
            if (ZKUtil.watchAndCheckExists(this.watcher, node)) {
                byte[] data = ZKUtil.getDataAndWatch(this.watcher, node);
                if (data != null) {
                    this.upsertQueue(node, data);
                } else {
                    LOG.debug("Found no data from " + node);
                    this.watchAndCheckExists(node);
                }
            } else {
                ZKUtil.deleteNodeFailSilent(this.clientZkWatcher, node);
            }
        }
        catch (KeeperException e) {
            this.server.abort("Unexpected exception during initialization, aborting", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void upsertQueue(String node, byte[] data) {
        BlockingQueue<byte[]> queue;
        BlockingQueue<byte[]> blockingQueue = queue = this.queues.get(node);
        synchronized (blockingQueue) {
            queue.poll();
            queue.offer(data);
        }
    }

    private final void setDataForClientZkUntilSuccess(String node, byte[] data) throws InterruptedException {
        while (!this.server.isStopped()) {
            try {
                LOG.debug("Set data for remote " + node + ", client zk wather: " + this.clientZkWatcher);
                ZKUtil.setData(this.clientZkWatcher, node, data);
                break;
            }
            catch (KeeperException.NoNodeException nne) {
                try {
                    ZKUtil.createNodeIfNotExistsNoWatch(this.clientZkWatcher, node, data, CreateMode.PERSISTENT);
                    break;
                }
                catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException ee) {
                    this.reconnectAfterExpiration();
                }
                catch (KeeperException e) {
                    LOG.warn("Failed to create znode " + node + " due to: " + e.getMessage() + ", will retry later");
                }
            }
            catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException ee) {
                this.reconnectAfterExpiration();
            }
            catch (KeeperException e) {
                LOG.debug("Failed to set data to client ZK, will retry later", (Throwable)e);
            }
            Threads.sleep(200L);
        }
    }

    private final void reconnectAfterExpiration() throws InterruptedException {
        LOG.warn("ZK session expired or lost. Retry a new connection...");
        try {
            this.clientZkWatcher.reconnectAfterExpiration();
        }
        catch (IOException | KeeperException e) {
            LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", (Throwable)e);
        }
    }

    @Override
    public void nodeCreated(String path) {
        if (!this.validate(path)) {
            return;
        }
        try {
            byte[] data = ZKUtil.getDataAndWatch(this.watcher, path);
            this.upsertQueue(path, data);
        }
        catch (KeeperException e) {
            LOG.warn("Unexpected exception handling nodeCreated event", (Throwable)e);
        }
    }

    @Override
    public void nodeDataChanged(String path) {
        if (this.validate(path)) {
            this.nodeCreated(path);
        }
    }

    @Override
    public synchronized void nodeDeleted(String path) {
        if (this.validate(path)) {
            try {
                if (ZKUtil.watchAndCheckExists(this.watcher, path)) {
                    this.nodeCreated(path);
                }
            }
            catch (KeeperException e) {
                LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, (Throwable)e);
            }
        }
    }

    abstract boolean validate(String var1);

    abstract Collection<String> getNodesToWatch();

    class ClientZkUpdater
    extends Thread {
        final String znode;
        final BlockingQueue<byte[]> queue;

        public ClientZkUpdater(String znode, BlockingQueue<byte[]> queue) {
            this.znode = znode;
            this.queue = queue;
            this.setName("ClientZKUpdater-" + znode);
        }

        @Override
        public void run() {
            while (!ClientZKSyncer.this.server.isStopped()) {
                try {
                    byte[] data = this.queue.take();
                    ClientZKSyncer.this.setDataForClientZkUntilSuccess(this.znode, data);
                }
                catch (InterruptedException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Interrupted while checking whether need to update meta location to client zk");
                    }
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

