/*
 * Decompiled with CFR 0.152.
 */
package com.sofa.alipay.tracer.plugins.kafkamq.producer;

import com.alipay.common.tracer.core.SofaTracer;
import com.alipay.common.tracer.core.configuration.SofaTracerConfiguration;
import com.alipay.common.tracer.core.context.trace.SofaTraceContext;
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.registry.ExtendFormat;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.tracer.AbstractTracer;
import com.sofa.alipay.tracer.plugins.kafkamq.carrier.KafkaMqInjectCarrier;
import com.sofa.alipay.tracer.plugins.kafkamq.tracers.KafkaMQSendTracer;
import io.opentracing.SpanContext;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Headers;

/**
 * {@link Producer} decorator that wraps every {@code send} in a tracer client span.
 *
 * <p>All methods other than {@code send} are forwarded unchanged to the wrapped producer.
 * For {@code send}, a span is obtained from the {@link KafkaMQSendTracer}, tagged with
 * thread/application/topic/partition information, injected into the record headers and
 * handed to a {@link SofaTracerCallback} that finishes it when Kafka reports completion.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class SofaTracerKafkaProducer<K, V> implements Producer<K, V> {
    /** Operation name passed to the tracer when a send span is created. */
    private static final String SEND_OPERATION_NAME = "mq-kafka-send";

    // Not referenced anywhere in this class; retained as found in the original.
    private final String kafkaSendPostFix = "-kafka-send";

    /** The real producer every call is delegated to. */
    private final Producer<K, V> producer;

    /** Tracer used to create/finish send spans; resolved lazily in {@code send} when null. */
    private KafkaMQSendTracer kafkaMQSendTracer;

    /**
     * Creates a tracing producer using an explicitly supplied tracer.
     *
     * @param producer          producer to delegate to
     * @param kafkaMQSendTracer tracer used for send spans; when {@code null} the singleton
     *                          tracer is looked up on the first {@code send}
     */
    public SofaTracerKafkaProducer(Producer<K, V> producer, KafkaMQSendTracer kafkaMQSendTracer) {
        this.producer = producer;
        this.kafkaMQSendTracer = kafkaMQSendTracer;
    }

    /**
     * Creates a tracing producer using the singleton {@link KafkaMQSendTracer}.
     *
     * @param producer producer to delegate to
     */
    public SofaTracerKafkaProducer(Producer<K, V> producer) {
        this(producer, KafkaMQSendTracer.getKafkaMQSendTracerSingleton());
    }

    /** Delegates to the wrapped producer. */
    public void initTransactions() {
        this.producer.initTransactions();
    }

    /** Delegates to the wrapped producer. */
    public void beginTransaction() throws ProducerFencedException {
        this.producer.beginTransaction();
    }

    /** Delegates to the wrapped producer. */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
        this.producer.sendOffsetsToTransaction(offsets, consumerGroupId);
    }

    /** Delegates to the wrapped producer. */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
        this.producer.sendOffsetsToTransaction(map, consumerGroupMetadata);
    }

    /** Delegates to the wrapped producer. */
    public void commitTransaction() throws ProducerFencedException {
        this.producer.commitTransaction();
    }

    /** Delegates to the wrapped producer. */
    public void abortTransaction() throws ProducerFencedException {
        this.producer.abortTransaction();
    }

    /**
     * Sends a record without a user callback; equivalent to {@code send(producerRecord, null)}.
     *
     * @param producerRecord record to send
     * @return the future returned by the wrapped producer
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return this.send(producerRecord, null);
    }

    /**
     * Sends a record inside a tracer client span.
     *
     * @param producerRecord record to send; its headers receive the injected span context
     * @param callback       optional user callback, invoked before the span is finished
     * @return the future returned by the wrapped producer
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
        if (this.kafkaMQSendTracer == null) {
            // Tracer may have been passed as null to the two-argument constructor.
            this.kafkaMQSendTracer = KafkaMQSendTracer.getKafkaMQSendTracerSingleton();
        }
        SofaTracerSpan clientSpan = this.kafkaMQSendTracer.clientSend(SEND_OPERATION_NAME);
        this.appendSpanTagsAndInject(producerRecord, clientSpan);
        Callback tracingCallback = new SofaTracerCallback(callback, this.kafkaMQSendTracer, clientSpan);
        return this.producer.send(producerRecord, tracingCallback);
    }

    /** Delegates to the wrapped producer. */
    public void flush() {
        this.producer.flush();
    }

    /** Delegates to the wrapped producer. */
    public List<PartitionInfo> partitionsFor(String topic) {
        return this.producer.partitionsFor(topic);
    }

    /** Delegates to the wrapped producer. */
    public Map<MetricName, ? extends Metric> metrics() {
        return this.producer.metrics();
    }

    /** Delegates to the wrapped producer. */
    public void close() {
        this.producer.close();
    }

    /** Delegates to the wrapped producer. */
    public void close(Duration duration) {
        this.producer.close(duration);
    }

    /**
     * Tags the span, injects its context into the record headers, then pushes the span's
     * parent (if any) onto the current trace context. The order matches the original code.
     */
    private void appendSpanTagsAndInject(ProducerRecord<K, V> producerRecord, SofaTracerSpan clientSpan) {
        this.appendSpanTags(producerRecord, clientSpan);
        this.injectCarrier(clientSpan, producerRecord.headers());
        this.pushParentTracerSpan2Context(clientSpan);
    }

    /** Adds thread name, application name, topic and partition tags to the span. */
    private void appendSpanTags(ProducerRecord<K, V> producerRecord, SofaTracerSpan clientSpan) {
        clientSpan.setTag("current.thread.name", Thread.currentThread().getName());
        clientSpan.setTag("local.app", SofaTracerConfiguration.getProperty("spring.application.name"));
        clientSpan.setTag("kafka.topic", producerRecord.topic());
        // A record without an explicit partition is tagged with -1.
        Integer recordPartition = producerRecord.partition();
        Number partitionTag = recordPartition == null ? Integer.valueOf(-1) : recordPartition;
        clientSpan.setTag("kafka.partition", partitionTag);
    }

    /** Injects the span context into the given Kafka headers using the B3 text-map format. */
    private void injectCarrier(SofaTracerSpan tracerSpan, Headers properties) {
        SofaTracer sofaTracer = this.kafkaMQSendTracer.getSofaTracer();
        SpanContext spanContext = tracerSpan.getSofaTracerSpanContext();
        sofaTracer.inject(spanContext, ExtendFormat.Builtin.B3_TEXT_MAP, new KafkaMqInjectCarrier(properties));
    }

    /** Pushes the span's parent onto the current trace context when a parent exists. */
    private void pushParentTracerSpan2Context(SofaTracerSpan tracerSpan) {
        SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
        SofaTracerSpan parentSpan = tracerSpan.getParentSofaTracerSpan();
        if (parentSpan != null) {
            sofaTraceContext.push(parentSpan);
        }
    }

    /**
     * Kafka {@link Callback} that forwards completion to an optional user callback and then
     * reports the outcome of the send to the tracer for the associated span.
     */
    static final class SofaTracerCallback implements Callback {
        /** Result code reported when the send completed without an exception. */
        private static final String RESULT_SUCCESS = "00";
        /** Result code reported when the send completed with an exception. */
        private static final String RESULT_FAILURE = "99";

        final Callback callback;
        final AbstractTracer kafkaSendTracer;
        private final SofaTracerSpan tracerSpan;

        /**
         * Creates a callback with no associated span ({@code tracerSpan} is {@code null}).
         *
         * @param callback        optional user callback, may be {@code null}
         * @param kafkaSendTracer tracer notified on completion
         */
        public SofaTracerCallback(Callback callback, AbstractTracer kafkaSendTracer) {
            this(callback, kafkaSendTracer, null);
        }

        /**
         * @param callback        optional user callback, may be {@code null}
         * @param kafkaSendTracer tracer notified on completion
         * @param tracerSpan      span handed to the tracer on completion
         */
        public SofaTracerCallback(Callback callback, AbstractTracer kafkaSendTracer, SofaTracerSpan tracerSpan) {
            this.tracerSpan = tracerSpan;
            this.callback = callback;
            this.kafkaSendTracer = kafkaSendTracer;
        }

        /**
         * Invokes the user callback (if any) and then reports the result to the tracer.
         *
         * <p>The tracer is notified in a {@code finally} block so that an exception thrown by
         * the user callback cannot leave the span unreported; the callback's exception still
         * propagates to the caller.
         *
         * @param metadata  metadata supplied by Kafka
         * @param exception send failure, or {@code null} on success
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            String resultCode = exception == null ? RESULT_SUCCESS : RESULT_FAILURE;
            try {
                if (this.callback != null) {
                    this.callback.onCompletion(metadata, exception);
                }
            } finally {
                this.kafkaSendTracer.clientReceiveTagFinish(this.tracerSpan, resultCode);
            }
        }
    }
}

