/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.retrytopic;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.DestinationTopicProcessor;
import org.springframework.kafka.retrytopic.EndpointHandlerMethod;
import org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer;
import org.springframework.kafka.retrytopic.ListenerContainerFactoryResolver;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;

public class RetryTopicConfigurer {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryTopicConfigurer.class));
    public static final EndpointHandlerMethod DEFAULT_DLT_HANDLER = RetryTopicConfigurer.createHandlerMethodWith(LoggingDltListenerHandlerMethod.class, "logMessage");
    private final DestinationTopicProcessor destinationTopicProcessor;
    private final ListenerContainerFactoryResolver containerFactoryResolver;
    private final ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer;
    private final BeanFactory beanFactory;
    private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;

    @Deprecated
    public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, BeanFactory beanFactory) {
        this(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, new SuffixingRetryTopicNamesProviderFactory());
    }

    @Autowired
    public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
        this.destinationTopicProcessor = destinationTopicProcessor;
        this.containerFactoryResolver = containerFactoryResolver;
        this.listenerContainerFactoryConfigurer = listenerContainerFactoryConfigurer;
        this.beanFactory = beanFactory;
        this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory;
    }

    public void processMainAndRetryListeners(EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, KafkaListenerEndpointRegistrar registrar, @Nullable KafkaListenerContainerFactory<?> factory, String defaultContainerFactoryBeanName) {
        this.throwIfMultiMethodEndpoint(mainEndpoint);
        DestinationTopicProcessor.Context context = new DestinationTopicProcessor.Context(configuration.getDestinationTopicProperties());
        this.configureEndpoints(mainEndpoint, endpointProcessor, factory, registrar, configuration, context, defaultContainerFactoryBeanName);
        this.destinationTopicProcessor.processRegisteredDestinations(this.getTopicCreationFunction(configuration), context);
    }

    private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, EndpointProcessor endpointProcessor, KafkaListenerContainerFactory<?> factory, KafkaListenerEndpointRegistrar registrar, RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context, String defaultContainerFactoryBeanName) {
        this.destinationTopicProcessor.processDestinationTopicProperties(destinationTopicProperties -> this.processAndRegisterEndpoints(mainEndpoint, endpointProcessor, factory, defaultContainerFactoryBeanName, registrar, configuration, context, (DestinationTopic.Properties)destinationTopicProperties), context);
    }

    private void processAndRegisterEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, EndpointProcessor endpointProcessor, KafkaListenerContainerFactory<?> factory, String defaultFactoryBeanName, KafkaListenerEndpointRegistrar registrar, RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context, DestinationTopic.Properties destinationTopicProperties) {
        ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = destinationTopicProperties.isMainEndpoint() ? this.resolveAndConfigureFactoryForMainEndpoint(factory, defaultFactoryBeanName, configuration) : this.resolveAndConfigureFactoryForRetryEndpoint(factory, defaultFactoryBeanName, configuration);
        MethodKafkaListenerEndpoint endpoint = destinationTopicProperties.isMainEndpoint() ? mainEndpoint : new MethodKafkaListenerEndpoint();
        endpointProcessor.accept(endpoint);
        EndpointHandlerMethod endpointBeanMethod = this.getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);
        this.createEndpointCustomizer(endpointBeanMethod, destinationTopicProperties).customizeEndpointAndCollectTopics(endpoint).forEach(topicNamesHolder -> this.destinationTopicProcessor.registerDestinationTopic(topicNamesHolder.getMainTopic(), topicNamesHolder.getProcessedTopic(), destinationTopicProperties, context));
        registrar.registerEndpoint(endpoint, resolvedFactory);
        endpoint.setBeanFactory(this.beanFactory);
    }

    private EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, DestinationTopic.Properties props) {
        EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
        EndpointHandlerMethod retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
        return props.isDltTopic() ? this.getDltEndpointHandlerMethodOrDefault(dltHandlerMethod) : retryBeanMethod;
    }

    private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfiguration config) {
        RetryTopicConfiguration.TopicCreation topicCreationConfig = config.forKafkaTopicAutoCreation();
        return topicCreationConfig.shouldCreateTopics() ? topics -> this.createNewTopicBeans((Collection<String>)topics, topicCreationConfig) : topics -> {};
    }

    private void createNewTopicBeans(Collection<String> topics, RetryTopicConfiguration.TopicCreation config) {
        topics.forEach(topic -> ((DefaultListableBeanFactory)this.beanFactory).registerSingleton(topic + "-topicRegistrationBean", (Object)new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor())));
    }

    private EndpointCustomizer createEndpointCustomizer(EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) {
        return new EndpointCustomizerFactory(destinationTopicProperties, endpointBeanMethod, this.beanFactory, this.retryTopicNamesProviderFactory).createEndpointCustomizer();
    }

    private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandlerMethod dltEndpointHandlerMethod) {
        return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
    }

    private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForMainEndpoint(KafkaListenerContainerFactory<?> providedFactory, String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
        ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver.resolveFactoryForMainEndpoint(providedFactory, defaultFactoryBeanName, configuration.forContainerFactoryResolver());
        return this.listenerContainerFactoryConfigurer.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());
    }

    private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForRetryEndpoint(KafkaListenerContainerFactory<?> providedFactory, String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
        ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver.resolveFactoryForRetryEndpoint(providedFactory, defaultFactoryBeanName, configuration.forContainerFactoryResolver());
        return this.listenerContainerFactoryConfigurer.configure(resolvedFactory, configuration.forContainerFactoryConfigurer());
    }

    private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
        if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
            throw new IllegalArgumentException("Retry Topic is not compatible with " + MultiMethodKafkaListenerEndpoint.class);
        }
    }

    public static EndpointHandlerMethod createHandlerMethodWith(Class<?> beanClass, String methodName) {
        return new EndpointHandlerMethod(beanClass, methodName);
    }

    public static EndpointHandlerMethod createHandlerMethodWith(Object bean, Method method) {
        return new EndpointHandlerMethod(bean, method);
    }

    static class LoggingDltListenerHandlerMethod {
        public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";

        LoggingDltListenerHandlerMethod() {
        }

        public void logMessage(Object message) {
            if (message instanceof ConsumerRecord) {
                LOGGER.info(() -> "Received message in dlt listener: " + ListenerUtils.recordToString((ConsumerRecord)message));
            } else {
                LOGGER.info(() -> "Received message in dlt listener.");
            }
        }
    }

    private static final class TopicNamesHolder {
        private final String mainTopic;
        private final String processedTopic;

        TopicNamesHolder(String mainTopic, String processedTopic) {
            this.mainTopic = mainTopic;
            this.processedTopic = processedTopic;
        }

        String getMainTopic() {
            return this.mainTopic;
        }

        String getProcessedTopic() {
            return this.processedTopic;
        }
    }

    static final class EndpointCustomizerFactory {
        private final DestinationTopic.Properties destinationProperties;
        private final EndpointHandlerMethod beanMethod;
        private final BeanFactory beanFactory;
        private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;

        EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod, BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
            this.destinationProperties = destinationProperties;
            this.beanMethod = beanMethod;
            this.beanFactory = beanFactory;
            this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory;
        }

        public EndpointCustomizer createEndpointCustomizer() {
            return this.addSuffixesAndMethod(this.destinationProperties, this.beanMethod.resolveBean(this.beanFactory), this.beanMethod.getMethod());
        }

        private EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, Method method) {
            RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider = this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties);
            return endpoint -> {
                Collection<TopicNamesHolder> topics = this.customizeAndRegisterTopics(namesProvider, (MethodKafkaListenerEndpoint<?, ?>)endpoint);
                endpoint.setId(namesProvider.getEndpointId((MethodKafkaListenerEndpoint<?, ?>)endpoint));
                endpoint.setGroupId(namesProvider.getGroupId((MethodKafkaListenerEndpoint<?, ?>)endpoint));
                endpoint.setTopics((String[])topics.stream().map(TopicNamesHolder::getProcessedTopic).toArray(String[]::new));
                endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix((MethodKafkaListenerEndpoint<?, ?>)endpoint));
                endpoint.setGroup(namesProvider.getGroup((MethodKafkaListenerEndpoint<?, ?>)endpoint));
                endpoint.setBean(bean);
                endpoint.setMethod(method);
                return topics;
            };
        }

        private Collection<TopicNamesHolder> customizeAndRegisterTopics(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, MethodKafkaListenerEndpoint<?, ?> endpoint) {
            return this.getTopics(endpoint).stream().map(topic -> new TopicNamesHolder((String)topic, namesProvider.getTopicName((String)topic))).collect(Collectors.toList());
        }

        private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint) {
            TopicPartitionOffset[] topicPartitionsToAssign;
            Collection topics = endpoint.getTopics();
            if (topics.isEmpty() && (topicPartitionsToAssign = endpoint.getTopicPartitionsToAssign()) != null && topicPartitionsToAssign.length > 0) {
                topics = Arrays.stream(topicPartitionsToAssign).map(TopicPartitionOffset::getTopic).collect(Collectors.toList());
            }
            if (topics.isEmpty()) {
                throw new IllegalStateException("No topics where provided for RetryTopicConfiguration.");
            }
            return topics;
        }
    }

    private static interface EndpointCustomizer
    extends Function<MethodKafkaListenerEndpoint<?, ?>, Collection<TopicNamesHolder>> {
        default public Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {
            return (Collection)this.apply(listenerEndpoint);
        }
    }

    public static interface EndpointProcessor
    extends Consumer<MethodKafkaListenerEndpoint<?, ?>> {
        default public void process(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {
            this.accept(listenerEndpoint);
        }
    }
}

