/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.stats.prometheus;

import io.netty.util.concurrent.FastThreadLocal;
import java.util.HashMap;
import java.util.Map;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.AggregatedTransactionCoordinatorStats;
import org.apache.pulsar.broker.stats.prometheus.ManagedLedgerStats;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionAggregator {
    private static final Logger log = LoggerFactory.getLogger(TransactionAggregator.class);
    private static final FastThreadLocal<Map<String, String>> threadLocalMetricWithTypeDefinition = new FastThreadLocal(){

        protected Map<String, String> initialValue() {
            return new HashMap<String, String>();
        }
    };
    private static final FastThreadLocal<AggregatedTransactionCoordinatorStats> localTransactionCoordinatorStats = new FastThreadLocal<AggregatedTransactionCoordinatorStats>(){

        protected AggregatedTransactionCoordinatorStats initialValue() throws Exception {
            return new AggregatedTransactionCoordinatorStats();
        }
    };
    private static final FastThreadLocal<ManagedLedgerStats> localManageLedgerStats = new FastThreadLocal<ManagedLedgerStats>(){

        protected ManagedLedgerStats initialValue() throws Exception {
            return new ManagedLedgerStats();
        }
    };

    public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, boolean includeTopicMetrics) {
        String cluster = pulsar.getConfiguration().getClusterName();
        Map metricWithTypeDefinition = (Map)threadLocalMetricWithTypeDefinition.get();
        metricWithTypeDefinition.clear();
        if (includeTopicMetrics) {
            pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
                if (topic instanceof PersistentTopic) {
                    topic.getSubscriptions().values().forEach(subscription -> {
                        try {
                            ((ManagedLedgerStats)localManageLedgerStats.get()).reset();
                            if (!SystemTopicNames.isEventSystemTopic((TopicName)TopicName.get((String)subscription.getTopic().getName())) && subscription instanceof PersistentSubscription && ((PersistentSubscription)subscription).checkIfPendingAckStoreInit()) {
                                ManagedLedger managedLedger = ((PersistentSubscription)subscription).getPendingAckManageLedger().get();
                                TransactionAggregator.generateManageLedgerStats(managedLedger, stream, cluster, namespace, name, subscription.getName());
                            }
                        }
                        catch (Exception e) {
                            log.warn("Transaction pending ack generate managedLedgerStats fail!", (Throwable)e);
                        }
                    });
                }
            })));
        }
        AggregatedTransactionCoordinatorStats transactionCoordinatorStats = (AggregatedTransactionCoordinatorStats)localTransactionCoordinatorStats.get();
        pulsar.getTransactionMetadataStoreService().getStores().forEach((transactionCoordinatorID, transactionMetadataStore) -> {
            transactionCoordinatorStats.reset();
            TransactionMetadataStoreStats transactionMetadataStoreStats = transactionMetadataStore.getMetadataStoreStats();
            transactionCoordinatorStats.actives = transactionMetadataStoreStats.getActives();
            transactionCoordinatorStats.committedCount = transactionMetadataStoreStats.getCommittedCount();
            transactionCoordinatorStats.abortedCount = transactionMetadataStoreStats.getAbortedCount();
            transactionCoordinatorStats.createdCount = transactionMetadataStoreStats.getCreatedCount();
            transactionCoordinatorStats.timeoutCount = transactionMetadataStoreStats.getTimeoutCount();
            transactionCoordinatorStats.appendLogCount = transactionMetadataStoreStats.getAppendLogCount();
            transactionMetadataStoreStats.executionLatencyBuckets.refresh();
            transactionCoordinatorStats.executionLatency = transactionMetadataStoreStats.executionLatencyBuckets.getBuckets();
            TransactionAggregator.printTransactionCoordinatorStats(stream, cluster, transactionCoordinatorStats, transactionMetadataStoreStats.getCoordinatorId());
            ((ManagedLedgerStats)localManageLedgerStats.get()).reset();
            if (transactionMetadataStore instanceof MLTransactionMetadataStore) {
                ManagedLedger managedLedger = ((MLTransactionMetadataStore)transactionMetadataStore).getManagedLedger();
                TransactionAggregator.generateManageLedgerStats(managedLedger, stream, cluster, NamespaceName.SYSTEM_NAMESPACE.toString(), "__transaction_log_" + transactionCoordinatorID.getId(), "transaction.subscription");
            }
        });
    }

    private static void generateManageLedgerStats(ManagedLedger managedLedger, SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription) {
        ManagedLedgerStats managedLedgerStats = (ManagedLedgerStats)localManageLedgerStats.get();
        ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl)managedLedger.getStats();
        managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
        managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
        managedLedgerStats.backlogSize = managedLedger.getEstimatedBacklogSize();
        managedLedgerStats.offloadedStorageUsed = managedLedger.getOffloadedSize();
        managedLedgerStats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
        managedLedgerStats.storageWriteLatencyBuckets.refresh();
        managedLedgerStats.storageLedgerWriteLatencyBuckets.addAll(mlStats.getInternalLedgerAddEntryLatencyBuckets());
        managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
        managedLedgerStats.entrySizeBuckets.addAll(mlStats.getInternalEntrySizeBuckets());
        managedLedgerStats.entrySizeBuckets.refresh();
        managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
        managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
        TransactionAggregator.printManageLedgerStats(stream, cluster, namespace, topic, subscription, managedLedgerStats);
    }

    private static void metricType(SimpleTextOutputStream stream, String name) {
        Map metricWithTypeDefinition = (Map)threadLocalMetricWithTypeDefinition.get();
        if (!metricWithTypeDefinition.containsKey(name)) {
            metricWithTypeDefinition.put(name, "gauge");
            stream.write("# TYPE ").write(name).write(" gauge\n");
        }
    }

    private static void metric(SimpleTextOutputStream stream, String cluster, String name, double value, long coordinatorId) {
        TransactionAggregator.metricType(stream, name);
        stream.write(name).write("{cluster=\"").write(cluster).write("\",coordinator_id=\"").write(coordinatorId).write("\"} ").write(value).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String name, long value) {
        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String name, double value) {
        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void printManageLedgerStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, ManagedLedgerStats stats) {
        TransactionAggregator.metrics(stream, cluster, namespace, topic, subscription, "pulsar_storage_size", stats.storageSize);
        TransactionAggregator.metrics(stream, cluster, namespace, topic, subscription, "pulsar_storage_logical_size", stats.storageLogicalSize);
        TransactionAggregator.metrics(stream, cluster, namespace, topic, subscription, "pulsar_storage_backlog_size", stats.backlogSize);
        TransactionAggregator.metrics(stream, cluster, namespace, topic, subscription, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
        TransactionAggregator.metrics(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_rate", stats.storageWriteRate);
        TransactionAggregator.metrics(stream, cluster, namespace, topic, subscription, "pulsar_storage_read_rate", stats.storageReadRate);
        stats.storageWriteLatencyBuckets.refresh();
        long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_count", stats.storageWriteLatencyBuckets.getCount());
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_sum", stats.storageWriteLatencyBuckets.getSum());
        stats.storageLedgerWriteLatencyBuckets.refresh();
        long[] ledgerWritelatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_overflow", ledgerWritelatencyBuckets[9]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_count", stats.storageLedgerWriteLatencyBuckets.getCount());
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_sum", stats.storageLedgerWriteLatencyBuckets.getSum());
        stats.entrySizeBuckets.refresh();
        long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
        TransactionAggregator.metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
    }

    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String name, long value) {
        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    static void printTransactionCoordinatorStats(SimpleTextOutputStream stream, String cluster, AggregatedTransactionCoordinatorStats stats, long coordinatorId) {
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_active_count", stats.actives, coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_committed_total", stats.committedCount, coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_aborted_total", stats.abortedCount, coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_created_total", stats.createdCount, coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_timeout_total", stats.timeoutCount, coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_append_log_total", stats.appendLogCount, coordinatorId);
        long[] latencyBuckets = stats.executionLatency;
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_300000", latencyBuckets[10], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_1500000", latencyBuckets[11], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_3000000", latencyBuckets[12], coordinatorId);
        TransactionAggregator.metric(stream, cluster, "pulsar_txn_execution_latency_le_overflow", latencyBuckets[13], coordinatorId);
    }
}

