/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.discover;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKRegistrationClient
implements RegistrationClient {
    private static final Logger log = LoggerFactory.getLogger(ZKRegistrationClient.class);
    private static final int ZK_CONNECT_BACKOFF_MS = 200;
    private ClientConfiguration conf;
    private ZooKeeper zk = null;
    private boolean ownZKHandle = false;
    private ScheduledExecutorService scheduler;
    private WatchTask watchWritableBookiesTask = null;
    private WatchTask watchReadOnlyBookiesTask = null;
    private String bookieRegistrationPath;
    private String bookieReadonlyRegistrationPath;

    @Override
    public RegistrationClient initialize(ClientConfiguration conf, ScheduledExecutorService scheduler, StatsLogger statsLogger, Optional<ZooKeeper> zkOptional) throws BKException {
        this.conf = conf;
        this.scheduler = scheduler;
        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + "readonly";
        if (zkOptional.isPresent()) {
            this.zk = zkOptional.get();
            this.ownZKHandle = false;
        } else {
            try {
                this.zk = ZooKeeperClient.newBuilder().connectString(conf.getZkServers()).sessionTimeoutMs(conf.getZkTimeout()).operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(), conf.getZkTimeout(), 0)).statsLogger(statsLogger).build();
                if (null == this.zk.exists(this.bookieReadonlyRegistrationPath, false)) {
                    try {
                        List<ACL> zkAcls = ZkUtils.getACLs(conf);
                        this.zk.create(this.bookieReadonlyRegistrationPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
                    }
                    catch (KeeperException.NodeExistsException zkAcls) {}
                }
            }
            catch (IOException | KeeperException e) {
                log.error("Failed to create zookeeper client to {}", (Object)conf.getZkServers(), (Object)e);
                BKException.ZKException zke = new BKException.ZKException();
                zke.fillInStackTrace();
                throw zke;
            }
            catch (InterruptedException e) {
                throw new BKException.BKInterruptedException();
            }
            this.ownZKHandle = true;
        }
        return this;
    }

    @Override
    public void close() {
        if (this.ownZKHandle && null != this.zk) {
            try {
                this.zk.close();
            }
            catch (InterruptedException e) {
                log.warn("Interrupted on closing zookeeper client", (Throwable)e);
            }
        }
    }

    public ZooKeeper getZk() {
        return this.zk;
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieSocketAddress>>> getWritableBookies() {
        return this.getChildren(this.bookieRegistrationPath, null);
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieSocketAddress>>> getReadOnlyBookies() {
        return this.getChildren(this.bookieReadonlyRegistrationPath, null);
    }

    private CompletableFuture<Versioned<Set<BookieSocketAddress>>> getChildren(String regPath, Watcher watcher) {
        CompletableFuture future = FutureUtils.createFuture();
        this.zk.getChildren(regPath, watcher, (rc, path, ctx, children, stat) -> {
            if (0 != rc) {
                BKException.ZKException zke = new BKException.ZKException();
                zke.fillInStackTrace();
                future.completeExceptionally(zke);
                return;
            }
            LongVersion version = new LongVersion(stat.getVersion());
            HashSet<BookieSocketAddress> bookies = ZKRegistrationClient.convertToBookieAddresses(children);
            future.complete(new Versioned<HashSet<BookieSocketAddress>>(bookies, version));
        }, null);
        return future;
    }

    @Override
    public synchronized CompletableFuture<Void> watchWritableBookies(RegistrationClient.RegistrationListener listener) {
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        if (null == this.watchWritableBookiesTask) {
            this.watchWritableBookiesTask = new WatchTask(this.bookieRegistrationPath, f);
        }
        this.watchWritableBookiesTask.addListener(listener);
        if (this.watchWritableBookiesTask.getNumListeners() == 1) {
            this.watchWritableBookiesTask.watch();
        }
        return f;
    }

    @Override
    public synchronized void unwatchWritableBookies(RegistrationClient.RegistrationListener listener) {
        if (null == this.watchWritableBookiesTask) {
            return;
        }
        this.watchWritableBookiesTask.removeListener(listener);
        if (this.watchWritableBookiesTask.getNumListeners() == 0) {
            this.watchWritableBookiesTask.close();
            this.watchWritableBookiesTask = null;
        }
    }

    @Override
    public synchronized CompletableFuture<Void> watchReadOnlyBookies(RegistrationClient.RegistrationListener listener) {
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        if (null == this.watchReadOnlyBookiesTask) {
            this.watchReadOnlyBookiesTask = new WatchTask(this.bookieReadonlyRegistrationPath, f);
        }
        this.watchReadOnlyBookiesTask.addListener(listener);
        if (this.watchReadOnlyBookiesTask.getNumListeners() == 1) {
            this.watchReadOnlyBookiesTask.watch();
        }
        return f;
    }

    @Override
    public synchronized void unwatchReadOnlyBookies(RegistrationClient.RegistrationListener listener) {
        if (null == this.watchReadOnlyBookiesTask) {
            return;
        }
        this.watchReadOnlyBookiesTask.removeListener(listener);
        if (this.watchReadOnlyBookiesTask.getNumListeners() == 0) {
            this.watchReadOnlyBookiesTask.close();
            this.watchReadOnlyBookiesTask = null;
        }
    }

    private static HashSet<BookieSocketAddress> convertToBookieAddresses(List<String> children) {
        HashSet newBookieAddrs = Sets.newHashSet();
        for (String bookieAddrString : children) {
            BookieSocketAddress bookieAddr;
            if ("readonly".equals(bookieAddrString)) continue;
            try {
                bookieAddr = new BookieSocketAddress(bookieAddrString);
            }
            catch (IOException e) {
                log.error("Could not parse bookie address: " + bookieAddrString + ", ignoring this bookie");
                continue;
            }
            newBookieAddrs.add(bookieAddr);
        }
        return newBookieAddrs;
    }

    private class WatchTask
    implements SafeRunnable,
    Watcher,
    BiConsumer<Versioned<Set<BookieSocketAddress>>, Throwable>,
    AutoCloseable {
        private final String regPath;
        private final Set<RegistrationClient.RegistrationListener> listeners;
        private boolean closed = false;
        private Set<BookieSocketAddress> bookies = null;
        private Version version = Version.NEW;
        private final CompletableFuture<Void> firstRunFuture;

        WatchTask(String regPath, CompletableFuture<Void> firstRunFuture) {
            this.regPath = regPath;
            this.listeners = new CopyOnWriteArraySet<RegistrationClient.RegistrationListener>();
            this.firstRunFuture = firstRunFuture;
        }

        public int getNumListeners() {
            return this.listeners.size();
        }

        public boolean addListener(RegistrationClient.RegistrationListener listener) {
            if (this.listeners.add(listener) && null != this.bookies) {
                ZKRegistrationClient.this.scheduler.execute(() -> listener.onBookiesChanged(new Versioned<Set<BookieSocketAddress>>(this.bookies, this.version)));
            }
            return true;
        }

        public boolean removeListener(RegistrationClient.RegistrationListener listener) {
            return this.listeners.remove(listener);
        }

        void watch() {
            this.scheduleWatchTask(0L);
        }

        private void scheduleWatchTask(long delayMs) {
            try {
                ZKRegistrationClient.this.scheduler.schedule((Runnable)((Object)this), delayMs, TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException ree) {
                log.warn("Failed to schedule watch bookies task", (Throwable)ree);
            }
        }

        public void safeRun() {
            if (this.isClosed()) {
                return;
            }
            ZKRegistrationClient.this.getChildren(this.regPath, this).whenCompleteAsync((BiConsumer)this, (Executor)ZKRegistrationClient.this.scheduler);
        }

        @Override
        public void accept(Versioned<Set<BookieSocketAddress>> bookieSet, Throwable throwable) {
            if (throwable != null) {
                this.scheduleWatchTask(200L);
                this.firstRunFuture.completeExceptionally(throwable);
                return;
            }
            if (this.version.compare(bookieSet.getVersion()) == Version.Occurred.BEFORE || this.version.compare(bookieSet.getVersion()) == Version.Occurred.CONCURRENTLY) {
                this.version = bookieSet.getVersion();
                this.bookies = bookieSet.getValue();
                for (RegistrationClient.RegistrationListener listener : this.listeners) {
                    listener.onBookiesChanged(bookieSet);
                }
            }
            FutureUtils.complete(this.firstRunFuture, null);
        }

        public void process(WatchedEvent event) {
            if (Watcher.Event.EventType.None == event.getType()) {
                if (Watcher.Event.KeeperState.Expired == event.getState()) {
                    this.scheduleWatchTask(200L);
                }
                return;
            }
            this.scheduleWatchTask(0L);
        }

        synchronized boolean isClosed() {
            return this.closed;
        }

        @Override
        public synchronized void close() {
            if (!this.closed) {
                return;
            }
            this.closed = true;
        }
    }
}

