/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus.implementation.instrumentation;

import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.util.Context;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.instrumentation.ContextAccessor;
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusMeter;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.Map;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Mono;

public final class ServiceBusReceiverInstrumentation {
    private static final Symbol ENQUEUED_TIME_SYMBOL = Symbol.getSymbol((String)AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());
    private final ServiceBusMeter meter;
    private final ServiceBusTracer tracer;
    private final ReceiverKind receiverKind;

    public ServiceBusReceiverInstrumentation(Tracer tracer, Meter meter, String fullyQualifiedName, String entityPath, String subscriptionName, ReceiverKind receiverKind) {
        this.tracer = new ServiceBusTracer(tracer, fullyQualifiedName, entityPath);
        this.meter = new ServiceBusMeter(meter, fullyQualifiedName, entityPath, subscriptionName);
        this.receiverKind = receiverKind;
    }

    public AutoCloseable startTrackingSettlementSequenceNumber() {
        return this.meter.isSettlementEnabled() ? this.meter.trackSettlementSequenceNumber() : null;
    }

    public boolean isEnabled() {
        return this.tracer.isEnabled() || this.meter.isSettlementEnabled() || this.meter.isConsumerLagEnabled();
    }

    public boolean isProcessorInstrumentation() {
        return this.receiverKind == ReceiverKind.PROCESSOR;
    }

    public boolean isAsyncReceiverInstrumentation() {
        return this.receiverKind == ReceiverKind.ASYNC_RECEIVER;
    }

    public Context startProcessInstrumentation(String name, Map<String, Object> applicationProperties, OffsetDateTime enqueuedTime, Context parent) {
        if (applicationProperties == null || !this.tracer.isEnabled() && !this.meter.isConsumerLagEnabled()) {
            return parent;
        }
        Context span = this.tracer.isEnabled() && this.receiverKind != ReceiverKind.SYNC_RECEIVER ? this.tracer.startProcessSpan(name, applicationProperties, enqueuedTime, parent) : parent;
        this.meter.reportConsumerLag(enqueuedTime, span);
        return span;
    }

    public void instrumentProcess(ServiceBusReceivedMessage message, ReceiverKind caller, Function<ServiceBusReceivedMessage, Throwable> handleMessage) {
        if (this.receiverKind != caller || message == null) {
            handleMessage.apply(message);
            return;
        }
        Context span = this.startProcessInstrumentation("ServiceBus.process", message.getApplicationProperties(), message.getEnqueuedTime(), Context.NONE);
        ContextAccessor.setContext(message, span);
        this.wrap(span, message, handleMessage);
    }

    public void instrumentProcess(Message message, ReceiverKind caller, Function<Message, Throwable> handleMessage) {
        if (this.receiverKind != caller || message == null || message.getApplicationProperties() == null) {
            handleMessage.apply(message);
            return;
        }
        Context span = this.startProcessInstrumentation("ServiceBus.process", message.getApplicationProperties().getValue(), this.getEnqueuedTime(message), Context.NONE);
        this.wrap(span, message, handleMessage);
    }

    public <T> Mono<T> instrumentSettlement(Mono<T> publisher, ServiceBusReceivedMessage message, Context messageContext, DispositionStatus status) {
        if (this.tracer.isEnabled() || this.meter.isSettlementEnabled()) {
            return Mono.defer(() -> {
                long startTime = Instant.now().toEpochMilli();
                Context span = this.tracer.startSpanWithLink(ServiceBusReceiverInstrumentation.getSettlementSpanName(status), ServiceBusTracer.OperationName.SETTLE, message, messageContext);
                return publisher.doOnEach(signal -> {
                    this.meter.reportSettlement(startTime, message.getSequenceNumber(), status, signal.getThrowable(), false, span);
                    this.tracer.endSpan(signal.getThrowable(), span, null);
                }).doOnCancel(() -> {
                    this.meter.reportSettlement(startTime, message.getSequenceNumber(), status, null, true, span);
                    this.tracer.cancelSpan(span);
                });
            });
        }
        return publisher;
    }

    public ServiceBusTracer getTracer() {
        return this.tracer;
    }

    private static String getSettlementSpanName(DispositionStatus status) {
        switch (status) {
            case COMPLETED: {
                return "ServiceBus.complete";
            }
            case ABANDONED: {
                return "ServiceBus.abandon";
            }
            case DEFERRED: {
                return "ServiceBus.defer";
            }
            case SUSPENDED: {
                return "ServiceBus.deadLetter";
            }
            case RELEASED: {
                return "ServiceBus.release";
            }
        }
        return "ServiceBus.unknown";
    }

    private <T> void wrap(Context span, T message, Function<T, Throwable> handleMessage) {
        AutoCloseable scope = this.tracer.makeSpanCurrent(span);
        Throwable error = null;
        try {
            error = handleMessage.apply(message);
        }
        catch (Throwable t) {
            error = t;
            throw t;
        }
        finally {
            this.tracer.endSpan(error, span, scope);
        }
    }

    private OffsetDateTime getEnqueuedTime(Message message) {
        if (message.getMessageAnnotations() == null || message.getMessageAnnotations().getValue() == null) {
            return null;
        }
        Object date = message.getMessageAnnotations().getValue().get(ENQUEUED_TIME_SYMBOL);
        if (date instanceof Date) {
            return ((Date)date).toInstant().atOffset(ZoneOffset.UTC);
        }
        return null;
    }
}

