/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.servicebus.core.processor;

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.spring.core.AzureSpringIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.service.servicebus.factory.ServiceBusProcessorClientBuilderFactory;
import com.azure.spring.service.servicebus.factory.ServiceBusSessionProcessorClientBuilderFactory;
import com.azure.spring.service.servicebus.processor.MessageProcessingListener;
import com.azure.spring.service.servicebus.properties.ServiceBusEntityType;
import com.azure.spring.service.servicebus.properties.ServiceBusProcessorDescriptor;
import com.azure.spring.servicebus.core.processor.ServiceBusProcessorFactory;
import com.azure.spring.servicebus.core.properties.NamespaceProperties;
import com.azure.spring.servicebus.core.properties.ProcessorProperties;
import com.azure.spring.servicebus.core.properties.merger.ProcessorPropertiesParentMerger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.Nullable;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class DefaultServiceBusNamespaceProcessorFactory
implements ServiceBusProcessorFactory,
DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceBusNamespaceProcessorFactory.class);
    private final Map<Tuple2<String, String>, ServiceBusProcessorClient> processorMap = new ConcurrentHashMap<Tuple2<String, String>, ServiceBusProcessorClient>();
    private final List<ServiceBusProcessorFactory.Listener> listeners = new ArrayList<ServiceBusProcessorFactory.Listener>();
    private final NamespaceProperties namespaceProperties;
    private final PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> propertiesSupplier;
    private final ProcessorPropertiesParentMerger propertiesMerger = new ProcessorPropertiesParentMerger();
    public static final String INVALID_SUBSCRIPTION = DefaultServiceBusNamespaceProcessorFactory.class.getSimpleName() + "INVALID_SUBSCRIPTION";

    public DefaultServiceBusNamespaceProcessorFactory(NamespaceProperties namespaceProperties) {
        this(namespaceProperties, (PropertiesSupplier<Tuple2<String, String>, ProcessorProperties>)((PropertiesSupplier)key -> null));
    }

    public DefaultServiceBusNamespaceProcessorFactory(NamespaceProperties namespaceProperties, PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> supplier) {
        this.namespaceProperties = namespaceProperties;
        this.propertiesSupplier = supplier == null ? key -> null : supplier;
    }

    private void close(Map<Tuple2<String, String>, ServiceBusProcessorClient> map, Consumer<ServiceBusProcessorClient> close) {
        map.forEach((t, p) -> {
            try {
                this.listeners.forEach(l -> l.processorRemoved((String)t.getT1(), (String)t.getT2(), (ServiceBusProcessorClient)p));
                close.accept((ServiceBusProcessorClient)p);
            }
            catch (Exception ex) {
                LOGGER.warn("Failed to clean service bus queue client factory", (Throwable)ex);
            }
        });
    }

    public void destroy() {
        this.close(this.processorMap, ServiceBusProcessorClient::close);
        this.processorMap.clear();
        this.listeners.clear();
    }

    @Override
    public ServiceBusProcessorClient createProcessor(String queue, MessageProcessingListener messageProcessorListener) {
        return this.doCreateProcessor(queue, INVALID_SUBSCRIPTION, messageProcessorListener, (ProcessorProperties)((Object)this.propertiesSupplier.getProperties((Object)Tuples.of((Object)queue, (Object)INVALID_SUBSCRIPTION))));
    }

    @Override
    public ServiceBusProcessorClient createProcessor(String topic, String subscription, MessageProcessingListener messageProcessorListener) {
        return this.doCreateProcessor(topic, subscription, messageProcessorListener, (ProcessorProperties)((Object)this.propertiesSupplier.getProperties((Object)Tuples.of((Object)topic, (Object)subscription))));
    }

    private ServiceBusProcessorClient doCreateProcessor(String name, String subscription, MessageProcessingListener listener, @Nullable ProcessorProperties properties) {
        Tuple2 key = Tuples.of((Object)name, (Object)subscription);
        return this.processorMap.computeIfAbsent((Tuple2<String, String>)key, k -> {
            ServiceBusProcessorClient client;
            ProcessorProperties processorProperties = this.propertiesMerger.mergeParent(properties, this.namespaceProperties);
            processorProperties.setAutoComplete(false);
            processorProperties.setEntityName((String)k.getT1());
            if (INVALID_SUBSCRIPTION.equals(k.getT2())) {
                processorProperties.setEntityType(ServiceBusEntityType.QUEUE);
            } else {
                processorProperties.setEntityType(ServiceBusEntityType.TOPIC);
                processorProperties.setSubscriptionName((String)k.getT2());
            }
            if (Boolean.TRUE.equals(processorProperties.getSessionEnabled())) {
                ServiceBusSessionProcessorClientBuilderFactory factory = new ServiceBusSessionProcessorClientBuilderFactory((ServiceBusProcessorDescriptor)processorProperties, listener);
                factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS);
                client = ((ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder)factory.build()).buildProcessorClient();
            } else {
                ServiceBusProcessorClientBuilderFactory factory = new ServiceBusProcessorClientBuilderFactory((ServiceBusProcessorDescriptor)processorProperties, listener);
                factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS);
                client = ((ServiceBusClientBuilder.ServiceBusProcessorClientBuilder)factory.build()).buildProcessorClient();
            }
            this.listeners.forEach(l -> l.processorAdded((String)k.getT1(), INVALID_SUBSCRIPTION.equals(k.getT2()) ? null : (String)k.getT2(), client));
            return client;
        });
    }

    @Override
    public void addListener(ServiceBusProcessorFactory.Listener listener) {
        this.listeners.add(listener);
    }
}

