/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.eventhubs.core.processor;

import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.spring.core.AzureSpringIdentifier;
import com.azure.spring.eventhubs.core.processor.EventHubsProcessorFactory;
import com.azure.spring.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.eventhubs.core.properties.ProcessorProperties;
import com.azure.spring.eventhubs.core.properties.merger.ProcessorPropertiesParentMerger;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.service.eventhubs.factory.EventProcessorClientBuilderFactory;
import com.azure.spring.service.eventhubs.processor.EventProcessingListener;
import com.azure.spring.service.eventhubs.properties.EventHubsProcessorDescriptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class DefaultEventHubsNamespaceProcessorFactory
implements EventHubsProcessorFactory,
DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHubsNamespaceProcessorFactory.class);
    private final List<EventHubsProcessorFactory.Listener> listeners = new ArrayList<EventHubsProcessorFactory.Listener>();
    private final NamespaceProperties namespaceProperties;
    private final CheckpointStore checkpointStore;
    private final PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> propertiesSupplier;
    private final Map<Tuple2<String, String>, EventProcessorClient> processorClientMap = new ConcurrentHashMap<Tuple2<String, String>, EventProcessorClient>();
    private final ProcessorPropertiesParentMerger propertiesMerger = new ProcessorPropertiesParentMerger();

    public DefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore) {
        this(checkpointStore, null, null);
    }

    public DefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore, NamespaceProperties namespaceProperties) {
        this(checkpointStore, namespaceProperties, (PropertiesSupplier<Tuple2<String, String>, ProcessorProperties>)((PropertiesSupplier)key -> null));
    }

    public DefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore, PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> supplier) {
        this(checkpointStore, null, supplier);
    }

    public DefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore, NamespaceProperties namespaceProperties, PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> supplier) {
        Assert.notNull((Object)checkpointStore, (String)"CheckpointStore must be provided.");
        this.checkpointStore = checkpointStore;
        this.namespaceProperties = namespaceProperties;
        this.propertiesSupplier = supplier == null ? key -> null : supplier;
    }

    @Override
    public EventProcessorClient createProcessor(@NonNull String eventHub, @NonNull String consumerGroup, @NonNull EventProcessingListener listener) {
        return this.doCreateProcessor(eventHub, consumerGroup, listener, (ProcessorProperties)((Object)this.propertiesSupplier.getProperties((Object)Tuples.of((Object)eventHub, (Object)consumerGroup))));
    }

    public void destroy() {
        this.processorClientMap.forEach((t, client) -> {
            this.listeners.forEach(l -> l.processorRemoved((String)t.getT1(), (String)t.getT2(), (EventProcessorClient)client));
            client.stop();
        });
        this.processorClientMap.clear();
        this.listeners.clear();
    }

    private EventProcessorClient doCreateProcessor(@NonNull String eventHub, @NonNull String consumerGroup, @NonNull EventProcessingListener listener, @Nullable ProcessorProperties properties) {
        Tuple2 key = Tuples.of((Object)eventHub, (Object)consumerGroup);
        return this.processorClientMap.computeIfAbsent((Tuple2<String, String>)key, k -> {
            ProcessorProperties processorProperties = this.propertiesMerger.mergeParent(properties, this.namespaceProperties);
            processorProperties.setEventHubName((String)k.getT1());
            processorProperties.setConsumerGroup((String)k.getT2());
            EventProcessorClientBuilderFactory factory = new EventProcessorClientBuilderFactory((EventHubsProcessorDescriptor)processorProperties, this.checkpointStore, listener);
            factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
            EventProcessorClient client = ((EventProcessorClientBuilder)factory.build()).buildEventProcessorClient();
            LOGGER.info("EventProcessor created for event hub '{}' with consumer group '{}'", k.getT1(), k.getT2());
            this.listeners.forEach(l -> l.processorAdded((String)k.getT1(), (String)k.getT2(), client));
            return client;
        });
    }

    @Override
    public void addListener(EventHubsProcessorFactory.Listener listener) {
        this.listeners.add(listener);
    }

    @Override
    public boolean removeListener(EventHubsProcessorFactory.Listener listener) {
        return this.listeners.remove(listener);
    }
}

