/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.impl.RuntimeKafkaSinkConfiguration;
import io.smallrye.reactive.messaging.kafka.tracing.HeaderInjectAdapter;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;

public class KafkaRecordHelper {
    public static Headers getHeaders(OutgoingKafkaRecordMetadata<?> om, IncomingKafkaRecordMetadata<?, ?> im, RuntimeKafkaSinkConfiguration configuration) {
        RecordHeaders headers = new RecordHeaders();
        if (KafkaRecordHelper.isNotBlank(configuration.getPropagateHeaders()) && im != null && im.getHeaders() != null) {
            Set headersToPropagate = Arrays.stream(configuration.getPropagateHeaders().split(",")).map(String::trim).collect(Collectors.toSet());
            for (Header header : im.getHeaders()) {
                if (!headersToPropagate.contains(header.key())) continue;
                headers.add(header);
            }
        }
        if (om != null && om.getHeaders() != null) {
            om.getHeaders().forEach(arg_0 -> ((Headers)headers).add(arg_0));
        }
        return headers;
    }

    public static boolean isNotBlank(String s) {
        return s != null && !s.trim().isEmpty();
    }

    public static void createOutgoingTrace(Message<?> message, String topic, Integer partition, Headers headers) {
        Optional tracingMetadata = TracingMetadata.fromMessage(message);
        SpanBuilder spanBuilder = KafkaConnector.TRACER.spanBuilder(topic + " send").setSpanKind(SpanKind.PRODUCER);
        if (tracingMetadata.isPresent()) {
            Context parentSpanContext = ((TracingMetadata)tracingMetadata.get()).getCurrentContext();
            if (parentSpanContext != null) {
                spanBuilder.setParent(parentSpanContext);
            } else {
                spanBuilder.setNoParent();
            }
        } else {
            spanBuilder.setNoParent();
        }
        Span span = spanBuilder.startSpan();
        Scope scope = span.makeCurrent();
        if (partition != null && partition != -1) {
            span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition.intValue());
        }
        span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, (Object)"kafka");
        span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, (Object)topic);
        span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, (Object)"topic");
        GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), (Object)headers, (TextMapSetter)HeaderInjectAdapter.SETTER);
        span.end();
        scope.close();
    }
}

