/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket.stats;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.websocket.ProducerHandler;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.stats.JvmMetrics;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyStats {
    private final WebSocketService service;
    private final JvmMetrics jvmMetrics;
    private ConcurrentOpenHashMap<String, ProxyNamespaceStats> topicStats;
    private List<Metrics> metricsCollection;
    private List<Metrics> tempMetricsCollection;
    private static final Logger log = LoggerFactory.getLogger(ProxyStats.class);

    public ProxyStats(WebSocketService service) {
        this.service = service;
        this.jvmMetrics = new JvmMetrics(service);
        this.topicStats = ConcurrentOpenHashMap.newBuilder().build();
        this.metricsCollection = Lists.newArrayList();
        this.tempMetricsCollection = Lists.newArrayList();
        service.getExecutor().scheduleAtFixedRate(Runnables.catchingAndLoggingThrowables(this::generate), 120L, 60L, TimeUnit.SECONDS);
    }

    public synchronized void generate() {
        if (log.isDebugEnabled()) {
            log.debug("Start generating proxy metrics");
        }
        this.topicStats.clear();
        this.service.getProducers().forEach((topic, handlers) -> {
            if (log.isDebugEnabled()) {
                log.debug("Collect stats from {} producer handlers for topic {}", (Object)handlers.size(), topic);
            }
            String namespaceName = TopicName.get(topic).getNamespace();
            ProxyNamespaceStats nsStat = this.topicStats.computeIfAbsent(namespaceName, ns -> new ProxyNamespaceStats());
            handlers.forEach(handler -> {
                nsStat.numberOfMsgPublished += handler.getAndResetNumMsgsSent();
                nsStat.numberOfBytesPublished += handler.getAndResetNumBytesSent();
                nsStat.numberOfPublishFailure += handler.getAndResetNumMsgsFailed();
                handler.getPublishLatencyStatsUSec().refresh();
                nsStat.publishMsgLatency.addAll(handler.getPublishLatencyStatsUSec());
            });
        });
        this.service.getConsumers().forEach((topic, handlers) -> {
            if (log.isDebugEnabled()) {
                log.debug("Collect stats from {} consumer handlers for topic {}", (Object)handlers.size(), topic);
            }
            String namespaceName = TopicName.get(topic).getNamespace();
            ProxyNamespaceStats nsStat = this.topicStats.computeIfAbsent(namespaceName, ns -> new ProxyNamespaceStats());
            handlers.forEach(handler -> {
                nsStat.numberOfMsgDelivered += handler.getAndResetNumMsgsAcked();
                nsStat.numberOfBytesDelivered += handler.getAndResetNumBytesDelivered();
                nsStat.numberOfMsgsAcked += handler.getAndResetNumMsgsAcked();
            });
        });
        this.tempMetricsCollection.clear();
        this.topicStats.forEach((namespace, stats) -> {
            if (log.isDebugEnabled()) {
                log.debug("Add ns-stats of namespace {} to metrics", namespace);
            }
            this.tempMetricsCollection.add(stats.add((String)namespace));
        });
        if (log.isDebugEnabled()) {
            log.debug("Add jvm-stats to metrics");
        }
        this.tempMetricsCollection.add(this.jvmMetrics.generate());
        List<Metrics> tempRef = this.metricsCollection;
        this.metricsCollection = this.tempMetricsCollection;
        this.tempMetricsCollection = tempRef;
        if (log.isDebugEnabled()) {
            log.debug("Complete generating proxy metrics");
        }
    }

    public List<Metrics> getMetrics() {
        return this.metricsCollection;
    }

    private static class ProxyNamespaceStats {
        public long numberOfMsgPublished;
        public long numberOfBytesPublished;
        public long numberOfPublishFailure;
        public StatsBuckets publishMsgLatency = new StatsBuckets(ProducerHandler.ENTRY_LATENCY_BUCKETS_USEC);
        public long numberOfMsgDelivered;
        public long numberOfBytesDelivered;
        public long numberOfMsgsAcked;

        public Metrics add(String namespace) {
            this.publishMsgLatency.refresh();
            long[] latencyBuckets = this.publishMsgLatency.getBuckets();
            HashMap<String, String> dimensionMap = Maps.newHashMap();
            dimensionMap.put("namespace", namespace);
            Metrics dMetrics = Metrics.create(dimensionMap);
            dMetrics.put("ns_msg_publish_rate", this.numberOfMsgPublished);
            dMetrics.put("ns_byte_publish_rate", this.numberOfBytesPublished);
            dMetrics.put("ns_msg_failure_rate", this.numberOfPublishFailure);
            dMetrics.put("ns_msg_deliver_rate", this.numberOfMsgDelivered);
            dMetrics.put("ns_byte_deliver_rate", this.numberOfBytesDelivered);
            dMetrics.put("ns_msg_ack_rate", this.numberOfMsgsAcked);
            for (int i = 0; i < latencyBuckets.length; ++i) {
                String latencyBucket = i >= ProducerHandler.ENTRY_LATENCY_BUCKETS_USEC.size() ? ProducerHandler.ENTRY_LATENCY_BUCKETS_USEC.get(ProducerHandler.ENTRY_LATENCY_BUCKETS_USEC.size() - 1) + "_higher" : Long.toString(ProducerHandler.ENTRY_LATENCY_BUCKETS_USEC.get(i));
                dMetrics.put("ns_msg_publish_latency_" + latencyBucket, latencyBuckets[i]);
            }
            return dMetrics;
        }
    }
}

