/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.micrometer.module.kafka.consumer;

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.semconv.SemanticAttributes;
import jakarta.annotation.Nullable;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.kafka.common.consumer.telemetry.KafkaConsumerMetrics;
import ru.tinkoff.kora.telemetry.common.TelemetryConfig;

public final class Opentelemetry123KafkaConsumerMetrics
implements KafkaConsumerMetrics,
Lifecycle {
    private static final AttributeKey<String> MESSAGING_KAFKA_CONSUMER_NAME = AttributeKey.stringKey((String)"messaging.kafka.consumer.name");
    private final MeterRegistry meterRegistry;
    private final ConcurrentHashMap<DurationKey, DistributionSummary> metrics = new ConcurrentHashMap();
    private final ConcurrentHashMap<DurationBatchKey, DistributionSummary> metricsBatch = new ConcurrentHashMap();
    private final ConcurrentHashMap<TopicPartition, LagGauge> lagMetrics = new ConcurrentHashMap();
    private final TelemetryConfig.MetricsConfig config;
    private final Properties driverProperties;

    public Opentelemetry123KafkaConsumerMetrics(MeterRegistry meterRegistry, Properties driverProperties, TelemetryConfig.MetricsConfig config) {
        this.meterRegistry = meterRegistry;
        this.config = config;
        this.driverProperties = driverProperties;
    }

    private DistributionSummary metrics(DurationKey key) {
        Object clientId = this.driverProperties.get("client.id");
        Object groupId = this.driverProperties.get("group.id");
        DistributionSummary.Builder builder = DistributionSummary.builder((String)"messaging.receive.duration").serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V123)).baseUnit("s").tag(SemanticAttributes.MESSAGING_SYSTEM.getKey(), "kafka").tag(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), key.topic()).tag(SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION.getKey(), String.valueOf(key.partition())).tag(SemanticAttributes.MESSAGING_CLIENT_ID.getKey(), Objects.requireNonNullElse(clientId, "").toString()).tag(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP.getKey(), Objects.requireNonNullElse(groupId, "").toString());
        if (key.errorType() != null) {
            builder.tag(SemanticAttributes.ERROR_TYPE.getKey(), key.errorType().getCanonicalName());
        } else {
            builder.tag(SemanticAttributes.ERROR_TYPE.getKey(), "");
        }
        return builder.register(this.meterRegistry);
    }

    private DistributionSummary metricBatch(DurationBatchKey key) {
        Object clientId = this.driverProperties.get("client.id");
        Object groupId = this.driverProperties.get("group.id");
        DistributionSummary.Builder builder = DistributionSummary.builder((String)"messaging.process.batch.duration").serviceLevelObjectives(this.config.slo(TelemetryConfig.MetricsConfig.OpentelemetrySpec.V123)).baseUnit("s").tag(SemanticAttributes.MESSAGING_SYSTEM.getKey(), "kafka").tag(MESSAGING_KAFKA_CONSUMER_NAME.getKey(), key.consumerName()).tag(SemanticAttributes.MESSAGING_OPERATION.getKey(), key.consumerName()).tag(SemanticAttributes.MESSAGING_CLIENT_ID.getKey(), Objects.requireNonNullElse(clientId, "").toString()).tag(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP.getKey(), Objects.requireNonNullElse(groupId, "").toString());
        if (key.errorType() != null) {
            builder.tag(SemanticAttributes.ERROR_TYPE.getKey(), key.errorType().getCanonicalName());
        } else {
            builder.tag(SemanticAttributes.ERROR_TYPE.getKey(), "");
        }
        return builder.register(this.meterRegistry);
    }

    public void onRecordsReceived(ConsumerRecords<?, ?> records) {
    }

    public void onRecordProcessed(ConsumerRecord<?, ?> record, long duration, Throwable ex) {
        double durationDouble = (double)duration / 1.0E9;
        DurationKey key = new DurationKey(record.topic(), record.partition(), ex != null ? ex.getClass() : null);
        this.metrics.computeIfAbsent(key, this::metrics).record(durationDouble);
    }

    public void onRecordsProcessed(String consumerName, ConsumerRecords<?, ?> records, long duration, @Nullable Throwable ex) {
        double durationDouble = (double)duration / 1.0E9;
        DurationBatchKey key = new DurationBatchKey(consumerName, ex != null ? ex.getClass() : null);
        this.metricsBatch.computeIfAbsent(key, this::metricBatch).record(durationDouble);
    }

    public void reportLag(String consumerName, TopicPartition partition, long lag) {
        this.lagMetrics.computeIfAbsent((TopicPartition)partition, (Function<TopicPartition, LagGauge>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$reportLag$0(java.lang.String org.apache.kafka.common.TopicPartition ), (Lorg/apache/kafka/common/TopicPartition;)Lru/tinkoff/kora/micrometer/module/kafka/consumer/Opentelemetry123KafkaConsumerMetrics$LagGauge;)((Opentelemetry123KafkaConsumerMetrics)this, (String)consumerName)).offsetLag = lag;
    }

    public void init() {
    }

    public void release() {
        ArrayList<DistributionSummary> metrics = new ArrayList<DistributionSummary>(this.metrics.values());
        this.metrics.clear();
        for (DistributionSummary metric : metrics) {
            metric.close();
        }
        ArrayList<LagGauge> lagMetrics = new ArrayList<LagGauge>(this.lagMetrics.values());
        this.lagMetrics.clear();
        for (LagGauge lagMetric : lagMetrics) {
            lagMetric.gauge.close();
        }
    }

    private /* synthetic */ LagGauge lambda$reportLag$0(String consumerName, TopicPartition p) {
        return new LagGauge(consumerName, p, this.meterRegistry);
    }

    private record DurationKey(String topic, int partition, @Nullable Class<? extends Throwable> errorType) {
    }

    private record DurationBatchKey(String consumerName, @Nullable Class<? extends Throwable> errorType) {
    }

    private static class LagGauge {
        private final Gauge gauge;
        private volatile long offsetLag;

        private LagGauge(String consumerName, TopicPartition partition, MeterRegistry meterRegistry) {
            this.gauge = Gauge.builder((String)"messaging.kafka.consumer.lag", () -> this.offsetLag).tag(SemanticAttributes.MESSAGING_SYSTEM.getKey(), "kafka").tag(MESSAGING_KAFKA_CONSUMER_NAME.getKey(), consumerName).tag(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), partition.topic()).tag(SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION.getKey(), Objects.toString(partition.partition())).register(meterRegistry);
        }
    }
}

