/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.opentelemetry.module.kafka.consumer;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.semconv.SemanticAttributes;
import jakarta.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import ru.tinkoff.kora.kafka.common.consumer.telemetry.KafkaConsumerTracer;
import ru.tinkoff.kora.opentelemetry.common.OpentelemetryContext;

public final class OpentelemetryKafkaConsumerTracer
implements KafkaConsumerTracer {
    private final Tracer tracer;

    public OpentelemetryKafkaConsumerTracer(Tracer tracer) {
        this.tracer = tracer;
    }

    public KafkaConsumerTracer.KafkaConsumerRecordsSpan get(ConsumerRecords<?, ?> records) {
        ru.tinkoff.kora.common.Context ctx = ru.tinkoff.kora.common.Context.current();
        OpentelemetryContext otctx = OpentelemetryContext.get((ru.tinkoff.kora.common.Context)ctx);
        Set partitions = records.partitions();
        HashMap<TopicPartition, Span> spans = new HashMap<TopicPartition, Span>(partitions.size());
        Span rootSpan = this.tracer.spanBuilder("kafka.poll").setSpanKind(SpanKind.CONSUMER).setAttribute(SemanticAttributes.MESSAGING_SYSTEM, (Object)"kafka").setNoParent().startSpan();
        OpentelemetryContext rootCtx = otctx.add((ImplicitContextKeyed)rootSpan);
        for (TopicPartition topicPartition : partitions) {
            Span partitionSpan = this.tracer.spanBuilder(topicPartition.topic() + " receive").setParent(rootCtx.getContext()).setSpanKind(SpanKind.CONSUMER).setAttribute(SemanticAttributes.MESSAGING_SYSTEM, (Object)"kafka").setAttribute(SemanticAttributes.MESSAGING_OPERATION, (Object)"receive").setAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, (Object)topicPartition.topic()).setAttribute(SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, (Object)topicPartition.partition()).setAttribute(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, (Object)records.count()).startSpan();
            spans.put(topicPartition, partitionSpan);
        }
        OpentelemetryContext.set((ru.tinkoff.kora.common.Context)ctx, (OpentelemetryContext)rootCtx);
        return new OpentelemetryKafkaConsumerRecordsSpan(this.tracer, rootCtx, rootSpan, spans);
    }

    private static final class OpentelemetryKafkaConsumerRecordsSpan
    implements KafkaConsumerTracer.KafkaConsumerRecordsSpan {
        private final Tracer tracer;
        private final OpentelemetryContext rootCtx;
        private final Span rootSpan;
        private final Map<TopicPartition, Span> spans;

        public OpentelemetryKafkaConsumerRecordsSpan(Tracer tracer, OpentelemetryContext rootCtx, Span rootSpan, Map<TopicPartition, Span> spans) {
            this.tracer = tracer;
            this.rootCtx = rootCtx;
            this.rootSpan = rootSpan;
            this.spans = spans;
        }

        public KafkaConsumerTracer.KafkaConsumerRecordSpan get(ConsumerRecord<?, ?> record) {
            Span partitionSpan = this.spans.get(new TopicPartition(record.topic(), record.partition()));
            Context root = Context.root();
            Context parent = W3CTraceContextPropagator.getInstance().extract(root, record, (TextMapGetter)ConsumerRecordTextMapGetter.INSTANCE);
            SpanBuilder recordSpanBuilder = this.tracer.spanBuilder(record.topic() + " process").setSpanKind(SpanKind.CONSUMER).setParent(parent).addLink(partitionSpan.getSpanContext()).setAttribute(SemanticAttributes.MESSAGING_SYSTEM, (Object)"kafka").setAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, (Object)record.topic()).setAttribute(SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, (Object)record.partition()).setAttribute(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, (Object)record.offset());
            try {
                recordSpanBuilder.setAttribute(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, (Object)Objects.toString(record.key()));
            }
            catch (Exception exception) {
                // empty catch block
            }
            Span recordSpan = recordSpanBuilder.startSpan();
            OpentelemetryContext.set((ru.tinkoff.kora.common.Context)ru.tinkoff.kora.common.Context.current(), (OpentelemetryContext)this.rootCtx.add((ImplicitContextKeyed)recordSpan));
            return new OpentelemetryKafkaConsumerRecordSpan(this.rootCtx, recordSpan);
        }

        public void close(@Nullable Throwable ex) {
            for (Span span : this.spans.values()) {
                span.end();
            }
            this.rootSpan.end();
        }

        private static enum ConsumerRecordTextMapGetter implements TextMapGetter<ConsumerRecord<?, ?>>
        {
            INSTANCE;


            public Iterable<String> keys(ConsumerRecord<?, ?> carrier) {
                HashSet<String> set = new HashSet<String>();
                for (Header header : carrier.headers()) {
                    set.add(header.key());
                }
                return set;
            }

            @Nullable
            public String get(ConsumerRecord<?, ?> carrier, String key) {
                Header header = carrier.headers().lastHeader(key);
                return header != null ? new String(header.value(), StandardCharsets.UTF_8) : null;
            }
        }
    }

    private static final class OpentelemetryKafkaConsumerRecordSpan
    implements KafkaConsumerTracer.KafkaConsumerRecordSpan {
        private final OpentelemetryContext rootCtx;
        private final Span recordSpan;

        public OpentelemetryKafkaConsumerRecordSpan(OpentelemetryContext rootCtx, Span recordSpan) {
            this.rootCtx = rootCtx;
            this.recordSpan = recordSpan;
        }

        public void close(@Nullable Throwable ex) {
            this.recordSpan.end();
            OpentelemetryContext.set((ru.tinkoff.kora.common.Context)ru.tinkoff.kora.common.Context.current(), (OpentelemetryContext)this.rootCtx);
        }
    }
}

