/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.integration.eventhub.factory;

import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.util.ClientOptions;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.spring.cloud.context.core.util.Constants;
import com.azure.spring.cloud.context.core.util.Memoizer;
import com.azure.spring.cloud.context.core.util.Tuple;
import com.azure.spring.integration.core.api.BatchConsumerConfig;
import com.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.azure.spring.integration.eventhub.impl.EventHubProcessor;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

/**
 * Default {@link EventHubClientFactory} that builds Event Hubs consumer, producer and processor
 * clients from a single Event Hubs connection string.
 *
 * <p>Clients are kept in {@link ConcurrentHashMap}-backed caches: producers are keyed by event hub
 * name, consumers and processors by the (event hub name, consumer group) pair. Processor clients
 * additionally need a blob container for checkpoints, which is addressed through the checkpoint
 * storage connection string passed to the constructor.
 *
 * <p>As a {@link DisposableBean}, the factory closes or stops every cached client in
 * {@link #destroy()}.
 */
public class DefaultEventHubClientFactory implements EventHubClientFactory, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHubClientFactory.class);

    /**
     * Upper bound for each blocking call made against the checkpoint blob container. These calls
     * run inside {@link ConcurrentHashMap#computeIfAbsent}, so an unbounded wait would stall the
     * calling thread (and the map bin it holds) indefinitely.
     */
    private static final Duration CHECKPOINT_STORAGE_TIMEOUT = Duration.ofMinutes(5L);

    private final Map<Tuple<String, String>, EventHubConsumerAsyncClient> consumerClientMap = new ConcurrentHashMap<>();
    private final Map<String, EventHubProducerAsyncClient> producerClientMap = new ConcurrentHashMap<>();
    private final Map<Tuple<String, String>, EventProcessorClient> processorClientMap = new ConcurrentHashMap<>();
    private final String checkpointStorageConnectionString;
    private final String checkpointStorageContainer;
    private final String eventHubConnectionString;

    // NOTE(review): Memoizer is defined outside this file; presumably it stores each created client
    // in the supplied map so that destroy() can later find and close it - confirm against Memoizer.
    private final BiFunction<String, String, EventHubConsumerAsyncClient> eventHubConsumerClientCreator =
        Memoizer.memoize(this.consumerClientMap, this::createEventHubClient);
    private final Function<String, EventHubProducerAsyncClient> producerClientCreator =
        Memoizer.memoize(this.producerClientMap, this::createProducerClient);

    /**
     * Creates a factory bound to one Event Hubs namespace.
     *
     * @param eventHubConnectionString connection string used for every consumer, producer and
     *        processor client
     * @param checkpointConnectionString storage connection string for the checkpoint container;
     *        only validated when a processor client is created
     * @param checkpointStorageContainer name of the checkpoint blob container, or {@code null} to
     *        use the event hub name as the container name
     */
    public DefaultEventHubClientFactory(@NonNull String eventHubConnectionString, String checkpointConnectionString, String checkpointStorageContainer) {
        this.eventHubConnectionString = eventHubConnectionString;
        this.checkpointStorageConnectionString = checkpointConnectionString;
        this.checkpointStorageContainer = checkpointStorageContainer;
    }

    /** Builds a new async consumer client for the given event hub and consumer group. */
    private EventHubConsumerAsyncClient createEventHubClient(String eventHubName, String consumerGroup) {
        return new EventHubClientBuilder()
            .connectionString(this.eventHubConnectionString, eventHubName)
            .consumerGroup(consumerGroup)
            .clientOptions(new ClientOptions().setApplicationId(Constants.SPRING_EVENT_HUB_APPLICATION_ID))
            .buildAsyncConsumerClient();
    }

    /** Builds a new async producer client for the given event hub. */
    private EventHubProducerAsyncClient createProducerClient(String eventHubName) {
        return new EventHubClientBuilder()
            .connectionString(this.eventHubConnectionString, eventHubName)
            .clientOptions(new ClientOptions().setApplicationId(Constants.SPRING_EVENT_HUB_APPLICATION_ID))
            .buildAsyncProducerClient();
    }

    /**
     * Builds a new processor client whose checkpoints are stored in the checkpoint blob container.
     *
     * @param eventHubName event hub to read from; also the container name fallback
     * @param consumerGroup consumer group the processor joins
     * @param eventHubProcessor receives partition-initialization, partition-close, event and error
     *        callbacks
     * @param batchConsumerConfig when non-null, events are delivered in batches using its max
     *        batch size and max wait time; when {@code null}, events are delivered one at a time
     * @return a built, not yet started, processor client
     * @throws IllegalArgumentException if no checkpoint storage connection string was configured
     */
    private EventProcessorClient createEventProcessorClientInternal(String eventHubName, String consumerGroup, EventHubProcessor eventHubProcessor, BatchConsumerConfig batchConsumerConfig) {
        Assert.hasText(this.checkpointStorageConnectionString, "checkpointConnectionString can't be null or empty, check whether checkpoint-storage-account is configured in the configuration file.");
        BlobContainerAsyncClient blobClient = this.prepareCheckpointContainer(eventHubName);

        // Everything except the event callback is identical for batch and single-event delivery.
        EventProcessorClientBuilder builder = new EventProcessorClientBuilder()
            .connectionString(this.eventHubConnectionString, eventHubName)
            .consumerGroup(consumerGroup)
            .checkpointStore(new BlobCheckpointStore(blobClient))
            .processPartitionInitialization(eventHubProcessor::onInitialize)
            .processPartitionClose(eventHubProcessor::onClose)
            .processError(eventHubProcessor::onError);
        if (batchConsumerConfig != null) {
            builder.processEventBatch(eventHubProcessor::onEventBatch, batchConsumerConfig.getMaxBatchSize(), batchConsumerConfig.getMaxWaitTime());
        } else {
            builder.processEvent(eventHubProcessor::onEvent);
        }
        return builder.buildEventProcessorClient();
    }

    /**
     * Returns a client for the checkpoint blob container, creating the container if the storage
     * service does not report it as existing. Both storage calls block the calling thread for at
     * most {@link #CHECKPOINT_STORAGE_TIMEOUT}.
     *
     * @param eventHubName used as the container name when none was configured
     */
    private BlobContainerAsyncClient prepareCheckpointContainer(String eventHubName) {
        String containerName = this.checkpointStorageContainer == null ? eventHubName : this.checkpointStorageContainer;
        BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder()
            .connectionString(this.checkpointStorageConnectionString)
            .containerName(containerName)
            .httpLogOptions(new HttpLogOptions().setApplicationId(Constants.SPRING_EVENT_HUB_APPLICATION_ID))
            .buildAsyncClient();

        // A null answer (empty Mono) is treated the same as "does not exist".
        Boolean containerExists = blobClient.exists().block(CHECKPOINT_STORAGE_TIMEOUT);
        if (!Boolean.TRUE.equals(containerExists)) {
            LOGGER.warn("Will create storage blob {}, the auto creation might be deprecated in later versions.", containerName);
            blobClient.create().block(CHECKPOINT_STORAGE_TIMEOUT);
        }
        return blobClient;
    }

    /**
     * Applies {@code closer} to every value of {@code map}. A failure on one value is logged and
     * does not stop the remaining values from being processed.
     */
    private <K, V> void close(Map<K, V> map, Consumer<V> closer) {
        for (V client : map.values()) {
            try {
                closer.accept(client);
            } catch (Exception ex) {
                LOGGER.warn("Failed to clean event hub client factory", ex);
            }
        }
    }

    /**
     * Closes all cached consumer and producer clients and stops all cached processor clients.
     * Failures are logged rather than propagated. The caches themselves are not cleared.
     */
    @Override
    public void destroy() {
        this.close(this.consumerClientMap, EventHubConsumerAsyncClient::close);
        this.close(this.producerClientMap, EventHubProducerAsyncClient::close);
        this.close(this.processorClientMap, EventProcessorClient::stop);
    }

    /**
     * Returns the consumer client for the given event hub and consumer group, delegating to the
     * memoized creator.
     */
    @Override
    public EventHubConsumerAsyncClient getOrCreateConsumerClient(String eventHubName, String consumerGroup) {
        return this.eventHubConsumerClientCreator.apply(eventHubName, consumerGroup);
    }

    /** Returns the producer client for the given event hub, delegating to the memoized creator. */
    @Override
    public EventHubProducerAsyncClient getOrCreateProducerClient(String eventHubName) {
        return this.producerClientCreator.apply(eventHubName);
    }

    /**
     * Returns the processor client registered for (event hub, consumer group), creating and
     * registering one if none exists.
     *
     * <p>If a client is already registered for that pair it is returned as-is, and the supplied
     * {@code processor} and {@code batchConsumerConfig} are not used. Creation performs blocking
     * storage calls on the calling thread.
     *
     * @throws IllegalArgumentException if a client must be created and no checkpoint storage
     *         connection string was configured
     */
    @Override
    public EventProcessorClient createEventProcessorClient(String eventHubName, String consumerGroup, EventHubProcessor processor, BatchConsumerConfig batchConsumerConfig) {
        return this.processorClientMap.computeIfAbsent(
            Tuple.of(eventHubName, consumerGroup),
            key -> this.createEventProcessorClientInternal(eventHubName, consumerGroup, processor, batchConsumerConfig));
    }

    /** Looks up the processor client registered for (event hub, consumer group), if any. */
    @Override
    public Optional<EventProcessorClient> getEventProcessorClient(String eventHubName, String consumerGroup) {
        return Optional.ofNullable(this.processorClientMap.get(Tuple.of(eventHubName, consumerGroup)));
    }

    /**
     * Unregisters the processor client for (event hub, consumer group) without stopping it.
     *
     * @return the removed client, or {@code null} if none was registered
     */
    @Override
    public EventProcessorClient removeEventProcessorClient(String eventHubName, String consumerGroup) {
        return this.processorClientMap.remove(Tuple.of(eventHubName, consumerGroup));
    }
}

