/*
 * Decompiled with CFR 0.152.
 */
package com.github.phantomthief.zookeeper.broadcast;

import com.github.phantomthief.zookeeper.broadcast.Broadcaster;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkBroadcaster
implements Broadcaster {
    private static final String DEFAULT_ZK_PREFIX = "/broadcast";
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Supplier<CuratorFramework> curatorFactory;
    private final String zkPrefix;
    private final ConcurrentMap<String, Set<Broadcaster.Subscriber>> subscribeMap = new ConcurrentHashMap<String, Set<Broadcaster.Subscriber>>();
    private final ConcurrentMap<String, NodeCache> nodeCacheMap = new ConcurrentHashMap<String, NodeCache>();

    public ZkBroadcaster(Supplier<CuratorFramework> curatorFactory, String zkPrefix) {
        this.curatorFactory = curatorFactory;
        this.zkPrefix = zkPrefix;
    }

    public ZkBroadcaster(Supplier<CuratorFramework> curatorFactory) {
        this(curatorFactory, DEFAULT_ZK_PREFIX);
    }

    @Override
    public void subscribe(@Nonnull String path, @Nonnull Broadcaster.Subscriber subscriber) {
        Preconditions.checkNotNull((Object)path);
        Preconditions.checkNotNull((Object)subscriber);
        Set subscribers = this.subscribeMap.compute(path, (k, oldSet) -> {
            if (oldSet == null) {
                oldSet = new HashSet<Broadcaster.Subscriber>();
            }
            oldSet.add(subscriber);
            return oldSet;
        });
        this.nodeCacheMap.computeIfAbsent(path, p -> {
            CuratorFramework curatorFramework = this.curatorFactory.get();
            NodeCache nodeCache = new NodeCache(curatorFramework, ZKPaths.makePath((String)this.zkPrefix, (String)p));
            try {
                nodeCache.start();
                nodeCache.rebuild();
            }
            catch (Throwable e) {
                Throwables.throwIfUnchecked((Throwable)e);
                throw new RuntimeException(e);
            }
            nodeCache.getListenable().addListener(() -> {
                ChildData currentData = nodeCache.getCurrentData();
                String content = currentData != null && currentData.getData() != null ? new String(currentData.getData(), StandardCharsets.UTF_8) : "";
                subscribers.parallelStream().forEach(s -> {
                    try {
                        s.onChanged(content);
                    }
                    catch (Throwable e) {
                        this.logger.error("Ops. fail to do handle for:{}->{}", new Object[]{this.zkPrefix, s, e});
                    }
                });
            });
            return nodeCache;
        });
    }

    @Override
    public void broadcast(String path, String content) {
        String realPath = ZKPaths.makePath((String)this.zkPrefix, (String)path);
        try {
            try {
                this.curatorFactory.get().setData().forPath(realPath, content.getBytes(StandardCharsets.UTF_8));
            }
            catch (KeeperException.NoNodeException e) {
                this.curatorFactory.get().create().creatingParentsIfNeeded().forPath(realPath, content.getBytes(StandardCharsets.UTF_8));
            }
        }
        catch (Throwable e) {
            Throwables.throwIfUnchecked((Throwable)e);
            throw new RuntimeException(e);
        }
    }
}

