/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl;
import io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.LoggedRecord;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
import jakarta.inject.Inject;
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.Generated;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CDI decorator that wraps a Kafka Streams {@link Processor} and runs every
 * {@link #process(Record)} call inside an OpenTelemetry span.
 *
 * <p>If the incoming record carries any of the header fields declared by the configured
 * {@link TextMapPropagator}, the context extracted from them becomes the parent of the new span.
 * The context that is current while the span is active is then written back into the record
 * headers before the delegate is invoked.
 *
 * @param <KIn> key type of the incoming record
 * @param <VIn> value type of the incoming record
 * @param <KOut> key type of the records forwarded by the delegate
 * @param <VOut> value type of the records forwarded by the delegate
 */
@Decorator
@Priority(value=100)
public class TracingDecorator<KIn, VIn, KOut, VOut>
implements Processor<KIn, VIn, KOut, VOut> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TracingDecorator.class);
    /** Processor being decorated; every lifecycle call is forwarded to it. */
    private final Processor<KIn, VIn, KOut, VOut> delegate;
    /** Source of the {@link TextMapPropagator} used to read and write trace headers. */
    private final OpenTelemetry openTelemetry;
    /** Getter handed to the propagator when extracting a context from record headers. */
    private final KafkaTextMapGetter textMapGetter;
    /** Setter handed to the propagator when injecting a context into record headers. */
    private final KafkaTextMapSetter textMapSetter;
    /** Tracer used to build the span around each processed record. */
    private final Tracer tracer;
    /** Name given to every span; the injected constructor passes the processor payload type name. */
    private final String applicationName;
    /** Printer used to render protobuf values as JSON in the debug log. */
    private final JsonFormat.Printer jsonPrinter;
    /** Context received in {@link #init(ProcessorContext)}; read when logging record metadata. */
    private ProcessorContext<KOut, VOut> context;

    /**
     * Injection constructor. Uses the name of the configured processor payload type as span name
     * and the default {@link JsonFormat#printer()}.
     *
     * @param delegate the processor to decorate
     * @param openTelemetry provides the text map propagator
     * @param textMapGetter reads propagation fields from Kafka headers
     * @param textMapSetter writes propagation fields to Kafka headers
     * @param tracer creates the spans
     * @param configuration supplies the processor payload type whose name is used as span name
     */
    @Inject
    public TracingDecorator(@Delegate Processor<KIn, VIn, KOut, VOut> delegate, OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, Tracer tracer, TopologyConfigurationImpl configuration) {
        this(delegate, openTelemetry, textMapGetter, textMapSetter, tracer, configuration.getProcessorPayloadType().getName(), JsonFormat.printer());
    }

    /**
     * Initializes the delegate first, then keeps the context for later metadata logging.
     *
     * @param context the processor context supplied by Kafka Streams
     */
    @Override
    public void init(ProcessorContext<KOut, VOut> context) {
        this.delegate.init(context);
        this.context = context;
    }

    /**
     * Processes the record with the delegate inside a new span.
     *
     * <p>A {@link KafkaException} thrown by the delegate is recorded on the span and rethrown.
     * Any other {@link RuntimeException} is logged and recorded on the span but is NOT rethrown,
     * so the caller sees a normal return. In both cases the input record metadata is logged at
     * debug level. The span is always ended.
     *
     * @param record the record to process; its headers are modified in place
     * @throws KafkaException if the delegate throws one
     */
    @Override
    public void process(Record<KIn, VIn> record) {
        SpanBuilder spanBuilder = this.tracer.spanBuilder(this.applicationName);
        TextMapPropagator propagator = this.openTelemetry.getPropagators().getTextMapPropagator();
        Headers headers = record.headers();
        // Only extract a parent context when at least one propagation header is present.
        boolean hasPropagationHeader = propagator.fields().stream()
                .map(headers::lastHeader)
                .anyMatch(Objects::nonNull);
        if (hasPropagationHeader) {
            // Raw casts are kept from the original: the carrier type parameter of the
            // getter/setter classes is not visible from this file.
            Context extractedContext = propagator.extract(Context.current(), headers, (TextMapGetter)this.textMapGetter);
            spanBuilder.setParent(extractedContext);
            // Drop the incoming propagation headers; they are re-injected below once the new
            // span is current.
            propagator.fields().forEach(headers::remove);
        }
        Span span = spanBuilder.startSpan();
        try (Scope scope = span.makeCurrent()) {
            // The catch clauses are nested inside the scope so that the error handling runs
            // while the span is still current.
            try {
                propagator.inject(Context.current(), headers, (TextMapSetter)this.textMapSetter);
                this.delegate.process(record);
                span.setStatus(StatusCode.OK);
            }
            catch (KafkaException e) {
                span.recordException(e);
                span.setStatus(StatusCode.ERROR, e.getMessage());
                this.logInputMessageMetadata(record);
                throw e;
            }
            catch (RuntimeException e) {
                // Deliberately not rethrown: the failure is only logged and recorded on the span.
                log.error("Runtime error caught while processing the message", e);
                span.recordException(e);
                span.setStatus(StatusCode.ERROR, e.getMessage());
                this.logInputMessageMetadata(record);
            }
        }
        finally {
            span.end();
        }
    }

    /**
     * Logs a description of the input record at debug level; does nothing when debug logging is
     * disabled.
     *
     * <p>The logged entry contains the headers, the {@code uuid} header as id, the record
     * timestamp as a UTC date-time, the application id taken from the processor context, the
     * record value and, when record metadata is available, the topic and partition.
     *
     * @param record the record that was being processed
     */
    void logInputMessageMetadata(Record<KIn, VIn> record) {
        if (!log.isDebugEnabled()) {
            return;
        }
        Map<String, String> headers = TracingDecorator.toMap(record.headers());
        ZonedDateTime recordTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
        LoggedRecord.LoggedRecordBuilder builder = LoggedRecord.builder()
                .headers(headers)
                .id(headers.get("uuid"))
                .time(recordTime)
                .appId(this.context.applicationId());
        this.marshallValue(record, builder);
        this.extractMetadata(builder);
        log.debug("Input message is {}", builder.build());
    }

    /**
     * Converts Kafka headers into a key/value map of strings.
     *
     * <p>When a key occurs several times the last occurrence wins, mirroring the
     * {@code lastHeader} lookup used in {@link #process(Record)}. A header without a value is
     * mapped to {@code null}. Neither case throws, because this method runs while an error is
     * already being handled.
     *
     * @param headers the record headers
     * @return a mutable map with one entry per distinct header key
     */
    private static Map<String, String> toMap(Headers headers) {
        Map<String, String> result = new HashMap<>();
        for (Header header : headers) {
            byte[] raw = header.value();
            // NOTE(review): decoding uses the platform default charset, as before; confirm
            // whether producers always encode header values as UTF-8.
            result.put(header.key(), raw == null ? null : new String(raw, Charset.defaultCharset()));
        }
        return result;
    }

    /**
     * Stores a textual form of the record value in the builder: JSON for protobuf messages,
     * {@code toString()} for anything else. A {@code null} value (e.g. a tombstone record)
     * leaves the builder's value untouched instead of failing.
     *
     * @param record the record whose value is rendered
     * @param builder the log entry builder to populate
     */
    private void marshallValue(Record<?, ?> record, LoggedRecord.LoggedRecordBuilder builder) {
        Object value = record.value();
        if (value instanceof MessageOrBuilder) {
            try {
                builder.value(this.jsonPrinter.print((MessageOrBuilder)value));
            }
            catch (InvalidProtocolBufferException e) {
                log.error("Could not unmarshal to JSON", e);
            }
        } else if (value != null) {
            builder.value(value.toString());
        }
    }

    /**
     * Copies topic and partition into the builder when the processor context exposes record
     * metadata.
     *
     * @param builder the log entry builder to populate
     */
    private void extractMetadata(LoggedRecord.LoggedRecordBuilder builder) {
        this.context.recordMetadata().ifPresent(metadata -> builder.topic(metadata.topic()).partition(metadata.partition()));
    }

    /**
     * Full constructor assigning every collaborator directly.
     *
     * @param delegate the processor to decorate
     * @param openTelemetry provides the text map propagator
     * @param textMapGetter reads propagation fields from Kafka headers
     * @param textMapSetter writes propagation fields to Kafka headers
     * @param tracer creates the spans
     * @param applicationName name given to every span
     * @param jsonPrinter printer used to render protobuf values in the debug log
     */
    @Generated
    TracingDecorator(Processor<KIn, VIn, KOut, VOut> delegate, OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, Tracer tracer, String applicationName, JsonFormat.Printer jsonPrinter) {
        this.delegate = delegate;
        this.openTelemetry = openTelemetry;
        this.textMapGetter = textMapGetter;
        this.textMapSetter = textMapSetter;
        this.tracer = tracer;
        this.applicationName = applicationName;
        this.jsonPrinter = jsonPrinter;
    }

    /** Closes the delegate. */
    @Override
    @Generated
    public void close() {
        this.delegate.close();
    }

    /**
     * Lists the signatures of the two methods this class implements itself
     * ({@code init} and {@code process}); not referenced anywhere in this file.
     */
    private static interface Excludes {
        public <KOut, VOut> void init(ProcessorContext<KOut, VOut> var1);

        public <KIn, VIn> void process(Record<KIn, VIn> var1);
    }
}

