/*
 * Decompiled with CFR 0.152.
 */
package com.blibli.oss.kafka.aop;

import com.blibli.oss.kafka.helper.KafkaHelper;
import com.blibli.oss.kafka.interceptor.InterceptorUtil;
import com.blibli.oss.kafka.interceptor.KafkaConsumerInterceptor;
import com.blibli.oss.kafka.interceptor.events.ConsumerEvent;
import com.blibli.oss.kafka.properties.KafkaProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.annotation.KafkaListener;

public class KafkaListenerInterceptor
implements MethodInterceptor,
ApplicationContextAware,
InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(KafkaListenerInterceptor.class);
    private ApplicationContext applicationContext;
    private ObjectMapper objectMapper;
    private KafkaProperties kafkaProperties;
    private List<KafkaConsumerInterceptor> kafkaConsumerInterceptors;

    public KafkaListenerInterceptor(ObjectMapper objectMapper, KafkaProperties kafkaProperties) {
        this.objectMapper = objectMapper;
        this.kafkaProperties = kafkaProperties;
    }

    public void afterPropertiesSet() throws Exception {
        this.kafkaConsumerInterceptors = InterceptorUtil.getKafkaConsumerInterceptors(this.applicationContext);
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public Object invoke(MethodInvocation invocation) throws Throwable {
        if (this.isKafkaListener(invocation) && this.isConsumerRecordArgument(invocation)) {
            ConsumerRecord<String, String> record = this.getConsumerRecord(invocation);
            ConsumerEvent event = KafkaHelper.toConsumerEvent(record, this.getEventId(record));
            Object bean = invocation.getThis();
            Method method = invocation.getMethod();
            try {
                if (InterceptorUtil.fireBeforeConsume(bean, method, this.kafkaConsumerInterceptors, event)) {
                    return null;
                }
                Object response = invocation.proceed();
                InterceptorUtil.fireAfterSuccessConsume(bean, method, this.kafkaConsumerInterceptors, event);
                return response;
            }
            catch (Throwable throwable) {
                InterceptorUtil.fireAfterErrorConsume(bean, method, this.kafkaConsumerInterceptors, event, throwable);
                throw throwable;
            }
        }
        return invocation.proceed();
    }

    private boolean isKafkaListener(MethodInvocation invocation) {
        Method method = invocation.getMethod();
        return AnnotationUtils.findAnnotation((Method)method, KafkaListener.class) != null;
    }

    private boolean isConsumerRecordArgument(MethodInvocation invocation) {
        Object[] arguments = invocation.getArguments();
        if (arguments == null || arguments.length == 0) {
            return false;
        }
        return Arrays.stream(invocation.getArguments()).anyMatch(o -> o instanceof ConsumerRecord);
    }

    private ConsumerRecord<String, String> getConsumerRecord(MethodInvocation invocation) {
        return Arrays.stream(invocation.getArguments()).filter(o -> o instanceof ConsumerRecord).findFirst().orElse(null);
    }

    private String getEventId(ConsumerRecord<String, String> record) {
        return KafkaHelper.getEventId((String)record.value(), this.objectMapper, this.kafkaProperties);
    }
}

