/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.instrumentation.kafkaclients;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetryBuilder;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;

public final class KafkaTelemetry {
    private static final Logger logger = Logger.getLogger(KafkaTelemetry.class.getName());
    private static final TextMapGetter<ConsumerRecord<?, ?>> GETTER = KafkaConsumerRecordGetter.INSTANCE;
    private static final TextMapSetter<Headers> SETTER = KafkaHeadersSetter.INSTANCE;
    private final OpenTelemetry openTelemetry;
    private final Instrumenter<ProducerRecord<?, ?>, RecordMetadata> producerInstrumenter;
    private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;
    private final boolean producerPropagationEnabled;

    KafkaTelemetry(OpenTelemetry openTelemetry, Instrumenter<ProducerRecord<?, ?>, RecordMetadata> producerInstrumenter, Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter, boolean producerPropagationEnabled) {
        this.openTelemetry = openTelemetry;
        this.producerInstrumenter = producerInstrumenter;
        this.consumerProcessInstrumenter = consumerProcessInstrumenter;
        this.producerPropagationEnabled = producerPropagationEnabled;
    }

    public static KafkaTelemetry create(OpenTelemetry openTelemetry) {
        return KafkaTelemetry.builder(openTelemetry).build();
    }

    public static KafkaTelemetryBuilder builder(OpenTelemetry openTelemetry) {
        return new KafkaTelemetryBuilder(openTelemetry);
    }

    private TextMapPropagator propagator() {
        return this.openTelemetry.getPropagators().getTextMapPropagator();
    }

    public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
        return (Producer)Proxy.newProxyInstance(KafkaTelemetry.class.getClassLoader(), new Class[]{Producer.class}, (proxy, method, args) -> {
            if ("send".equals(method.getName()) && method.getParameterCount() >= 1 && method.getParameterTypes()[0] == ProducerRecord.class) {
                ProducerRecord record = (ProducerRecord)args[0];
                Callback callback = method.getParameterCount() >= 2 && method.getParameterTypes()[1] == Callback.class ? (Callback)args[1] : null;
                return this.buildAndInjectSpan(record, callback, (arg_0, arg_1) -> ((Producer)producer).send(arg_0, arg_1));
            }
            return method.invoke((Object)producer, args);
        });
    }

    public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
        return (Consumer)Proxy.newProxyInstance(KafkaTelemetry.class.getClassLoader(), new Class[]{Consumer.class}, (proxy, method, args) -> {
            Object result = method.invoke((Object)consumer, args);
            if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
                this.buildAndFinishSpan((ConsumerRecords)result);
            }
            return result;
        });
    }

    public Map<String, ?> metricConfigProperties() {
        HashMap<String, String> config = new HashMap<String, String>();
        config.put("metric.reporters", OpenTelemetryMetricsReporter.class.getName());
        config.put("opentelemetry.instance", (String)this.openTelemetry);
        config.put("opentelemetry.instrumentation_name", "io.opentelemetry.kafka-clients-2.6");
        return Collections.unmodifiableMap(config);
    }

    <K, V> void buildAndInjectSpan(ProducerRecord<K, V> record) {
        Context parentContext = Context.current();
        if (!this.producerInstrumenter.shouldStart(parentContext, record)) {
            return;
        }
        Context context = this.producerInstrumenter.start(parentContext, record);
        if (this.producerPropagationEnabled) {
            try {
                this.propagator().inject(context, (Object)record.headers(), SETTER);
            }
            catch (Throwable t) {
                logger.log(Level.WARNING, "failed to inject span context. sending record second time?", t);
            }
        }
        this.producerInstrumenter.end(context, record, null, null);
    }

    <K, V> Future<RecordMetadata> buildAndInjectSpan(ProducerRecord<K, V> record, Callback callback, BiFunction<ProducerRecord<K, V>, Callback, Future<RecordMetadata>> sendFn) {
        Context parentContext = Context.current();
        if (!this.producerInstrumenter.shouldStart(parentContext, record)) {
            return sendFn.apply(record, callback);
        }
        Context context = this.producerInstrumenter.start(parentContext, record);
        try (Scope ignored = context.makeCurrent();){
            this.propagator().inject(context, (Object)record.headers(), SETTER);
            callback = new ProducerCallback(callback, parentContext, context, record);
            Future<RecordMetadata> future = sendFn.apply(record, callback);
            return future;
        }
    }

    <K, V> void buildAndFinishSpan(ConsumerRecords<K, V> records) {
        Context currentContext = Context.current();
        for (ConsumerRecord record : records) {
            Context linkedContext = this.propagator().extract(currentContext, (Object)record, GETTER);
            Context newContext = currentContext.with((ImplicitContextKeyed)Span.fromContext((Context)linkedContext));
            if (!this.consumerProcessInstrumenter.shouldStart(newContext, (Object)record)) continue;
            Context current = this.consumerProcessInstrumenter.start(newContext, (Object)record);
            this.consumerProcessInstrumenter.end(current, (Object)record, null, null);
        }
    }

    private class ProducerCallback
    implements Callback {
        private final Callback callback;
        private final Context parentContext;
        private final Context context;
        private final ProducerRecord<?, ?> request;

        public ProducerCallback(Callback callback, Context parentContext, Context context, ProducerRecord<?, ?> request) {
            this.callback = callback;
            this.parentContext = parentContext;
            this.context = context;
            this.request = request;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            KafkaTelemetry.this.producerInstrumenter.end(this.context, this.request, (Object)metadata, (Throwable)exception);
            if (this.callback != null) {
                try (Scope ignored = this.parentContext.makeCurrent();){
                    this.callback.onCompletion(metadata, exception);
                }
            }
        }
    }
}

