/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
import org.springframework.cloud.function.context.config.FunctionContextUtils;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.config.BindingBeansRegistrar;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.BindableFunctionProxyFactory;
import org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
import org.springframework.core.type.MethodMetadata;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.function.Tuples;

@Configuration
@EnableConfigurationProperties(value={StreamFunctionProperties.class})
@Import(value={BindingBeansRegistrar.class, BinderFactoryAutoConfiguration.class})
@AutoConfigureBefore(value={BindingServiceConfiguration.class})
@AutoConfigureAfter(value={ContextFunctionCatalogAutoConfiguration.class})
@ConditionalOnBean(value={FunctionRegistry.class})
public class FunctionConfiguration {
    private static final String SOURCE_PROPERY = "spring.cloud.stream.source";

    @Bean
    public StreamBridge streamBridgeUtils(FunctionCatalog functionCatalog, FunctionRegistry functionRegistry, BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext, @Nullable BinderAwareChannelResolver.NewDestinationBindingCallback callback) {
        return new StreamBridge(functionCatalog, functionRegistry, bindingServiceProperties, applicationContext, callback);
    }

    @Bean
    public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
        return new FunctionBindingRegistrar(functionCatalog, streamFunctionProperties);
    }

    @Bean
    public InitializingBean functionInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties functionProperties, @Nullable BindableProxyFactory[] bindableProxyFactories, BindingServiceProperties serviceProperties, ConfigurableApplicationContext applicationContext, FunctionBindingRegistrar bindingHolder, StreamBridge streamBridge) {
        boolean shouldCreateInitializer = applicationContext.containsBean("output") || ObjectUtils.isEmpty((Object[])applicationContext.getBeanNamesForAnnotation(EnableBinding.class));
        return shouldCreateInitializer ? new FunctionToDestinationBinder(functionCatalog, functionProperties, serviceProperties, streamBridge) : null;
    }

    @Bean
    InitializingBean supplierInitializer(final FunctionCatalog functionCatalog, final StreamFunctionProperties functionProperties, final GenericApplicationContext context, final BindingServiceProperties serviceProperties, final @Nullable BindableFunctionProxyFactory[] proxyFactories, final StreamBridge streamBridge, final TaskScheduler taskScheduler) {
        if (!ObjectUtils.isEmpty((Object[])context.getBeanNamesForAnnotation(EnableBinding.class)) || proxyFactories == null) {
            return null;
        }
        return new InitializingBean(){

            public void afterPropertiesSet() throws Exception {
                for (BindableFunctionProxyFactory proxyFactory : proxyFactories) {
                    IntegrationFlow postProcessedFlow;
                    StandardIntegrationFlow integrationFlow;
                    Type functionType;
                    Object functionWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper)functionCatalog.lookup(proxyFactory.getFunctionDefinition());
                    if (functionWrapper == null || !functionWrapper.isSupplier()) continue;
                    ArrayList<String> contentTypes = new ArrayList<String>();
                    Assert.isTrue((proxyFactory.getOutputs().size() == 1 ? 1 : 0) != 0, (String)"Supplier with multiple outputs is not supported at the moment.");
                    String outputName = proxyFactory.getOutputs().iterator().next();
                    BindingProperties bindingProperties = serviceProperties.getBindingProperties(outputName);
                    ProducerProperties producerProperties = bindingProperties.getProducer();
                    if (bindingProperties.getProducer() == null || !producerProperties.isUseNativeEncoding()) {
                        contentTypes.add(bindingProperties.getContentType());
                    }
                    String functionDefinition = proxyFactory.getFunctionDefinition();
                    Object[] functionNames = StringUtils.delimitedListToStringArray((String)functionDefinition.replaceAll(",", "|").trim(), (String)"|");
                    Function supplier = null;
                    Function function = null;
                    if (!ObjectUtils.isEmpty((Object[])functionNames) && functionNames.length > 1) {
                        Object supplierName = functionNames[0];
                        String remainingFunctionDefinition = StringUtils.arrayToCommaDelimitedString((Object[])Arrays.copyOfRange(functionNames, 1, functionNames.length));
                        supplier = (Function)functionCatalog.lookup((String)supplierName);
                        function = (Function)functionCatalog.lookup(remainingFunctionDefinition, contentTypes.toArray(new String[0]));
                        Type inputType = FunctionTypeUtils.getInputType((Type)((SimpleFunctionRegistry.FunctionInvocationWrapper)function).getFunctionType(), (int)0);
                        Type outputType = FunctionTypeUtils.getOutputType((Type)((SimpleFunctionRegistry.FunctionInvocationWrapper)supplier).getFunctionType(), (int)0);
                        functionWrapper = FunctionTypeUtils.isPublisher((Type)inputType) && !FunctionTypeUtils.isPublisher((Type)outputType) ? null : (SimpleFunctionRegistry.FunctionInvocationWrapper)functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
                    } else {
                        functionWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper)functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
                    }
                    Publisher beginPublishingTrigger = FunctionConfiguration.this.setupBindingTrigger(context);
                    if (functionProperties.isComposeFrom() || functionProperties.isComposeTo()) continue;
                    String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow";
                    PollableBean pollable = FunctionConfiguration.this.extractPollableAnnotation(functionProperties, context, proxyFactory);
                    if (functionWrapper != null) {
                        functionType = functionWrapper.getFunctionType();
                        integrationFlow = ((IntegrationFlowBuilder)FunctionConfiguration.this.integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper((SimpleFunctionRegistry.FunctionInvocationWrapper)functionWrapper, (ConfigurableApplicationContext)context, producerProperties), (Publisher<Object>)beginPublishingTrigger, pollable, context, taskScheduler, functionType).route(Message.class, message -> {
                            if (message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                                String destinationName = (String)message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                                return streamBridge.resolveDestination(destinationName, producerProperties);
                            }
                            return outputName;
                        })).get();
                        postProcessedFlow = (IntegrationFlow)context.getAutowireCapableBeanFactory().applyBeanPostProcessorsBeforeInitialization((Object)integrationFlow, integrationFlowName);
                        context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow, new BeanDefinitionCustomizer[0]);
                        continue;
                    }
                    functionType = ((SimpleFunctionRegistry.FunctionInvocationWrapper)supplier).getFunctionType();
                    integrationFlow = ((IntegrationFlowBuilder)((IntegrationFlowBuilder)((IntegrationFlowBuilder)FunctionConfiguration.this.integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper((SimpleFunctionRegistry.FunctionInvocationWrapper)supplier, (ConfigurableApplicationContext)context, producerProperties), (Publisher<Object>)beginPublishingTrigger, pollable, context, taskScheduler, functionType).channel(c -> c.direct())).fluxTransform(function)).route(Message.class, message -> {
                        if (message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                            String destinationName = (String)message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                            return streamBridge.resolveDestination(destinationName, producerProperties);
                        }
                        return outputName;
                    })).get();
                    postProcessedFlow = (IntegrationFlow)context.getAutowireCapableBeanFactory().applyBeanPostProcessorsBeforeInitialization((Object)integrationFlow, integrationFlowName);
                    context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow, new BeanDefinitionCustomizer[0]);
                }
            }
        };
    }

    private Publisher<Object> setupBindingTrigger(GenericApplicationContext context) {
        AtomicReference triggerRef = new AtomicReference();
        Mono beginPublishingTrigger = Mono.create(emmiter -> triggerRef.set(emmiter));
        context.addApplicationListener(event -> {
            if (event instanceof BindingCreatedEvent && triggerRef.get() != null) {
                ((MonoSink)triggerRef.get()).success();
            }
        });
        return beginPublishingTrigger;
    }

    private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier, Publisher<Object> beginPublishingTrigger, PollableBean pollable, GenericApplicationContext context, TaskScheduler taskScheduler, Type functionType) {
        IntegrationFlowBuilder integrationFlowBuilder;
        boolean splittable;
        boolean bl = splittable = pollable != null && (Boolean)AnnotationUtils.getAnnotationAttributes((Annotation)pollable).get("splittable") != false;
        if (pollable == null && FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getInputType((Type)functionType, (int)0))) {
            Publisher publisher = (Publisher)supplier.get();
            publisher = publisher instanceof Mono ? ((Mono)publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary) : ((Flux)publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary);
            integrationFlowBuilder = IntegrationFlows.from((Publisher)publisher);
            taskScheduler.schedule(() -> {}, Instant.now());
        } else {
            integrationFlowBuilder = IntegrationFlows.from(supplier);
            if (splittable) {
                integrationFlowBuilder = (IntegrationFlowBuilder)integrationFlowBuilder.split();
            }
        }
        return integrationFlowBuilder;
    }

    private PollableBean extractPollableAnnotation(StreamFunctionProperties functionProperties, GenericApplicationContext context, BindableFunctionProxyFactory proxyFactory) {
        Object source;
        String supplierFunctionName = StringUtils.delimitedListToStringArray((String)proxyFactory.getFunctionDefinition().replaceAll(",", "|").trim(), (String)"|")[0];
        BeanDefinition bd = context.getBeanDefinition(supplierFunctionName);
        if (!(bd instanceof RootBeanDefinition)) {
            return null;
        }
        Method factoryMethod = ((RootBeanDefinition)bd).getResolvedFactoryMethod();
        if (factoryMethod == null && (source = bd.getSource()) instanceof MethodMetadata) {
            Class factory = ClassUtils.resolveClassName((String)((MethodMetadata)source).getDeclaringClassName(), null);
            Class[] params = FunctionContextUtils.getParamTypesFromBeanDefinitionFactory((Class)factory, (AbstractBeanDefinition)((RootBeanDefinition)bd));
            factoryMethod = ReflectionUtils.findMethod((Class)factory, (String)((MethodMetadata)source).getMethodName(), (Class[])params);
        }
        Assert.notNull((Object)factoryMethod, (String)("Failed to introspect factory method since it was not discovered for function '" + functionProperties.getDefinition() + "'"));
        return factoryMethod.getReturnType().isAssignableFrom(Supplier.class) ? (PollableBean)AnnotationUtils.findAnnotation((Method)factoryMethod, PollableBean.class) : null;
    }

    private <T> Message<T> wrapToMessageIfNecessary(T value) {
        return value instanceof Message ? (Message)value : MessageBuilder.withPayload(value).build();
    }

    private static class FunctionBindingRegistrar
    implements InitializingBean,
    ApplicationContextAware,
    EnvironmentAware {
        protected final Log logger = LogFactory.getLog(this.getClass());
        private final FunctionCatalog functionCatalog;
        private final StreamFunctionProperties streamFunctionProperties;
        private ConfigurableApplicationContext applicationContext;
        private Environment environment;
        private int inputCount;
        private int outputCount;

        FunctionBindingRegistrar(FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
            this.functionCatalog = functionCatalog;
            this.streamFunctionProperties = streamFunctionProperties;
        }

        public void afterPropertiesSet() throws Exception {
            if (ObjectUtils.isEmpty((Object[])this.applicationContext.getBeanNamesForAnnotation(EnableBinding.class))) {
                RootBeanDefinition functionBindableProxyDefinition;
                this.determineFunctionName(this.functionCatalog, this.environment);
                BeanDefinitionRegistry registry = (BeanDefinitionRegistry)this.applicationContext.getBeanFactory();
                if (StringUtils.hasText((String)this.streamFunctionProperties.getDefinition())) {
                    String[] functionDefinitions;
                    for (String functionDefinition : functionDefinitions = this.streamFunctionProperties.getDefinition().split(";")) {
                        functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);
                        SimpleFunctionRegistry.FunctionInvocationWrapper function = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition);
                        if (function == null) continue;
                        Type functionType = function.getFunctionType();
                        if (function.isSupplier()) {
                            this.inputCount = 0;
                            this.outputCount = this.getOutputCount(functionType, true);
                        } else if (function.isConsumer()) {
                            this.inputCount = FunctionTypeUtils.getInputCount((Type)functionType);
                            this.outputCount = 0;
                        } else {
                            this.inputCount = FunctionTypeUtils.getInputCount((Type)functionType);
                            this.outputCount = this.getOutputCount(functionType, false);
                        }
                        functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)functionDefinition);
                        functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)this.inputCount);
                        functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)this.outputCount);
                        functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)this.streamFunctionProperties);
                        registry.registerBeanDefinition(functionDefinition + "_binding", (BeanDefinition)functionBindableProxyDefinition);
                    }
                }
                if (StringUtils.hasText((String)this.environment.getProperty(FunctionConfiguration.SOURCE_PROPERY))) {
                    String[] sourceNames;
                    for (String sourceName : sourceNames = this.environment.getProperty(FunctionConfiguration.SOURCE_PROPERY).split(";")) {
                        if (this.functionCatalog.lookup(sourceName) != null) continue;
                        functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);
                        functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)sourceName);
                        functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)0);
                        functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)1);
                        functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)this.streamFunctionProperties);
                        registry.registerBeanDefinition(sourceName + "_binding", (BeanDefinition)functionBindableProxyDefinition);
                    }
                }
            } else {
                this.logger.info((Object)"Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration");
            }
        }

        private int getOutputCount(Type functionType, boolean isSupplier) {
            int outputCount = FunctionTypeUtils.getOutputCount((Type)functionType);
            if (!isSupplier && functionType instanceof ParameterizedType) {
                Type outputType = ((ParameterizedType)functionType).getActualTypeArguments()[1];
                if (FunctionTypeUtils.isOfType((Type)outputType, Mono.class) && outputType instanceof ParameterizedType && FunctionTypeUtils.isOfType((Type)((ParameterizedType)outputType).getActualTypeArguments()[0], Void.class)) {
                    outputCount = 0;
                } else if (FunctionTypeUtils.isOfType((Type)outputType, Void.class)) {
                    outputCount = 0;
                }
            }
            return outputCount;
        }

        private void determineFunctionName(FunctionCatalog catalog, Environment environment) {
            String definition = this.streamFunctionProperties.getDefinition();
            if (!StringUtils.hasText((String)definition)) {
                definition = environment.getProperty("spring.cloud.function.definition");
            }
            if (StringUtils.hasText((String)definition)) {
                this.streamFunctionProperties.setDefinition(definition);
            } else if (Boolean.parseBoolean(environment.getProperty("spring.cloud.stream.function.routing.enabled", "false")) || environment.containsProperty("spring.cloud.function.routing-expression")) {
                this.streamFunctionProperties.setDefinition("functionRouter");
            } else {
                this.streamFunctionProperties.setDefinition(((FunctionInspector)this.functionCatalog).getName(this.functionCatalog.lookup("")));
            }
        }

        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext)applicationContext;
        }

        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }
    }

    private static class FunctionWrapper
    implements Function<Message<byte[]>, Object> {
        private final Function function;
        private final ConsumerProperties consumerProperties;
        private final ProducerProperties producerProperties;
        private final Field headersField;
        private final ConfigurableApplicationContext applicationContext;
        private final boolean isRoutingFunction;

        FunctionWrapper(Function function, ConsumerProperties consumerProperties, ProducerProperties producerProperties, ConfigurableApplicationContext applicationContext) {
            this.isRoutingFunction = ((SimpleFunctionRegistry.FunctionInvocationWrapper)function).getTarget() instanceof RoutingFunction;
            this.applicationContext = applicationContext;
            this.function = new PartitionAwareFunctionWrapper((SimpleFunctionRegistry.FunctionInvocationWrapper)function, this.applicationContext, producerProperties);
            this.consumerProperties = consumerProperties;
            if (this.consumerProperties != null) {
                ((SimpleFunctionRegistry.FunctionInvocationWrapper)function).setSkipInputConversion(this.consumerProperties.isUseNativeDecoding());
            }
            this.producerProperties = producerProperties;
            if (this.producerProperties != null) {
                ((SimpleFunctionRegistry.FunctionInvocationWrapper)function).setSkipOutputConversion(this.producerProperties.isUseNativeEncoding());
            }
            this.headersField = ReflectionUtils.findField(MessageHeaders.class, (String)"headers");
            this.headersField.setAccessible(true);
        }

        @Override
        public Object apply(Message<byte[]> message) {
            Object result;
            if (message != null && this.consumerProperties != null) {
                Map headersMap = (Map)ReflectionUtils.getField((Field)this.headersField, (Object)message.getHeaders());
                headersMap.put("skip-type-conversion", this.consumerProperties.isUseNativeDecoding());
            }
            if ((result = this.function.apply(message)) instanceof Publisher && this.isRoutingFunction) {
                throw new IllegalStateException("Routing to functions that return Publisher is not supported in the context of Spring Cloud Stream.");
            }
            return result;
        }
    }

    private static class FunctionToDestinationBinder
    implements InitializingBean,
    ApplicationContextAware {
        protected final Log logger = LogFactory.getLog(this.getClass());
        private GenericApplicationContext applicationContext;
        private BindableProxyFactory[] bindableProxyFactories;
        private final FunctionCatalog functionCatalog;
        private final StreamFunctionProperties functionProperties;
        private final BindingServiceProperties serviceProperties;
        private final StreamBridge streamBridge;

        FunctionToDestinationBinder(FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties, BindingServiceProperties serviceProperties, StreamBridge streamBridge) {
            this.functionCatalog = functionCatalog;
            this.functionProperties = functionProperties;
            this.serviceProperties = serviceProperties;
            this.streamBridge = streamBridge;
        }

        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (GenericApplicationContext)applicationContext;
        }

        public void afterPropertiesSet() throws Exception {
            Map beansOfType = this.applicationContext.getBeansOfType(BindableProxyFactory.class);
            for (BindableProxyFactory bindableProxyFactory : this.bindableProxyFactories = beansOfType.values().toArray(new BindableProxyFactory[0])) {
                SimpleFunctionRegistry.FunctionInvocationWrapper function;
                String functionDefinition = bindableProxyFactory instanceof BindableFunctionProxyFactory ? ((BindableFunctionProxyFactory)bindableProxyFactory).getFunctionDefinition() : this.functionProperties.getDefinition();
                boolean shouldNotProcess = false;
                if (!(bindableProxyFactory instanceof BindableFunctionProxyFactory)) {
                    Set<String> outputBindingNames = bindableProxyFactory.getOutputs();
                    boolean bl = shouldNotProcess = !CollectionUtils.isEmpty(outputBindingNames) && outputBindingNames.iterator().next().equals("applicationMetrics");
                }
                if (!StringUtils.hasText((String)functionDefinition) || shouldNotProcess || (function = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition)) == null || function.isSupplier()) continue;
                this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition);
            }
        }

        private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactory, String functionDefinition) {
            this.assertBindingIsPossible(bindableProxyFactory);
            Set<String> inputBindingNames = bindableProxyFactory.getInputs();
            Set<String> outputBindingNames = bindableProxyFactory.getOutputs();
            String[] outputContentTypes = (String[])outputBindingNames.stream().map(bindingName -> this.serviceProperties.getBindings().get(bindingName).getContentType()).toArray(String[]::new);
            SimpleFunctionRegistry.FunctionInvocationWrapper function = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition, outputContentTypes);
            Type functionType = function.getFunctionType();
            this.assertSupportedSignatures(bindableProxyFactory, functionType);
            if (this.functionProperties.isComposeFrom()) {
                AbstractSubscribableChannel outputChannel = (AbstractSubscribableChannel)this.applicationContext.getBean(outputBindingNames.iterator().next(), AbstractSubscribableChannel.class);
                this.logger.info((Object)("Composing at the head of output destination: " + outputChannel.getBeanName()));
                String outputChannelName = outputChannel.getBeanName();
                DirectWithAttributesChannel newOutputChannel = new DirectWithAttributesChannel();
                newOutputChannel.setAttribute("type", "output");
                newOutputChannel.setComponentName("output.extended");
                this.applicationContext.registerBean("output.extended", MessageChannel.class, () -> newOutputChannel, new BeanDefinitionCustomizer[0]);
                bindableProxyFactory.replaceOutputChannel(outputChannelName, "output.extended", (MessageChannel)newOutputChannel);
                inputBindingNames = Collections.singleton("output");
            }
            if (this.isReactiveOrMultipleInputOutput(bindableProxyFactory, functionType)) {
                Object resultPublishers;
                Object[] inputPublishers = (Publisher[])inputBindingNames.stream().map(inputBindingName -> {
                    ConsumerProperties consumerProperties;
                    BindingProperties bindingProperties = this.serviceProperties.getBindings().get(inputBindingName);
                    ConsumerProperties consumerProperties2 = consumerProperties = bindingProperties == null ? null : bindingProperties.getConsumer();
                    if (consumerProperties != null) {
                        function.setSkipInputConversion(consumerProperties.isUseNativeDecoding());
                        Assert.isTrue((consumerProperties.getConcurrency() <= 1 ? 1 : 0) != 0, (String)("Concurrency > 1 is not supported by reactive consumer, given that project reactor maintains its own concurrency mechanism. Was '..." + inputBindingName + ".consumer.concurrency=" + consumerProperties.getConcurrency() + "'"));
                    }
                    SubscribableChannel inputChannel = (SubscribableChannel)this.applicationContext.getBean(inputBindingName, SubscribableChannel.class);
                    return IntegrationReactiveUtils.messageChannelToFlux((MessageChannel)inputChannel);
                }).toArray(Publisher[]::new);
                Object functionToInvoke = function;
                if (!CollectionUtils.isEmpty(outputBindingNames)) {
                    ProducerProperties producerProperties;
                    BindingProperties bindingProperties = this.serviceProperties.getBindings().get(outputBindingNames.iterator().next());
                    ProducerProperties producerProperties2 = producerProperties = bindingProperties == null ? null : bindingProperties.getProducer();
                    if (producerProperties != null) {
                        function.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
                    }
                    functionToInvoke = new PartitionAwareFunctionWrapper(function, (ConfigurableApplicationContext)this.applicationContext, producerProperties);
                }
                if (!((resultPublishers = functionToInvoke.apply(inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray((Object[])inputPublishers))) instanceof Iterable)) {
                    resultPublishers = Collections.singletonList(resultPublishers);
                }
                Iterator<String> outputBindingIter = outputBindingNames.iterator();
                ((Iterable)resultPublishers).forEach(publisher -> {
                    Flux flux = Flux.from((Publisher)((Publisher)publisher));
                    if (!CollectionUtils.isEmpty((Collection)outputBindingNames)) {
                        MessageChannel outputChannel = (MessageChannel)this.applicationContext.getBean((String)outputBindingIter.next(), MessageChannel.class);
                        flux = flux.doOnNext(message -> {
                            if (message instanceof Message && ((Message)message).getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                                String destinationName = (String)((Message)message).getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                                ProducerProperties producerProperties = this.serviceProperties.getBindings().get(outputBindingNames.iterator().next()).getProducer();
                                SubscribableChannel dynamicChannel = this.streamBridge.resolveDestination(destinationName, producerProperties);
                                if (this.logger.isInfoEnabled()) {
                                    this.logger.info((Object)("Output message is sent to '" + destinationName + "' destination"));
                                }
                                if (!(message instanceof Message)) {
                                    message = MessageBuilder.withPayload((Object)message).build();
                                }
                                dynamicChannel.send((Message)message);
                            } else {
                                if (!(message instanceof Message)) {
                                    message = MessageBuilder.withPayload((Object)message).build();
                                }
                                outputChannel.send((Message)message);
                            }
                        }).doOnError(e -> {
                            this.logger.error((Object)("Failure was detected during execution of the reactive function '" + functionDefinition + "'"));
                            ((Throwable)e).printStackTrace();
                        });
                    }
                    if (!function.isConsumer()) {
                        flux.subscribe();
                    }
                });
            } else {
                String outputDestinationName = this.determineOutputDestinationName(0, bindableProxyFactory, functionType);
                String inputDestinationName = inputBindingNames.iterator().next();
                Object inputDestination = this.applicationContext.getBean(inputDestinationName);
                if (inputDestination != null && inputDestination instanceof SubscribableChannel) {
                    ServiceActivatingHandler handler = this.createFunctionHandler(function, inputDestinationName, outputDestinationName);
                    if (StringUtils.hasText((String)outputDestinationName)) {
                        handler.setOutputChannelName(outputDestinationName);
                    }
                    ((SubscribableChannel)inputDestination).subscribe((MessageHandler)handler);
                }
            }
        }

        private ServiceActivatingHandler createFunctionHandler(SimpleFunctionRegistry.FunctionInvocationWrapper function, String inputChannelName, String outputChannelName) {
            ConsumerProperties consumerProperties = StringUtils.hasText((String)inputChannelName) ? this.serviceProperties.getBindingProperties(inputChannelName).getConsumer() : null;
            final ProducerProperties producerProperties = StringUtils.hasText((String)outputChannelName) ? this.serviceProperties.getBindingProperties(outputChannelName).getProducer() : null;
            ServiceActivatingHandler handler = new ServiceActivatingHandler(new FunctionWrapper((Function)function, consumerProperties, producerProperties, (ConfigurableApplicationContext)this.applicationContext)){

                protected void sendOutputs(Object result, Message<?> requestMessage) {
                    if (result instanceof Iterable) {
                        for (Object resultElement : (Iterable)result) {
                            this.doSendMessage(resultElement, requestMessage);
                        }
                    } else if (ObjectUtils.isArray((Object)result)) {
                        for (int i = 0; i < ((Object[])result).length; ++i) {
                            this.doSendMessage(((Object[])result)[i], requestMessage);
                        }
                    } else {
                        this.doSendMessage(result, requestMessage);
                    }
                }

                private void doSendMessage(Object result, Message<?> requestMessage) {
                    if (result instanceof Message && ((Message)result).getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                        String destinationName = (String)((Message)result).getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                        SubscribableChannel outputChannel = streamBridge.resolveDestination(destinationName, producerProperties);
                        if (this.logger.isInfoEnabled()) {
                            this.logger.info((Object)("Output message is sent to '" + destinationName + "' destination"));
                        }
                        outputChannel.send((Message)result);
                    } else {
                        super.sendOutputs(result, requestMessage);
                    }
                }
            };
            handler.setBeanFactory((BeanFactory)this.applicationContext);
            handler.afterPropertiesSet();
            return handler;
        }

        private boolean isReactiveOrMultipleInputOutput(BindableProxyFactory bindableProxyFactory, Type functionType) {
            boolean reactiveInputsOutputs = FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getInputType((Type)functionType, (int)0)) || FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getOutputType((Type)functionType, (int)0));
            return this.isMultipleInputOutput(bindableProxyFactory) || reactiveInputsOutputs;
        }

        private String determineOutputDestinationName(int index, BindableProxyFactory bindableProxyFactory, Type functionType) {
            List<String> outputNames = new ArrayList<String>(bindableProxyFactory.getOutputs());
            if (CollectionUtils.isEmpty(outputNames)) {
                outputNames = Collections.singletonList("output");
            }
            String outputDestinationName = bindableProxyFactory instanceof BindableFunctionProxyFactory ? ((BindableFunctionProxyFactory)bindableProxyFactory).getOutputName(index) : (FunctionTypeUtils.isConsumer((Type)functionType) ? null : outputNames.get(index));
            return outputDestinationName;
        }

        private void assertBindingIsPossible(BindableProxyFactory bindableProxyFactory) {
            if (this.isMultipleInputOutput(bindableProxyFactory)) {
                Assert.isTrue((!this.functionProperties.isComposeTo() && !this.functionProperties.isComposeFrom() ? 1 : 0) != 0, (String)"Composing to/from existing Sinks and Sources are not supported for functions with multiple arguments.");
            }
        }

        private boolean isMultipleInputOutput(BindableProxyFactory bindableProxyFactory) {
            return bindableProxyFactory instanceof BindableFunctionProxyFactory && ((BindableFunctionProxyFactory)bindableProxyFactory).isMultiple();
        }

        private void assertSupportedSignatures(BindableProxyFactory bindableProxyFactory, Type functionType) {
            if (this.isMultipleInputOutput(bindableProxyFactory)) {
                Assert.isTrue((!FunctionTypeUtils.isConsumer((Type)functionType) ? 1 : 0) != 0, (String)("Function '" + this.functionProperties.getDefinition() + "' is a Consumer which is not supported for multi-in/out reactive streams. Only Functions are supported"));
                Assert.isTrue((!FunctionTypeUtils.isSupplier((Type)functionType) ? 1 : 0) != 0, (String)("Function '" + this.functionProperties.getDefinition() + "' is a Supplier which is not supported for multi-in/out reactive streams. Only Functions are supported"));
                Assert.isTrue((!FunctionTypeUtils.isInputArray((Type)functionType) && !FunctionTypeUtils.isOutputArray((Type)functionType) ? 1 : 0) != 0, (String)("Function '" + this.functionProperties.getDefinition() + "' has the following signature: [" + functionType + "]. Your input and/or outout lacks arity and therefore we can not determine how many input/output destinations are required in the context of function input/output binding."));
                int inputCount = FunctionTypeUtils.getInputCount((Type)functionType);
                for (int i = 0; i < inputCount; ++i) {
                    Assert.isTrue((boolean)FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getInputType((Type)functionType, (int)i)), (String)("Function '" + this.functionProperties.getDefinition() + "' has the following signature: [" + functionType + "]. Non-reactive functions with multiple inputs/outputs are not supported in the context of Spring Cloud Stream."));
                }
                int outputCount = FunctionTypeUtils.getOutputCount((Type)functionType);
                for (int i = 0; i < outputCount; ++i) {
                    Assert.isTrue((boolean)FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getOutputType((Type)functionType, (int)i)), (String)("Function '" + this.functionProperties.getDefinition() + "' has the following signature: [" + functionType + "]. Non-reactive functions with multiple inputs/outputs are not supported in the context of Spring Cloud Stream."));
                }
            }
        }
    }
}

