/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.master.zksyncer;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hudi.org.apache.hadoop.hbase.Server;
import org.apache.hudi.org.apache.hadoop.hbase.util.Threads;
import org.apache.hudi.org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hudi.org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hudi.org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public abstract class ClientZKSyncer
extends ZKListener {
    private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class);
    /** Consulted via {@code isStopped()} to end retry loops, and aborted when the initial watch setup fails. */
    private final Server server;
    /** Watcher for the ZooKeeper ensemble that data is written to and deleted from. */
    private final ZKWatcher clientZkWatcher;
    /** Per-znode hand-off slots, keyed by znode path; each slot is drained by its own updater thread. */
    private final ConcurrentMap<String, ZKData> queues;

    /**
     * Builds a syncer that reads znodes through {@code watcher} and writes them through
     * {@code clientZkWatcher}. No threads are started and no watches are set here.
     *
     * @param watcher         watcher this listener is attached to (source of the data)
     * @param clientZkWatcher watcher for the ensemble that receives the copied data
     * @param server          server used for stop checks and for aborting on setup failure
     */
    public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
        super(watcher);
        this.queues = new ConcurrentHashMap<>();
        this.clientZkWatcher = clientZkWatcher;
        this.server = server;
    }

    /**
     * Registers a fresh hand-off slot for {@code path}, launches the daemon thread that
     * drains it, and then sets the initial watch on the source znode.
     *
     * @param path znode path to start syncing
     */
    private void startNewSyncThread(String path) {
        ZKData slot = new ZKData();
        // The slot must be in the map before the watch is set so the first read has somewhere to go.
        this.queues.put(path, slot);
        ClientZkUpdater worker = new ClientZkUpdater(path, slot);
        worker.setDaemon(true);
        worker.start();
        this.watchAndCheckExists(path);
    }

    /**
     * Registers this listener on the source watcher, makes sure the base znode exists on
     * the client ensemble, and starts one sync thread per path from {@link #getPathsToWatch()}.
     *
     * @throws KeeperException if creating the base znode on the client ensemble fails
     */
    public void start() throws KeeperException {
        LOG.debug("Starting " + this.getClass().getSimpleName());
        this.watcher.registerListener(this);
        String baseZNode = this.watcher.getZNodePaths().baseZNode;
        ZKUtil.createWithParents(this.clientZkWatcher, baseZNode);
        Set<String> znodes = this.getPathsToWatch();
        LOG.debug("ZNodes to watch: {}", znodes);
        Iterator<String> remaining = znodes.iterator();
        while (remaining.hasNext()) {
            this.startNewSyncThread(remaining.next());
        }
    }

    /**
     * Sets a watch on {@code node} in the source ensemble and performs the initial sync:
     * queues the current data if the node exists, otherwise removes the node from the
     * client ensemble. Aborts the server on any {@link KeeperException}.
     *
     * @param node znode path to watch
     */
    private void watchAndCheckExists(String node) {
        try {
            if (!ZKUtil.watchAndCheckExists(this.watcher, node)) {
                // Source node is absent: make sure the client ensemble has no copy either.
                ZKUtil.deleteNodeFailSilent(this.clientZkWatcher, node);
                return;
            }
            byte[] current = ZKUtil.getDataAndWatch(this.watcher, node);
            if (current == null) {
                // Nothing was read for an existing node; run the whole check again.
                LOG.debug("Found no data from " + node);
                this.watchAndCheckExists(node);
                return;
            }
            this.upsertQueue(node, current);
        }
        catch (KeeperException e) {
            this.server.abort("Unexpected exception during initialization, aborting", e);
        }
    }

    /**
     * Hands {@code data2} to the slot registered for {@code node}; does nothing when no
     * slot is registered for that path.
     *
     * @param node  znode path whose slot should receive the data
     * @param data2 latest bytes read from the source znode (may be {@code null})
     */
    private void upsertQueue(String node, byte[] data2) {
        ZKData slot = this.queues.get(node);
        if (slot == null) {
            return;
        }
        slot.set(data2);
    }

    /**
     * Writes {@code data2} to {@code node} on the client ensemble, retrying every 200 ms
     * until it succeeds or the server is stopped. Starts with a plain set; switches to
     * create after a NONODE failure and back to set after a NODEEXISTS failure.
     *
     * @param node  znode path on the client ensemble
     * @param data2 bytes to store
     * @throws InterruptedException propagated from the reconnect attempt
     */
    private void setDataForClientZkUntilSuccess(String node, byte[] data2) throws InterruptedException {
        boolean nodeMissing = false;
        while (!this.server.isStopped()) {
            try {
                LOG.debug("Set data for remote " + node + ", client zk wather: " + this.clientZkWatcher);
                if (nodeMissing) {
                    ZKUtil.createNodeIfNotExistsNoWatch(this.clientZkWatcher, node, data2, CreateMode.PERSISTENT);
                } else {
                    ZKUtil.setData(this.clientZkWatcher, node, data2);
                }
                // Either call returning normally ends the retry loop.
                return;
            }
            catch (KeeperException e) {
                LOG.debug("Failed to set data for {} to client ZK, will retry later", (Object)node, (Object)e);
                KeeperException.Code failure = e.code();
                if (failure == KeeperException.Code.SESSIONEXPIRED) {
                    this.reconnectAfterExpiration();
                } else if (failure == KeeperException.Code.NONODE) {
                    nodeMissing = true;
                } else if (failure == KeeperException.Code.NODEEXISTS) {
                    nodeMissing = false;
                }
                Threads.sleep(200L);
            }
        }
    }

    /**
     * Deletes {@code node} from the client ensemble, retrying until the node is gone or
     * the server is stopped.
     *
     * <p>The loop ends as soon as the delete succeeds, and also when ZooKeeper reports
     * NONODE, since the node is then already absent. Any other failure is retried after
     * a 200 ms pause (the same back-off used when setting data), reconnecting first if
     * the session has expired.
     *
     * @param node znode path on the client ensemble
     * @throws InterruptedException propagated from the reconnect attempt
     */
    private void deleteDataForClientZkUntilSuccess(String node) throws InterruptedException {
        while (!this.server.isStopped()) {
            LOG.debug("Delete remote " + node + ", client zk wather: " + this.clientZkWatcher);
            try {
                ZKUtil.deleteNode(this.clientZkWatcher, node);
                // Deleted: stop here, otherwise every further pass would fail with NONODE.
                return;
            }
            catch (KeeperException e) {
                if (e.code() == KeeperException.Code.NONODE) {
                    LOG.debug("Node {} is already absent from client ZK, nothing to delete", (Object)node, (Object)e);
                    return;
                }
                LOG.debug("Failed to delete node from client ZK, will retry later", (Throwable)e);
                if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
                    this.reconnectAfterExpiration();
                }
                // Back off so a persistently failing ensemble is not hammered in a tight loop.
                Threads.sleep(200L);
            }
        }
    }

    /**
     * Asks the client watcher to re-establish its session. A failure is only logged;
     * callers are expected to retry on their next loop pass.
     *
     * @throws InterruptedException if the reconnect is interrupted
     */
    private final void reconnectAfterExpiration() throws InterruptedException {
        LOG.warn("ZK session expired or lost. Retry a new connection...");
        try {
            this.clientZkWatcher.reconnectAfterExpiration();
        }
        catch (IOException | KeeperException reconnectFailure) {
            // Deliberately not rethrown: the surrounding retry loop will try again.
            LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", reconnectFailure);
        }
    }

    /**
     * Reads {@code path} from the source ensemble (re-arming the watch) and hands the
     * result to the path's slot. A {@link KeeperException} is logged and otherwise ignored.
     *
     * @param path znode path to read
     */
    private void getDataAndWatch(String path) {
        byte[] fetched;
        try {
            fetched = ZKUtil.getDataAndWatch(this.watcher, path);
        }
        catch (KeeperException e) {
            LOG.warn("Unexpected exception handling nodeCreated event", (Throwable)e);
            return;
        }
        this.upsertQueue(path, fetched);
    }

    /**
     * Unregisters the slot for {@code path}, if any, and flags it as deleted so that its
     * updater thread wakes up.
     *
     * @param path znode path to stop syncing
     */
    private void removeQueue(String path) {
        ZKData removed = this.queues.remove(path);
        if (removed == null) {
            return;
        }
        removed.delete();
    }

    /**
     * Listener callback for a created znode: queues the node's data when
     * {@link #validate(String)} accepts the path, otherwise drops any slot held for it.
     *
     * @param path path of the created znode
     */
    @Override
    public void nodeCreated(String path) {
        if (this.validate(path)) {
            this.getDataAndWatch(path);
        } else {
            this.removeQueue(path);
        }
    }

    /**
     * Listener callback for changed znode data; handled exactly like a creation.
     *
     * @param path path of the znode whose data changed
     */
    @Override
    public void nodeDataChanged(String path) {
        this.nodeCreated(path);
    }

    /**
     * Listener callback for a deleted znode. For a path accepted by
     * {@link #validate(String)} the watch is re-armed and, if the node exists again by
     * then, its data is queued; for any other path the slot held for it is dropped.
     * A {@link KeeperException} while re-arming the watch is logged and ignored.
     *
     * @param path path of the deleted znode
     */
    @Override
    public synchronized void nodeDeleted(String path) {
        if (this.validate(path)) {
            try {
                if (ZKUtil.watchAndCheckExists(this.watcher, path)) {
                    this.getDataAndWatch(path);
                }
            }
            catch (KeeperException e) {
                LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, (Throwable)e);
            }
        } else {
            this.removeQueue(path);
        }
    }

    /**
     * Decides how the created/deleted callbacks treat a znode path: when this returns
     * {@code true} the path's data is read and queued for syncing, when {@code false}
     * any slot held for the path is removed.
     *
     * @param var1 znode path reported by the listener callback
     * @return {@code true} if the path should be synced
     */
    protected abstract boolean validate(String var1);

    /**
     * Supplies the znode paths to sync. Called by {@code start()} to launch the initial
     * sync threads and by {@code refreshWatchingList()} to compute which to stop or add.
     *
     * @return the set of znode paths to watch
     */
    protected abstract Set<String> getPathsToWatch();

    /**
     * Reconciles the running sync threads with the current result of
     * {@link #getPathsToWatch()}: slots whose path is no longer wanted are removed and
     * flagged as deleted, and a new sync thread is started for every path without a slot.
     */
    protected final void refreshWatchingList() {
        Set<String> newPaths = this.getPathsToWatch();
        LOG.debug("New ZNodes to watch: {}", newPaths);
        // Typed iterator: a raw Iterator yields Object, which cannot be assigned to Map.Entry.
        Iterator<Map.Entry<String, ZKData>> iter = this.queues.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, ZKData> entry = iter.next();
            if (newPaths.contains(entry.getKey())) {
                continue;
            }
            // Path dropped from the watch list: unregister it and wake its updater thread.
            iter.remove();
            entry.getValue().delete();
        }
        for (String newPath : newPaths) {
            if (this.queues.containsKey(newPath)) {
                continue;
            }
            this.startNewSyncThread(newPath);
        }
    }

    /**
     * Thread that drains one {@link ZKData} slot: each non-null payload taken from the
     * slot is written to the client ensemble, and once the slot is flagged as deleted
     * the znode is removed from the client ensemble and the thread ends.
     */
    private final class ClientZkUpdater
    extends Thread {
        private final String znode;
        private final ZKData zkData;

        /**
         * @param znode  path this thread is responsible for; also used in the thread name
         * @param zkData slot to take payloads from
         */
        public ClientZkUpdater(String znode, ZKData zkData) {
            this.znode = znode;
            this.zkData = zkData;
            this.setName("ClientZKUpdater-" + znode);
        }

        /** Runs until the server stops, the slot is deleted, or the thread is interrupted. */
        @Override
        public void run() {
            LOG.debug("Client zk updater for znode {} started", (Object)this.znode);
            try {
                while (!ClientZKSyncer.this.server.isStopped()) {
                    byte[] pending = this.zkData.get();
                    if (pending != null) {
                        ClientZKSyncer.this.setDataForClientZkUntilSuccess(this.znode, pending);
                    } else if (this.zkData.isDeleted()) {
                        ClientZKSyncer.this.deleteDataForClientZkUntilSuccess(this.znode);
                        break;
                    }
                    // Neither data nor a delete flag: go back to waiting on the slot.
                }
            }
            catch (InterruptedException e) {
                LOG.debug("Interrupted while checking whether need to update meta location to client zk");
                // Restore the interrupt flag before leaving the thread body.
                Thread.currentThread().interrupt();
            }
            LOG.debug("Client zk updater for znode {} stopped", (Object)this.znode);
        }
    }

    /**
     * Single-value hand-off slot guarded by its own monitor. A writer overwrites the
     * held bytes with {@link #set(byte[])}; a reader blocks in {@link #get()} until bytes
     * are present or the slot has been flagged as deleted.
     */
    private static final class ZKData {
        byte[] data;
        boolean delete = false;

        private ZKData() {
        }

        /** Replaces the held bytes with {@code data2} and wakes every waiting reader. */
        synchronized void set(byte[] data2) {
            this.data = data2;
            this.notifyAll();
        }

        /**
         * Waits until bytes are present or the slot is flagged as deleted, then returns
         * the held bytes and clears them.
         *
         * @return the bytes that were held, or {@code null} if none were present
         * @throws InterruptedException if interrupted while waiting
         */
        synchronized byte[] get() throws InterruptedException {
            while (this.data == null && !this.delete) {
                this.wait();
            }
            byte[] taken = this.data;
            this.data = null;
            return taken;
        }

        /** Flags the slot as deleted and wakes every waiting reader. */
        synchronized void delete() {
            this.delete = true;
            this.notifyAll();
        }

        /** @return whether {@link #delete()} has been called on this slot */
        synchronized boolean isDeleted() {
            return this.delete;
        }
    }
}

