/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.server.util;

import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.server.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects a random sample of request paths, grouped by operation type, and
 * reports the most frequently requested paths.
 *
 * <p>Each tracked operation type owns a {@link PathStatsQueue}. Sampled paths are
 * appended to that queue's "current slot"; a periodic task turns the slot into a
 * path-to-count map and stores it in a bounded queue of recent slots. The
 * {@code dumpTop*} methods and a periodic logging task merge those maps and emit
 * the paths with the highest counts.
 *
 * <p>All tunables are read from system properties once, in the constructor. When
 * {@value #PATH_STATS_ENABLED} is not {@code true}, {@link #start()},
 * {@link #shutdown()}, {@link #registerRequest(int, String)} and the aggregated
 * dump methods are no-ops.
 *
 * <p>NOTE(review): operation types appear in this class as bare integers because
 * the named constants were inlined at compile time; presumably they are the
 * {@code ZooDefs.OpCode} values - confirm against that class before editing them.
 */
public class RequestPathMetricsCollector {
    private static final Logger LOG = LoggerFactory.getLogger(RequestPathMetricsCollector.class);

    /** Length of one statistics slot, in seconds. */
    private final int REQUEST_STATS_SLOT_DURATION;
    /** Maximum number of processed slots retained per operation type. */
    private final int REQUEST_STATS_SLOT_CAPACITY;
    /** Maximum number of path components kept when a slot is processed. */
    private final int REQUEST_PREPROCESS_PATH_DEPTH;
    /** Sampling threshold compared against a random float in {@code [0, 1)}. */
    private final float REQUEST_PREPROCESS_SAMPLE_RATE;
    /** Delay before the first periodic top-path log, in minutes. */
    private final long COLLECTOR_INITIAL_DELAY;
    /** Delay between periodic top-path logs, in minutes. */
    private final long COLLECTOR_DELAY;
    /** Maximum number of paths emitted by any report. */
    private final int REQUEST_PREPROCESS_TOPPATH_MAX;
    private final boolean enabled;

    public static final String PATH_STATS_SLOT_CAPACITY = "zookeeper.pathStats.slotCapacity";
    public static final String PATH_STATS_SLOT_DURATION = "zookeeper.pathStats.slotDuration";
    public static final String PATH_STATS_MAX_DEPTH = "zookeeper.pathStats.maxDepth";
    public static final String PATH_STATS_SAMPLE_RATE = "zookeeper.pathStats.sampleRate";
    public static final String PATH_STATS_COLLECTOR_INITIAL_DELAY = "zookeeper.pathStats.initialDelay";
    public static final String PATH_STATS_COLLECTOR_DELAY = "zookeeper.pathStats.delay";
    public static final String PATH_STATS_TOP_PATH_MAX = "zookeeper.pathStats.topPathMax";
    public static final String PATH_STATS_ENABLED = "zookeeper.pathStats.enabled";

    private static final String PATH_SEPARATOR = "/";
    /** Path depth used by the periodic top-path log started in {@link #start()}. */
    private static final int PERIODIC_LOG_PATH_DEPTH = 4;

    /** Per-operation queues, keyed by {@code Request.op2String(opCode)}. */
    private final Map<String, PathStatsQueue> immutableRequestsMap;
    private final ScheduledThreadPoolExecutor scheduledExecutor;
    /**
     * When {@code true}, readers and the slot-rotation task synchronize on the
     * slot queue; otherwise each synchronizes on a throwaway object, which is
     * effectively no mutual exclusion.
     */
    private final boolean accurateMode;

    /** Creates a collector with {@code accurateMode} switched off. */
    public RequestPathMetricsCollector() {
        this(false);
    }

    /**
     * Reads the configuration from system properties, creates the scheduler and
     * registers one {@link PathStatsQueue} per tracked operation type. No task
     * is scheduled until {@link #start()} is called.
     *
     * @param accurateMode whether stats readers and the slot-rotation task
     *                     should lock the slot queue against each other
     * @throws NumberFormatException if the sample-rate property is set to a
     *                               value that is not a valid float
     */
    public RequestPathMetricsCollector(boolean accurateMode) {
        this.accurateMode = accurateMode;
        this.REQUEST_PREPROCESS_TOPPATH_MAX = Integer.getInteger(PATH_STATS_TOP_PATH_MAX, 20);
        this.REQUEST_STATS_SLOT_DURATION = Integer.getInteger(PATH_STATS_SLOT_DURATION, 15);
        this.REQUEST_STATS_SLOT_CAPACITY = Integer.getInteger(PATH_STATS_SLOT_CAPACITY, 60);
        this.REQUEST_PREPROCESS_PATH_DEPTH = Integer.getInteger(PATH_STATS_MAX_DEPTH, 6);
        this.REQUEST_PREPROCESS_SAMPLE_RATE = Float.parseFloat(System.getProperty(PATH_STATS_SAMPLE_RATE, "0.1"));
        this.COLLECTOR_INITIAL_DELAY = Long.getLong(PATH_STATS_COLLECTOR_INITIAL_DELAY, 5L);
        this.COLLECTOR_DELAY = Long.getLong(PATH_STATS_COLLECTOR_DELAY, 5L);
        this.enabled = Boolean.getBoolean(PATH_STATS_ENABLED);

        LOG.info("{} = {}", PATH_STATS_SLOT_CAPACITY, this.REQUEST_STATS_SLOT_CAPACITY);
        LOG.info("{} = {}", PATH_STATS_SLOT_DURATION, this.REQUEST_STATS_SLOT_DURATION);
        LOG.info("{} = {}", PATH_STATS_MAX_DEPTH, this.REQUEST_PREPROCESS_PATH_DEPTH);
        LOG.info("{} = {}", PATH_STATS_COLLECTOR_INITIAL_DELAY, this.COLLECTOR_INITIAL_DELAY);
        LOG.info("{} = {}", PATH_STATS_COLLECTOR_DELAY, this.COLLECTOR_DELAY);
        LOG.info("{} = {}", PATH_STATS_ENABLED, this.enabled);

        this.scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
                Runtime.getRuntime().availableProcessors());
        // Pending periodic and delayed tasks must not outlive shutdown().
        this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

        // The queues read REQUEST_STATS_SLOT_CAPACITY, so they must be created
        // after the configuration fields above have been assigned.
        // NOTE(review): the literals below are inlined opcode values; verify
        // their meaning against ZooDefs.OpCode before adding or removing any.
        final int[] trackedOpCodes = {1, 15, 19, 20, 2, 3, 5, 4, 6, 7, 8, 12, 17, 18, 105, 9};
        Map<String, PathStatsQueue> requestsMap = new HashMap<>();
        for (int opCode : trackedOpCodes) {
            requestsMap.put(Request.op2String(opCode), new PathStatsQueue(opCode));
        }
        this.immutableRequestsMap = Collections.unmodifiableMap(requestsMap);
    }

    /**
     * Tells whether the given operation type is classified as a write.
     *
     * <p>NOTE(review): the case labels are inlined opcode values; presumably
     * they mirror {@code ZooDefs.OpCode} - confirm before changing the list.
     *
     * @param requestType numeric operation type
     * @return {@code true} for the operation types listed in the switch,
     *         {@code false} for everything else
     */
    static boolean isWriteOp(int requestType) {
        switch (requestType) {
            case 1:
            case 2:
            case 5:
            case 7:
            case 9:
            case 13:
            case 14:
            case 15:
            case 16:
            case 19:
            case 20:
                return true;
            default:
                return false;
        }
    }

    /**
     * Truncates a slash-separated path to at most {@code maxDepth} components.
     *
     * <p>The result is rebuilt from the non-empty components, each prefixed with
     * a separator. Repeated or trailing separators are therefore dropped, and a
     * path without any component (such as {@code "/"} or {@code ""}), or a
     * {@code maxDepth} below one, yields the empty string.
     *
     * @param path     path to truncate; must not be {@code null}
     * @param maxDepth maximum number of components to keep
     * @return the truncated path
     */
    static String trimPathDepth(String path, int maxDepth) {
        int count = 0;
        StringBuilder sb = new StringBuilder();
        StringTokenizer pathTokenizer = new StringTokenizer(path, PATH_SEPARATOR);
        while (pathTokenizer.hasMoreElements() && count++ < maxDepth) {
            sb.append(PATH_SEPARATOR).append(pathTokenizer.nextToken());
        }
        return sb.toString();
    }

    /** Stops all scheduled tasks immediately; does nothing when the collector is disabled. */
    public void shutdown() {
        if (!this.enabled) {
            return;
        }
        LOG.info("shutdown scheduledExecutor");
        this.scheduledExecutor.shutdownNow();
    }

    /**
     * Starts slot rotation for every tracked operation type and schedules the
     * periodic log of the top read and write paths. Does nothing when the
     * collector is disabled.
     */
    public void start() {
        if (!this.enabled) {
            return;
        }
        LOG.info("Start the RequestPath collector");
        this.immutableRequestsMap.forEach((opType, pathStatsQueue) -> pathStatsQueue.start());

        // Delays for this task are expressed in minutes.
        this.scheduledExecutor.scheduleWithFixedDelay(() -> {
            LOG.info("Here are the top Read paths:");
            this.logTopPaths(
                    this.aggregatePaths(PERIODIC_LOG_PATH_DEPTH, queue -> !queue.isWriteOperation()),
                    entry -> LOG.info("{} : {}", entry.getKey(), entry.getValue()));
            LOG.info("Here are the top Write paths:");
            this.logTopPaths(
                    this.aggregatePaths(PERIODIC_LOG_PATH_DEPTH, queue -> queue.isWriteOperation()),
                    entry -> LOG.info("{} : {}", entry.getKey(), entry.getValue()));
        }, this.COLLECTOR_INITIAL_DELAY, this.COLLECTOR_DELAY, TimeUnit.MINUTES);
    }

    /**
     * Records a request path, subject to random sampling. Does nothing when the
     * collector is disabled. A sampled request whose type has no registered
     * queue is reported through the error log.
     *
     * @param type numeric operation type of the request
     * @param path path the request targets; {@code null} is ignored
     */
    public void registerRequest(int type, String path) {
        if (!this.enabled) {
            return;
        }
        if (ThreadLocalRandom.current().nextFloat() <= this.REQUEST_PREPROCESS_SAMPLE_RATE) {
            PathStatsQueue pathStatsQueue = this.immutableRequestsMap.get(Request.op2String(type));
            if (pathStatsQueue != null) {
                pathStatsQueue.registerRequest(path);
            } else {
                LOG.error("We should not handle {}", type);
            }
        }
    }

    /**
     * Writes the top paths recorded for a single operation type.
     *
     * <p>Unlike the aggregated dump methods, this method does not check whether
     * the collector is enabled.
     *
     * @param pwriter         destination of the report
     * @param requestTypeName operation name, as produced by {@code Request.op2String}
     * @param queryMaxDepth   requested path depth; values below one produce no
     *                        output, larger values are capped at the configured
     *                        maximum depth
     */
    public void dumpTopRequestPath(PrintWriter pwriter, String requestTypeName, int queryMaxDepth) {
        if (queryMaxDepth < 1) {
            return;
        }
        PathStatsQueue pathStatsQueue = this.immutableRequestsMap.get(requestTypeName);
        if (pathStatsQueue == null) {
            pwriter.println("Can not find path stats for type: " + requestTypeName);
            return;
        }
        pwriter.println("The top requests of type: " + requestTypeName);
        int maxDepth = Math.min(queryMaxDepth, this.REQUEST_PREPROCESS_PATH_DEPTH);
        Map<String, Integer> combinedMap = pathStatsQueue.collectStats(maxDepth);
        this.logTopPaths(combinedMap, entry -> pwriter.println(entry.getKey() + " : " + entry.getValue()));
    }

    /**
     * Writes the top paths across all operation types not classified as writes.
     *
     * @param pwriter       destination of the report
     * @param queryMaxDepth requested path depth, capped at the configured maximum
     */
    public void dumpTopReadPaths(PrintWriter pwriter, int queryMaxDepth) {
        pwriter.println("The top read requests are");
        this.dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> !queue.isWriteOperation());
    }

    /**
     * Writes the top paths across all operation types classified as writes.
     *
     * @param pwriter       destination of the report
     * @param queryMaxDepth requested path depth, capped at the configured maximum
     */
    public void dumpTopWritePaths(PrintWriter pwriter, int queryMaxDepth) {
        pwriter.println("The top write requests are");
        this.dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> queue.isWriteOperation());
    }

    /**
     * Writes the top paths across every tracked operation type.
     *
     * @param pwriter       destination of the report
     * @param queryMaxDepth requested path depth, capped at the configured maximum
     */
    public void dumpTopPaths(PrintWriter pwriter, int queryMaxDepth) {
        pwriter.println("The top requests are");
        this.dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> true);
    }

    /** Writes the top paths of the queues accepted by {@code predicate}; no-op when disabled. */
    private void dumpTopAggregatedPaths(PrintWriter pwriter, int queryMaxDepth, Predicate<PathStatsQueue> predicate) {
        if (!this.enabled) {
            return;
        }
        Map<String, Integer> combinedMap = this.aggregatePaths(queryMaxDepth, predicate);
        this.logTopPaths(combinedMap, entry -> pwriter.println(entry.getKey() + " : " + entry.getValue()));
    }

    /**
     * Merges the path counts of every queue accepted by {@code predicate}.
     *
     * @param queryMaxDepth requested path depth, capped at the configured maximum
     * @param predicate     selects the queues to include
     * @return a new map from truncated path to its summed count
     */
    Map<String, Integer> aggregatePaths(int queryMaxDepth, Predicate<PathStatsQueue> predicate) {
        Map<String, Integer> combinedMap = new HashMap<>(this.REQUEST_PREPROCESS_TOPPATH_MAX);
        int maxDepth = Math.min(queryMaxDepth, this.REQUEST_PREPROCESS_PATH_DEPTH);
        for (PathStatsQueue pathStatsQueue : this.immutableRequestsMap.values()) {
            if (predicate.test(pathStatsQueue)) {
                pathStatsQueue.collectStats(maxDepth)
                        .forEach((path, count) -> combinedMap.merge(path, count, Integer::sum));
            }
        }
        return combinedMap;
    }

    /**
     * Passes the entries with the highest counts to {@code output}, in
     * descending count order, limited to the configured number of top paths.
     *
     * @param combinedMap map from path to count
     * @param output      receiver of each selected entry
     */
    void logTopPaths(Map<String, Integer> combinedMap, Consumer<Map.Entry<String, Integer>> output) {
        combinedMap.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()))
                .limit(this.REQUEST_PREPROCESS_TOPPATH_MAX)
                .forEach(output);
    }

    /**
     * Path statistics for one operation type: a "current slot" collecting raw
     * sampled paths, plus a bounded queue of already-counted slots.
     */
    class PathStatsQueue {
        private final String requestTypeName;
        /** Raw paths sampled since the last slot rotation. */
        private final AtomicReference<ConcurrentLinkedQueue<String>> currentSlot;
        /** Counted slots, oldest first, bounded by the configured slot capacity. */
        private final LinkedBlockingQueue<Map<String, Integer>> requestPathStats;
        private final boolean isWriteOperation;

        /**
         * @param requestType numeric operation type tracked by this queue
         */
        public PathStatsQueue(int requestType) {
            this.requestTypeName = Request.op2String(requestType);
            this.isWriteOperation = RequestPathMetricsCollector.isWriteOp(requestType);
            this.requestPathStats = new LinkedBlockingQueue<>(REQUEST_STATS_SLOT_CAPACITY);
            this.currentSlot = new AtomicReference<>(new ConcurrentLinkedQueue<String>());
        }

        /**
         * Appends a path to the current slot. Does nothing when the collector
         * is disabled or the path is {@code null}.
         *
         * @param path request path to record
         */
        public void registerRequest(String path) {
            // ConcurrentLinkedQueue rejects null elements with a
            // NullPointerException, and null paths are discarded when a slot is
            // counted anyway, so drop them here.
            if (!enabled || path == null) {
                return;
            }
            this.currentSlot.get().offer(path);
        }

        /** Returns the slot currently receiving sampled paths. */
        ConcurrentLinkedQueue<String> getCurrentSlot() {
            return this.currentSlot.get();
        }

        /**
         * Counts occurrences of each path after truncating it to {@code maxDepth}.
         *
         * @param maxDepth          maximum number of path components to keep
         * @param tobeProcessedSlot raw paths; {@code null} elements are skipped
         * @return a new map from truncated path to occurrence count
         */
        Map<String, Integer> mapReducePaths(int maxDepth, Collection<String> tobeProcessedSlot) {
            Map<String, Integer> newSlot = new ConcurrentHashMap<>();
            for (String path : tobeProcessedSlot) {
                if (path != null) {
                    newSlot.merge(RequestPathMetricsCollector.trimPathDepth(path, maxDepth), 1, Integer::sum);
                }
            }
            return newSlot;
        }

        /**
         * Merges a snapshot of the current slot with every retained slot.
         *
         * @param maxDepth maximum number of path components to keep
         * @return a new map from truncated path to its total count
         */
        public Map<String, Integer> collectStats(int maxDepth) {
            // Copy the live slot first so that counting works on a fixed set of paths.
            Map<String, Integer> combinedMap = this.mapReducePaths(
                    maxDepth, Arrays.asList(this.currentSlot.get().toArray(new String[0])));
            // Only accurate mode excludes the rotation task while the retained
            // slots are read; otherwise the lock object is private to this call.
            Object lock = accurateMode ? this.requestPathStats : new Object();
            synchronized (lock) {
                for (Map<String, Integer> slot : this.requestPathStats) {
                    slot.forEach((path, count) -> combinedMap.merge(
                            RequestPathMetricsCollector.trimPathDepth(path, maxDepth), count, Integer::sum));
                }
            }
            return combinedMap;
        }

        /**
         * Schedules the periodic slot rotation: the current slot is swapped for
         * an empty one, counted, and appended to the retained slots, evicting
         * the oldest slot when the queue is full. Does nothing when the
         * collector is disabled.
         */
        public void start() {
            if (!enabled) {
                return;
            }
            // A random initial delay (in seconds) so that the queues do not all rotate at once.
            int delay = ThreadLocalRandom.current().nextInt(REQUEST_STATS_SLOT_DURATION);
            scheduledExecutor.scheduleWithFixedDelay(() -> {
                ConcurrentLinkedQueue<String> tobeProcessedSlot =
                        this.currentSlot.getAndSet(new ConcurrentLinkedQueue<String>());
                try {
                    Map<String, Integer> latestSlot =
                            this.mapReducePaths(REQUEST_PREPROCESS_PATH_DEPTH, tobeProcessedSlot);
                    Object lock = accurateMode ? this.requestPathStats : new Object();
                    synchronized (lock) {
                        if (this.requestPathStats.remainingCapacity() <= 0) {
                            this.requestPathStats.poll();
                        }
                        if (!this.requestPathStats.offer(latestSlot)) {
                            LOG.error("Failed to insert the new request path stats for {}", this.requestTypeName);
                        }
                    }
                } catch (Exception e) {
                    // Catch everything: an exception escaping a periodic task
                    // would suppress all of its future executions. The
                    // exception is passed as the trailing argument so the
                    // logger records it with its stack trace.
                    LOG.error("Failed to insert the new request path stats for {}", this.requestTypeName, e);
                }
            }, delay, REQUEST_STATS_SLOT_DURATION, TimeUnit.SECONDS);
        }

        /** Returns whether this queue tracks an operation type classified as a write. */
        boolean isWriteOperation() {
            return this.isWriteOperation;
        }
    }
}

