/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.spi.cluster.zookeeper.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.spi.cluster.zookeeper.impl.ChoosableSet;
import io.vertx.spi.cluster.zookeeper.impl.ZKMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.ErrorListenerPathable;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

public class ZKAsyncMultiMap<K, V>
extends ZKMap<K, V>
implements AsyncMultiMap<K, V> {
    // Curator tree cache rooted at this map's path; created and started in the constructor.
    private TreeCache treeCache;
    // Counted down by the Listener on the INITIALIZED event; the constructor waits on it.
    private CountDownLatch latch = new CountDownLatch(1);
    // Values grouped by key path; filled by get() and kept up to date by the Listener.
    private ConcurrentMap<String, ChoosableSet<V>> cache = new ConcurrentHashMap<String, ChoosableSet<V>>();
    // Values added under a path containing "/asyncMultiMap/__vertx.subs/"; replayed by restoreSnapshotCache().
    private ConcurrentMap<String, ChoosableSet<V>> eventBusSnapshotCache = new ConcurrentHashMap<String, ChoosableSet<V>>();
    private static final Logger logger = LoggerFactory.getLogger(ZKAsyncMultiMap.class);

    /**
     * Creates the multi-map and starts a {@link TreeCache} on its ZooKeeper path.
     *
     * <p>The constructor blocks for up to 10 seconds waiting for the tree cache to
     * report that it is initialized. If that does not happen in time a warning is
     * logged and construction continues.
     *
     * @param vertx   the Vert.x instance handed to the parent class
     * @param curator the Curator client used for all ZooKeeper operations
     * @param mapName the name of this map
     * @throws VertxException if the tree cache cannot be started or the calling
     *                        thread is interrupted while waiting
     */
    public ZKAsyncMultiMap(Vertx vertx, CuratorFramework curator, String mapName) {
        super(curator, vertx, "asyncMultiMap", mapName);
        this.treeCache = new TreeCache(curator, this.mapPath);
        this.treeCache.getListenable().addListener(new Listener());
        try {
            this.treeCache.start();
            if (!this.latch.await(10L, TimeUnit.SECONDS)) {
                logger.warn("timed out waiting for the tree cache of " + this.mapPath + " to initialize.");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new VertxException(e);
        } catch (Exception e) {
            throw new VertxException(e);
        }
    }

    /**
     * Stores {@code v} under key {@code k}, creating the value node or overwriting
     * it when it already exists, then issues a sync followed by a read of the node
     * before reporting success.
     *
     * <p>When the value path contains {@code "/asyncMultiMap/__vertx.subs/"} the
     * value is also recorded in the snapshot cache under the key path, which is the
     * same key {@code remove} uses to look it up.
     *
     * @param k                 the key; validated by {@code assertKeyAndValueAreNotNull}
     * @param v                 the value; validated by {@code assertKeyAndValueAreNotNull}
     * @param completionHandler notified with success, or with the failure cause
     */
    public void add(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
        this.assertKeyAndValueAreNotNull(k, v).compose(validated -> {
            // Build the path only after validation so invalid arguments fail the future
            // instead of possibly throwing synchronously.
            String path = this.valuePath(k, v);
            Future<Void> confirmed = this.checkExists(path)
                .compose(exists -> exists ? this.setData(path, v) : this.create(path, v))
                .compose(stat -> {
                    if (path.contains("/asyncMultiMap/__vertx.subs/")) {
                        // computeIfAbsent makes the create-or-reuse step atomic.
                        this.eventBusSnapshotCache
                            .computeIfAbsent(this.keyPath(k), key -> new ChoosableSet<>(1))
                            .add(v);
                    }
                    Future<Void> future = Future.future();
                    try {
                        this.curator.sync().inBackground((syncClient, syncEvent) -> {
                            if (syncEvent.getType() != CuratorEventType.SYNC) {
                                return;
                            }
                            this.curator.getData().inBackground((getClient, getEvent) -> {
                                // getStat() may be null (e.g. the node vanished in the meantime);
                                // treat that as "write not visible" rather than throwing here.
                                boolean visible = stat == null
                                    || (getEvent.getStat() != null
                                        && stat.getMtime() <= getEvent.getStat().getMtime());
                                if (visible) {
                                    this.vertx.runOnContext(aVoid -> future.complete());
                                } else {
                                    this.vertx.runOnContext(aVoid -> future.fail("can not get correct zxid."));
                                }
                            }).forPath(path);
                        }).forPath(path);
                    } catch (Exception ex) {
                        this.vertx.runOnContext(aVoid -> future.fail(ex));
                    }
                    return future;
                });
            return confirmed;
        }).setHandler(completionHandler);
    }

    /**
     * Looks up all values stored under {@code k}.
     *
     * <p>A non-empty entry in the local cache is returned directly. Otherwise a
     * ZooKeeper sync is issued and the values are rebuilt from the tree cache's
     * current children of the key path; children without data are skipped. If any
     * child fails to deserialize the lookup fails with that exception and nothing
     * is cached.
     *
     * @param k                  the key; validated by {@code assertKeyIsNotNull}
     * @param asyncResultHandler receives the values (possibly empty) or the failure
     *                           cause, invoked on the context obtained at call time
     */
    public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> asyncResultHandler) {
        Context ctx = this.vertx.getOrCreateContext();
        this.assertKeyIsNotNull(k).compose(aVoid -> {
            String keyPath = this.keyPath(k);
            Future<ChoosableIterable<V>> future = Future.future();
            ChoosableSet<V> cached = this.cache.get(keyPath);
            if (cached != null && !cached.isEmpty()) {
                future.complete(cached);
                return future;
            }
            try {
                this.curator.sync().inBackground((clientSync, eventSync) -> {
                    if (eventSync.getType() != CuratorEventType.SYNC) {
                        return;
                    }
                    Map<String, ChildData> children = this.treeCache.getCurrentChildren(keyPath);
                    ChoosableSet<V> fresh = new ChoosableSet<>(children != null ? children.size() : 0);
                    if (children != null) {
                        for (ChildData child : children.values()) {
                            if (child == null || child.getData() == null || child.getData().length == 0) {
                                continue;
                            }
                            try {
                                fresh.add(this.asObject(child.getData()));
                            } catch (Exception ex) {
                                // Stop here: completing the future after failing it would be a
                                // second completion, and a partial set must not be cached.
                                ctx.runOnContext(v -> future.fail(ex));
                                return;
                            }
                        }
                        this.cache.putIfAbsent(keyPath, fresh);
                    }
                    ctx.runOnContext(v -> future.complete(fresh));
                }).forPath(keyPath);
            } catch (Exception ex) {
                ctx.runOnContext(v -> future.fail(ex));
            }
            return future;
        }).setHandler(ar -> ctx.runOnContext(v -> asyncResultHandler.handle(ar)));
    }

    /**
     * Removes the single value {@code v} stored under key {@code k}.
     *
     * @param k                 the key; validated by {@code assertKeyAndValueAreNotNull}
     * @param v                 the value; validated by {@code assertKeyAndValueAreNotNull}
     * @param completionHandler receives the result of the internal path-based removal,
     *                          or the failure cause
     */
    public void remove(K k, V v, Handler<AsyncResult<Boolean>> completionHandler) {
        Future<Boolean> outcome = this.assertKeyAndValueAreNotNull(k, v).compose(validated -> {
            String valueNode = this.valuePath(k, v);
            String keyNode = this.keyPath(k);
            return this.remove(keyNode, v, valueNode);
        });
        outcome.setHandler(completionHandler);
    }

    /**
     * Deletes the value node at {@code fullPath} and drops {@code v} from the
     * snapshot cache entry of {@code keyPath}.
     *
     * <p>The node is only deleted when it exists in ZooKeeper and is also present
     * in the tree cache. In every other case the returned future completes with
     * {@code false}, so the future is always completed.
     *
     * @param keyPath  path of the key node owning the value
     * @param v        the value being removed
     * @param fullPath path of the value node to delete
     * @return a future holding {@code true} when the node was deleted, {@code false}
     *         when nothing was deleted, or failed with the delete error
     */
    private Future<Boolean> remove(String keyPath, V v, String fullPath) {
        return this.checkExists(fullPath).compose(exists -> {
            Future<Boolean> future = Future.future();
            if (!exists || this.treeCache.getCurrentData(fullPath) == null) {
                future.complete(false);
                return future;
            }
            this.delete(fullPath, null).setHandler(deleteResult -> {
                if (deleteResult.failed()) {
                    // Do not report a failed delete as a successful removal.
                    future.fail(deleteResult.cause());
                    return;
                }
                if (keyPath.contains("/asyncMultiMap/__vertx.subs/")) {
                    ChoosableSet<V> snapshot = this.eventBusSnapshotCache.get(keyPath);
                    if (snapshot != null) {
                        snapshot.remove(v);
                    }
                }
                future.complete(true);
            });
            return future;
        });
    }

    /**
     * Removes every stored value whose hash code equals the hash code of {@code v}.
     *
     * @param v                 the value to match by hash code
     * @param completionHandler passed through to {@code removeAllMatching}
     */
    public void removeAllForValue(V v, Handler<AsyncResult<Void>> completionHandler) {
        Predicate<V> sameHash = candidate -> candidate.hashCode() == v.hashCode();
        this.removeAllMatching(sameHash, completionHandler);
    }

    /**
     * Removes every value in the tree cache that satisfies {@code p}.
     *
     * <p>The tree cache is walked two levels deep below the map path (key nodes,
     * then value nodes). Nodes that are missing from the cache or carry no data are
     * skipped. The handler is always invoked, including when the map has no cached
     * children at all.
     *
     * @param p                 predicate applied to each deserialized value
     * @param completionHandler receives success once all removals succeeded, or the
     *                          first failure (deserialization, predicate or removal)
     */
    public void removeAllMatching(Predicate<V> p, Handler<AsyncResult<Void>> completionHandler) {
        // Raw Future is required by the CompositeFuture.all(List<Future>) signature.
        List<Future> removals = new ArrayList<>();
        Map<String, ChildData> keyNodes = this.treeCache.getCurrentChildren(this.mapPath);
        if (keyNodes != null) {
            for (String partKeyPath : keyNodes.keySet()) {
                String keyPath = this.mapPath + "/" + partKeyPath;
                Map<String, ChildData> valueNodes = this.treeCache.getCurrentChildren(keyPath);
                if (valueNodes == null) {
                    // The key node is no longer in the cache.
                    continue;
                }
                for (String valuePath : valueNodes.keySet()) {
                    String fullPath = keyPath + "/" + valuePath;
                    ChildData childData = this.treeCache.getCurrentData(fullPath);
                    if (childData == null || childData.getData() == null) {
                        continue;
                    }
                    try {
                        V value = this.asObject(childData.getData());
                        if (p.test(value)) {
                            removals.add(this.remove(keyPath, value, fullPath));
                        }
                    } catch (Exception e) {
                        removals.add(Future.failedFuture(e));
                    }
                }
            }
        }
        CompositeFuture.all(removals)
            .compose(done -> Future.<Void>succeededFuture())
            .setHandler(completionHandler);
    }

    /**
     * Re-adds every value held in the snapshot cache by calling {@code add} for it.
     *
     * <p>The key passed to {@code add} is the first path segment that follows the
     * map path in the snapshot entry's key.
     *
     * @return a future that succeeds once every re-add succeeded, or fails with the
     *         cause reported by the composite of all re-adds
     */
    @SuppressWarnings("unchecked")
    private Future<Void> restoreSnapshotCache() {
        // Raw Future is required by the CompositeFuture.all(List<Future>) signature.
        List<Future> pending = new ArrayList<>();
        for (Map.Entry<String, ChoosableSet<V>> snapshot : this.eventBusSnapshotCache.entrySet()) {
            String relative = snapshot.getKey().substring(this.mapPath.length() + 1);
            String key = relative.split("/", 2)[0];
            for (V value : snapshot.getValue().getIds()) {
                Future<Void> added = Future.future();
                // The key is taken from the path as a String; the cast to K is unchecked.
                this.add((K) key, value, added);
                pending.add(added);
            }
        }
        Future<Void> restored = Future.future();
        CompositeFuture.all(pending).setHandler(outcome -> {
            if (outcome.succeeded()) {
                restored.complete();
            } else {
                restored.fail(outcome.cause());
            }
        });
        return restored;
    }

    private class Listener
    implements TreeCacheListener {
        private AtomicBoolean reconnected = new AtomicBoolean(false);

        private Listener() {
        }

        private String cachePath(String key) {
            return ZKAsyncMultiMap.this.mapPath + "/" + key;
        }

        public void childEvent(CuratorFramework client, TreeCacheEvent treeCacheEvent) throws Exception {
            if (treeCacheEvent.getType() == TreeCacheEvent.Type.INITIALIZED) {
                ZKAsyncMultiMap.this.latch.countDown();
                return;
            }
            ChildData childData = treeCacheEvent.getData();
            String[] key = null;
            ChoosableSet entries = null;
            if (treeCacheEvent.getType() == TreeCacheEvent.Type.NODE_ADDED || treeCacheEvent.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
                if (childData == null || ZKAsyncMultiMap.this.mapPath.length() == childData.getPath().length()) {
                    return;
                }
                key = childData.getPath().substring(ZKAsyncMultiMap.this.mapPath.length() + 1).split("/", 2);
                entries = ZKAsyncMultiMap.this.cache.computeIfAbsent(this.cachePath(key[0]), k -> new ChoosableSet(1));
            }
            switch (treeCacheEvent.getType()) {
                case NODE_ADDED: {
                    if (key.length <= 1) break;
                    entries.add(ZKAsyncMultiMap.this.asObject(childData.getData()));
                    break;
                }
                case NODE_REMOVED: {
                    if (key.length == 1) {
                        ZKAsyncMultiMap.this.cache.remove(this.cachePath((String)key[0]));
                    } else {
                        Iterator iterator = entries.iterator();
                        while (iterator.hasNext()) {
                            Object entry = iterator.next();
                            if (!entry.toString().equals(key[1])) continue;
                            entries.remove(entry);
                        }
                    }
                    if (!this.reconnected.get()) break;
                    this.reconnected.set(false);
                    ZKAsyncMultiMap.this.restoreSnapshotCache().setHandler(event -> {
                        if (event.failed()) {
                            logger.error((Object)"restore eventbus snapshot cache failed.", event.cause());
                        } else {
                            logger.info((Object)"restore eventbus snapshot cache success.");
                        }
                    });
                    break;
                }
                case CONNECTION_SUSPENDED: {
                    logger.warn((Object)"connection to the zookeeper server have suspended.");
                    break;
                }
                case CONNECTION_RECONNECTED: {
                    this.reconnected.set(true);
                    break;
                }
                case CONNECTION_LOST: {
                    logger.error((Object)"connection to the zookeeper server have lost, all the temporary node will be remove.");
                }
            }
        }
    }
}

