/*
 * Decompiled with CFR 0.152.
 */
package com.blibli.oss.kafka.aspect;

import com.blibli.oss.kafka.helper.KafkaHelper;
import com.blibli.oss.kafka.interceptor.InterceptorUtil;
import com.blibli.oss.kafka.interceptor.KafkaConsumerInterceptor;
import com.blibli.oss.kafka.interceptor.events.ConsumerEvent;
import com.blibli.oss.kafka.properties.KafkaProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

@Aspect
public class KafkaListenerAspect
implements ApplicationContextAware,
InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(KafkaListenerAspect.class);
    private ApplicationContext applicationContext;
    private ObjectMapper objectMapper;
    private KafkaProperties kafkaProperties;
    private List<KafkaConsumerInterceptor> kafkaConsumerInterceptors;

    public KafkaListenerAspect(ObjectMapper objectMapper, KafkaProperties kafkaProperties) {
        this.objectMapper = objectMapper;
        this.kafkaProperties = kafkaProperties;
    }

    public void afterPropertiesSet() {
        this.kafkaConsumerInterceptors = InterceptorUtil.getKafkaConsumerInterceptors(this.applicationContext);
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Around(value="@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        if (this.isConsumerRecordArgument(joinPoint) && joinPoint.getSignature() instanceof MethodSignature) {
            ConsumerRecord<String, String> record = this.getConsumerRecord(joinPoint);
            ConsumerEvent event = KafkaHelper.toConsumerEvent(record, this.getEventId(record));
            Object bean = joinPoint.getTarget();
            Method method = ((MethodSignature)joinPoint.getSignature()).getMethod();
            try {
                if (InterceptorUtil.fireBeforeConsume(bean, method, this.kafkaConsumerInterceptors, event)) {
                    return null;
                }
                Object response = joinPoint.proceed(joinPoint.getArgs());
                InterceptorUtil.fireAfterSuccessConsume(bean, method, this.kafkaConsumerInterceptors, event);
                return response;
            }
            catch (Throwable throwable) {
                InterceptorUtil.fireAfterErrorConsume(bean, method, this.kafkaConsumerInterceptors, event, throwable);
                throw throwable;
            }
        }
        return joinPoint.proceed(joinPoint.getArgs());
    }

    private boolean isConsumerRecordArgument(ProceedingJoinPoint joinPoint) {
        Object[] arguments = joinPoint.getArgs();
        if (arguments == null || arguments.length == 0) {
            return false;
        }
        return Arrays.stream(joinPoint.getArgs()).anyMatch(o -> o instanceof ConsumerRecord);
    }

    private ConsumerRecord<String, String> getConsumerRecord(ProceedingJoinPoint joinPoint) {
        return Arrays.stream(joinPoint.getArgs()).filter(o -> o instanceof ConsumerRecord).findFirst().orElse(null);
    }

    private String getEventId(ConsumerRecord<String, String> record) {
        return KafkaHelper.getEventId((String)record.value(), this.objectMapper, this.kafkaProperties);
    }
}

