/*
 * Decompiled with CFR 0.152.
 */
package org.aoju.bus.cache.provider;

import jakarta.annotation.PreDestroy;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.aoju.bus.cache.Hitting;
import org.aoju.bus.cache.magic.CachePair;
import org.aoju.bus.logger.Logger;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.KeeperException;

public class ZookeeperHitting
implements Hitting {
    private static final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setName("cache:zk-uploader");
        thread.setDaemon(true);
        return thread;
    });
    private static final String NAME_SPACE = "cache";
    private volatile boolean isShutdown = false;
    private BlockingQueue<CachePair<String, Integer>> hitQueue = new LinkedTransferQueue<CachePair<String, Integer>>();
    private BlockingQueue<CachePair<String, Integer>> requireQueue = new LinkedTransferQueue<CachePair<String, Integer>>();
    private Map<String, DistributedAtomicLong> hitCounterMap = new HashMap<String, DistributedAtomicLong>();
    private Map<String, DistributedAtomicLong> requireCounterMap = new HashMap<String, DistributedAtomicLong>();
    private CuratorFramework client;
    private String hitPathPrefix;
    private String requirePathPrefix;

    public ZookeeperHitting(String zkServer) {
        this(zkServer, System.getProperty("product.name", "unnamed"));
    }

    public ZookeeperHitting(String zkServer, String productName) {
        this.client = CuratorFrameworkFactory.builder().connectString(zkServer).retryPolicy((RetryPolicy)new RetryNTimes(3, 0)).namespace(NAME_SPACE).build();
        this.client.start();
        String uniqueProductName = this.processProductName(productName);
        this.hitPathPrefix = String.format("%s%s", uniqueProductName, "hit");
        this.requirePathPrefix = String.format("%s%s", uniqueProductName, "require");
        try {
            this.client.create().creatingParentsIfNeeded().forPath(this.hitPathPrefix);
            this.client.create().creatingParentsIfNeeded().forPath(this.requirePathPrefix);
        }
        catch (KeeperException.NodeExistsException nodeExistsException) {
        }
        catch (Exception e) {
            throw new RuntimeException("create path: " + this.hitPathPrefix + ", " + this.requirePathPrefix + " on namespace: cache error", e);
        }
        executor.submit(() -> {
            while (!this.isShutdown) {
                this.dumpToZK(this.hitQueue, this.hitCounterMap, this.hitPathPrefix);
                this.dumpToZK(this.requireQueue, this.requireCounterMap, this.requirePathPrefix);
            }
        });
    }

    @Override
    public void hitIncr(String pattern, int count) {
        if (count != 0) {
            this.hitQueue.add(CachePair.of(pattern, count));
        }
    }

    @Override
    public void reqIncr(String pattern, int count) {
        if (count != 0) {
            this.requireQueue.add(CachePair.of(pattern, count));
        }
    }

    @Override
    public Map<String, Hitting.HittingDO> getHitting() {
        LinkedHashMap<String, Hitting.HittingDO> result = new LinkedHashMap<String, Hitting.HittingDO>();
        AtomicLong totalHit = new AtomicLong(0L);
        AtomicLong totalRequire = new AtomicLong(0L);
        this.requireCounterMap.forEach((key, requireCounter) -> {
            try {
                long require = this.getValue(requireCounter.get());
                long hit = this.getValue(this.hitCounterMap.get(key));
                totalRequire.addAndGet(require);
                totalHit.addAndGet(hit);
                result.put((String)key, Hitting.HittingDO.newInstance(hit, require));
            }
            catch (Exception e) {
                Logger.error(e, "acquire hit count error: ", e.getMessage());
            }
        });
        result.put(this.summaryName(), Hitting.HittingDO.newInstance(totalHit.get(), totalRequire.get()));
        return result;
    }

    @Override
    public void reset(String pattern) {
        this.hitCounterMap.computeIfPresent(pattern, this::doReset);
        this.requireCounterMap.computeIfPresent(pattern, this::doReset);
    }

    @Override
    public void resetAll() {
        this.hitCounterMap.forEach(this::doReset);
        this.requireCounterMap.forEach(this::doReset);
    }

    @PreDestroy
    public void tearDown() {
        while (this.hitQueue.size() > 0 || this.requireQueue.size() > 0) {
            try {
                TimeUnit.SECONDS.sleep(1L);
            }
            catch (InterruptedException interruptedException) {}
        }
        this.isShutdown = true;
    }

    private String processProductName(String productName) {
        if (!((String)productName).startsWith("/")) {
            productName = "/" + (String)productName;
        }
        if (!((String)productName).endsWith("/")) {
            productName = (String)productName + "/";
        }
        return productName;
    }

    private DistributedAtomicLong doReset(String pattern, DistributedAtomicLong counter) {
        try {
            counter.forceSet(Long.valueOf(0L));
        }
        catch (Exception e) {
            Logger.error(e, "reset distribute counter error: ", e.getMessage());
        }
        return null;
    }

    private void dumpToZK(BlockingQueue<CachePair<String, Integer>> queue, Map<String, DistributedAtomicLong> counterMap, String zkPrefix) {
        CachePair head;
        HashMap<String, AtomicLong> holdMap = new HashMap<String, AtomicLong>();
        for (long count = 0L; null != (head = (CachePair)queue.poll()) && count <= 100L; ++count) {
            holdMap.computeIfAbsent((String)head.getLeft(), key -> new AtomicLong(0L)).addAndGet(((Integer)head.getRight()).intValue());
        }
        holdMap.forEach((pattern, atomicCount) -> {
            String zkPath = String.format("%s/%s", zkPrefix, pattern);
            DistributedAtomicLong counter = counterMap.computeIfAbsent((String)pattern, key -> new DistributedAtomicLong(this.client, zkPath, (RetryPolicy)new RetryNTimes(10, 10)));
            try {
                counter.add(Long.valueOf(atomicCount.get())).postValue();
            }
            catch (Exception e) {
                Logger.error(e, "dump data from queue to zookeeper error: ", e.getMessage());
            }
        });
    }

    private long getValue(Object value) throws Exception {
        long result = 0L;
        if (null != value) {
            result = value instanceof DistributedAtomicLong ? this.getValue(((DistributedAtomicLong)value).get()) : (value instanceof AtomicValue ? ((Long)((AtomicValue)value).postValue()).longValue() : ((AtomicLong)value).get());
        }
        return result;
    }
}

