/*
 * Decompiled with CFR 0.152.
 */
package co.elastic.apm.agent.kafka.helper;

import co.elastic.apm.agent.configuration.MessagingConfiguration;
import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.kafka.helper.CallbackWrapper;
import co.elastic.apm.agent.kafka.helper.KafkaInstrumentationHelper;
import co.elastic.apm.agent.matcher.WildcardMatcher;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.shaded.jctools.queues.atomic.AtomicQueueFactory;
import co.elastic.apm.agent.shaded.jctools.queues.spec.ConcurrentQueueSpec;
import co.elastic.apm.agent.shaded.slf4j.Logger;
import co.elastic.apm.agent.shaded.slf4j.LoggerFactory;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class KafkaInstrumentationHelperImpl
implements KafkaInstrumentationHelper<Callback, ProducerRecord, KafkaProducer> {
    public static final Logger logger = LoggerFactory.getLogger(KafkaInstrumentationHelperImpl.class);
    private final ObjectPool<CallbackWrapper> callbackWrapperObjectPool;
    private final ElasticApmTracer tracer;
    private MessagingConfiguration messagingConfiguration;

    public KafkaInstrumentationHelperImpl(ElasticApmTracer tracer) {
        this.tracer = tracer;
        this.messagingConfiguration = tracer.getConfig(MessagingConfiguration.class);
        this.callbackWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.newQueue(ConcurrentQueueSpec.createBoundedMpmc(256)), false, new CallbackWrapperAllocator());
    }

    private boolean ignoreTopic(String topicName) {
        return WildcardMatcher.isAnyMatch(this.messagingConfiguration.getIgnoreMessageQueues(), topicName);
    }

    @Override
    @Nullable
    public Span onSendStart(ProducerRecord record) {
        String topic = record.topic();
        if (this.ignoreTopic(topic)) {
            return null;
        }
        AbstractSpan<?> activeSpan = this.tracer.getActive();
        if (activeSpan == null) {
            return null;
        }
        Span span = activeSpan.createExitSpan();
        if (span == null) {
            return null;
        }
        span.withType("messaging").withSubtype("kafka").withAction("send");
        ((Span)span.withName("KafkaProducer#send to ")).appendToName(topic);
        span.getContext().getMessage().withQueue(topic);
        span.getContext().getDestination().getService().withType("messaging").withName("kafka").getResource().append("kafka/").append(topic);
        span.activate();
        return span;
    }

    @Override
    public Callback wrapCallback(@Nullable Callback callback, Span span) {
        if (callback instanceof CallbackWrapper) {
            return callback;
        }
        try {
            return this.callbackWrapperObjectPool.createInstance().wrap(callback, span);
        }
        catch (Throwable throwable) {
            logger.debug("Failed to wrap Kafka send callback", throwable);
            return callback;
        }
    }

    void recycle(CallbackWrapper callbackWrapper) {
        this.callbackWrapperObjectPool.recycle(callbackWrapper);
    }

    @Override
    public void onSendEnd(Span span, ProducerRecord producerRecord, KafkaProducer kafkaProducer, @Nullable Throwable throwable) {
        if (this.messagingConfiguration.shouldCollectQueueAddress()) {
            try {
                Node leader;
                List partitions = kafkaProducer.partitionsFor(producerRecord.topic());
                Integer partition = producerRecord.partition();
                PartitionInfo partitionInfo = null;
                if (partition != null) {
                    partitionInfo = (PartitionInfo)partitions.get(partition);
                } else if (!partitions.isEmpty()) {
                    partitionInfo = (PartitionInfo)partitions.get(0);
                }
                if (partitionInfo != null && (leader = partitionInfo.leader()) != null) {
                    span.getContext().getDestination().withAddress(leader.host()).withPort(leader.port());
                }
            }
            catch (Exception e) {
                logger.error("Failed to get Kafka producer's destination", e);
            }
        }
        span.captureException(throwable);
        span.deactivate();
    }

    private final class CallbackWrapperAllocator
    implements Allocator<CallbackWrapper> {
        private CallbackWrapperAllocator() {
        }

        @Override
        public CallbackWrapper createInstance() {
            return new CallbackWrapper(KafkaInstrumentationHelperImpl.this);
        }
    }
}

