/*
 * Decompiled with CFR 0.152.
 */
package org.flowable.eventregistry.spring.kafka;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.flowable.eventregistry.api.ChannelModelProcessor;
import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.EventRepositoryService;
import org.flowable.eventregistry.model.ChannelModel;
import org.flowable.eventregistry.model.InboundChannelModel;
import org.flowable.eventregistry.model.KafkaInboundChannelModel;
import org.flowable.eventregistry.model.KafkaOutboundChannelModel;
import org.flowable.eventregistry.spring.kafka.KafkaChannelMessageListenerAdapter;
import org.flowable.eventregistry.spring.kafka.KafkaOperationsOutboundEventChannelAdapter;
import org.flowable.eventregistry.spring.kafka.SimpleKafkaListenerEndpoint;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.EmbeddedValueResolver;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;

public class KafkaChannelDefinitionProcessor
implements BeanFactoryAware,
ChannelModelProcessor {
    public static final String CHANNEL_ID_PREFIX = "org.flowable.eventregistry.kafka.ChannelKafkaListenerEndpointContainer#";
    protected KafkaOperations<Object, Object> kafkaOperations;
    protected KafkaListenerEndpointRegistry endpointRegistry;
    protected String containerFactoryBeanName = "kafkaListenerContainerFactory";
    protected KafkaListenerContainerFactory<?> containerFactory;
    protected BeanFactory beanFactory;
    protected BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
    protected StringValueResolver embeddedValueResolver;
    protected BeanExpressionContext expressionContext;

    public boolean canProcess(ChannelModel channelModel) {
        return channelModel instanceof KafkaInboundChannelModel || channelModel instanceof KafkaOutboundChannelModel;
    }

    public void registerChannelModel(ChannelModel channelModel, String tenantId, EventRegistry eventRegistry, EventRepositoryService eventRepositoryService, boolean fallbackToDefaultTenant) {
        if (channelModel instanceof KafkaInboundChannelModel) {
            KafkaInboundChannelModel kafkaChannelModel = (KafkaInboundChannelModel)channelModel;
            KafkaListenerEndpoint endpoint = this.createKafkaListenerEndpoint(kafkaChannelModel, tenantId, eventRegistry);
            this.registerEndpoint(endpoint, null);
        } else if (channelModel instanceof KafkaOutboundChannelModel) {
            this.processOutboundDefinition((KafkaOutboundChannelModel)channelModel);
        }
    }

    protected KafkaListenerEndpoint createKafkaListenerEndpoint(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) {
        String endpointId = this.getEndpointId((ChannelModel)channelModel, tenantId);
        SimpleKafkaListenerEndpoint<Object, Object> endpoint = new SimpleKafkaListenerEndpoint<Object, Object>();
        endpoint.setId(endpointId);
        endpoint.setGroupId(this.getEndpointGroupId(channelModel, endpoint.getId()));
        endpoint.setTopics(this.resolveTopics(channelModel));
        endpoint.setTopicPattern(this.resolvePattern(channelModel));
        endpoint.setClientIdPrefix(this.resolveExpressionAsString(channelModel.getClientIdPrefix(), "clientIdPrefix"));
        endpoint.setConcurrency(this.resolveExpressionAsInteger(channelModel.getConcurrency(), "concurrency"));
        endpoint.setConsumerProperties(this.resolveProperties(channelModel.getCustomProperties()));
        endpoint.setMessageListener(this.createMessageListener(eventRegistry, (InboundChannelModel)channelModel));
        return endpoint;
    }

    protected void processOutboundDefinition(KafkaOutboundChannelModel channelModel) {
        String topic = channelModel.getTopic();
        if (channelModel.getOutboundEventChannelAdapter() == null && StringUtils.hasText((String)topic)) {
            channelModel.setOutboundEventChannelAdapter((Object)new KafkaOperationsOutboundEventChannelAdapter(this.kafkaOperations, topic, channelModel.getRecordKey()));
        }
    }

    protected Integer resolveExpressionAsInteger(String value, String attribute) {
        Object resolved = this.resolveExpression(value);
        Integer result = null;
        if (resolved instanceof String) {
            result = Integer.parseInt((String)resolved);
        } else if (resolved instanceof Number) {
            result = ((Number)resolved).intValue();
        } else if (resolved != null) {
            throw new IllegalStateException("The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
        return result;
    }

    protected String resolveExpressionAsString(String value, String attribute) {
        if (!StringUtils.hasLength((String)value)) {
            return null;
        }
        Object resolved = this.resolveExpression(value);
        if (resolved instanceof String) {
            return (String)resolved;
        }
        throw new IllegalStateException("The [" + attribute + "] must resolve to a String. Resolved to [" + resolved.getClass() + "] for [" + value + "]");
    }

    protected Collection<String> resolveTopics(KafkaInboundChannelModel channelDefinition) {
        Collection topics = channelDefinition.getTopics();
        ArrayList<String> resultTopics = new ArrayList<String>();
        for (String queue : topics) {
            this.resolveTopics(this.resolveExpression(queue), resultTopics, channelDefinition);
        }
        return resultTopics;
    }

    protected void resolveTopics(Object resolvedValue, List<String> result, KafkaInboundChannelModel channelDefinition) {
        if (resolvedValue instanceof String[]) {
            for (String object : (String[])resolvedValue) {
                this.resolveTopics(object, result, channelDefinition);
            }
        } else if (resolvedValue instanceof String) {
            result.add((String)resolvedValue);
        } else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable)resolvedValue) {
                this.resolveTopics(object, result, channelDefinition);
            }
        } else {
            throw new IllegalArgumentException("Channel definition " + channelDefinition + " cannot resolve " + resolvedValue + " as a String[] or a String");
        }
    }

    protected Pattern resolvePattern(KafkaInboundChannelModel channelModel) {
        Pattern pattern = null;
        String topicPattern = channelModel.getTopicPattern();
        if (StringUtils.hasText((String)topicPattern)) {
            Object resolved = this.resolveExpression(topicPattern);
            if (resolved instanceof String) {
                pattern = Pattern.compile((String)resolved);
            } else if (resolved instanceof Pattern) {
                pattern = (Pattern)resolved;
            } else if (resolved != null) {
                throw new IllegalStateException("topicPattern in channel model [ " + channelModel + " ] must resolve to a Pattern or String, not " + resolved.getClass());
            }
        }
        return pattern;
    }

    protected Object resolveExpression(String value) {
        String resolvedValue = this.resolve(value);
        return this.resolver.evaluate(resolvedValue, this.expressionContext);
    }

    protected GenericMessageListener<ConsumerRecord<Object, Object>> createMessageListener(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) {
        KafkaChannelMessageListenerAdapter kafkaChannelMessageListenerAdapter = new KafkaChannelMessageListenerAdapter(eventRegistry, inboundChannelModel);
        return kafkaChannelMessageListenerAdapter;
    }

    public void unregisterChannelModel(ChannelModel channelModel, String tenantId, EventRepositoryService eventRepositoryService) {
        Field listenerContainersField;
        String endpointId = this.getEndpointId(channelModel, tenantId);
        MessageListenerContainer listenerContainer = this.endpointRegistry.getListenerContainer(endpointId);
        if (listenerContainer != null) {
            listenerContainer.stop();
        }
        if (listenerContainer instanceof DisposableBean) {
            try {
                ((DisposableBean)listenerContainer).destroy();
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to destroy listener container", e);
            }
        }
        if ((listenerContainersField = ReflectionUtils.findField(this.endpointRegistry.getClass(), (String)"listenerContainers")) != null) {
            listenerContainersField.setAccessible(true);
            Map listenerContainers = (Map)ReflectionUtils.getField((Field)listenerContainersField, (Object)this.endpointRegistry);
            if (listenerContainers != null) {
                listenerContainers.remove(endpointId);
            }
        } else {
            throw new IllegalStateException("Endpoint registry " + this.endpointRegistry + " does not have listenerContainers field");
        }
    }

    protected void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        Assert.notNull((Object)endpoint, (String)"Endpoint must not be null");
        Assert.hasText((String)endpoint.getId(), (String)"Endpoint id must be set");
        Assert.state((this.endpointRegistry != null ? 1 : 0) != 0, (String)"No KafkaListenerEndpointRegistry set");
        this.endpointRegistry.registerListenerContainer(endpoint, this.resolveContainerFactory(endpoint, factory), true);
    }

    protected KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> containerFactory) {
        if (containerFactory != null) {
            return containerFactory;
        }
        if (this.containerFactory != null) {
            return this.containerFactory;
        }
        if (this.containerFactoryBeanName != null) {
            Assert.state((this.beanFactory != null ? 1 : 0) != 0, (String)"BeanFactory must be set to obtain container factory by bean name");
            this.containerFactory = (KafkaListenerContainerFactory)this.beanFactory.getBean(this.containerFactoryBeanName, KafkaListenerContainerFactory.class);
            return this.containerFactory;
        }
        throw new IllegalStateException("Could not resolve the " + KafkaListenerContainerFactory.class.getSimpleName() + " to use for [" + endpoint + "] no factory was given and no default is set.");
    }

    protected String getEndpointId(ChannelModel channelModel, String tenantId) {
        String channelDefinitionKey = channelModel.getKey();
        if (!StringUtils.hasText((String)tenantId)) {
            return CHANNEL_ID_PREFIX + channelDefinitionKey;
        }
        return CHANNEL_ID_PREFIX + tenantId + "#" + channelDefinitionKey;
    }

    protected String getEndpointGroupId(KafkaInboundChannelModel channelDefinition, String id) {
        String groupId = this.resolveExpressionAsString(channelDefinition.getGroupId(), "groupId");
        if (groupId == null) {
            groupId = id;
        }
        return groupId;
    }

    protected String resolve(String value) {
        if (value == null) {
            return null;
        }
        if (this.embeddedValueResolver != null) {
            return this.embeddedValueResolver.resolveStringValue(value);
        }
        return value;
    }

    protected Properties resolveProperties(List<KafkaInboundChannelModel.CustomProperty> consumerProperties) {
        if (consumerProperties != null && !consumerProperties.isEmpty()) {
            Properties properties = new Properties();
            for (KafkaInboundChannelModel.CustomProperty consumerProperty : consumerProperties) {
                properties.put(consumerProperty.getName(), this.resolveExpressionAsString(consumerProperty.getValue(), consumerProperty.getName()));
            }
            return properties;
        }
        return null;
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        if (beanFactory instanceof ConfigurableListableBeanFactory) {
            this.embeddedValueResolver = new EmbeddedValueResolver((ConfigurableBeanFactory)beanFactory);
            this.resolver = ((ConfigurableListableBeanFactory)beanFactory).getBeanExpressionResolver();
            this.expressionContext = new BeanExpressionContext((ConfigurableBeanFactory)((ConfigurableListableBeanFactory)beanFactory), null);
        }
    }

    public KafkaOperations<Object, Object> getKafkaOperations() {
        return this.kafkaOperations;
    }

    public void setKafkaOperations(KafkaOperations<Object, Object> kafkaOperations) {
        this.kafkaOperations = kafkaOperations;
    }

    public KafkaListenerEndpointRegistry getEndpointRegistry() {
        return this.endpointRegistry;
    }

    public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
        this.endpointRegistry = endpointRegistry;
    }

    public String getContainerFactoryBeanName() {
        return this.containerFactoryBeanName;
    }

    public void setContainerFactoryBeanName(String containerFactoryBeanName) {
        this.containerFactoryBeanName = containerFactoryBeanName;
    }

    public KafkaListenerContainerFactory<?> getContainerFactory() {
        return this.containerFactory;
    }

    public void setContainerFactory(KafkaListenerContainerFactory<?> containerFactory) {
        this.containerFactory = containerFactory;
    }
}

