/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.spring.integration.eventhub.factory;

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.spring.cloud.context.core.util.Memoizer;
import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
import com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.microsoft.azure.spring.integration.eventhub.factory.EventHubConnectionStringProvider;
import com.microsoft.azure.spring.integration.eventhub.impl.EventHubRuntimeException;
import com.microsoft.azure.spring.integration.eventhub.util.HostnameHelper;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

/**
 * Default {@link EventHubClientFactory} that creates Event Hubs clients, partition senders and
 * event processor hosts on demand, keeps them in per-key caches, and releases them when the
 * Spring bean is destroyed.
 *
 * <p>Each cache is a {@link ConcurrentHashMap} handed to {@code Memoizer.memoize} together with
 * the matching creator method; the memoizer is expected to reuse the cached instance when the
 * same key is requested again.
 */
public class DefaultEventHubClientFactory implements EventHubClientFactory, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(DefaultEventHubClientFactory.class);
    private static final String PROJECT_VERSION = DefaultEventHubClientFactory.class.getPackage().getImplementationVersion();
    private static final String USER_AGENT = "spring-cloud-azure/" + PROJECT_VERSION;

    /** Clients keyed by event hub name. */
    private final Map<String, EventHubClient> clientsByName = new ConcurrentHashMap<>();
    /** Executor handed to each client at creation time, keyed by event hub name, so it can be shut down later. */
    private final Map<String, ScheduledExecutorService> clientExecutors = new ConcurrentHashMap<>();
    /** Partition senders keyed by (client, partition id). */
    private final Map<Tuple<EventHubClient, String>, PartitionSender> partitionSenderMap = new ConcurrentHashMap<>();
    /** Processor hosts keyed by (event hub name, consumer group). */
    private final Map<Tuple<String, String>, EventProcessorHost> processorHostMap = new ConcurrentHashMap<>();

    private final BiFunction<EventHubClient, String, PartitionSender> partitionSenderCreator =
            Memoizer.memoize(this.partitionSenderMap, this::createPartitionSender);
    private final String checkpointStorageConnectionString;
    private final EventHubConnectionStringProvider connectionStringProvider;
    private final Function<String, EventHubClient> eventHubClientCreator =
            Memoizer.memoize(this.clientsByName, this::createEventHubClient);
    private final BiFunction<String, String, EventProcessorHost> processorHostCreator =
            Memoizer.memoize(this.processorHostMap, this::createEventProcessorHost);

    /**
     * Creates the factory and tags the Event Hubs SDK user agent with this project's identifier.
     *
     * @param connectionStringProvider   resolves the connection string for an event hub name; must not be null
     * @param checkpointConnectionString storage connection string passed to every processor host; must have text
     * @throws IllegalArgumentException if either argument fails its precondition
     */
    public DefaultEventHubClientFactory(@NonNull EventHubConnectionStringProvider connectionStringProvider, String checkpointConnectionString) {
        Assert.notNull(connectionStringProvider, "connectionStringProvider can't be null");
        Assert.hasText(checkpointConnectionString, "checkpointConnectionString can't be null or empty");
        this.connectionStringProvider = connectionStringProvider;
        this.checkpointStorageConnectionString = checkpointConnectionString;

        // The SDK field is static and shared by every factory instance: only prepend our prefix
        // when it is not already there, otherwise it would grow with each new factory.
        String prefix = USER_AGENT + "/";
        String current = EventHubClientImpl.USER_AGENT;
        if (current == null || !current.startsWith(prefix)) {
            EventHubClientImpl.USER_AGENT = prefix + current;
        }
    }

    /**
     * Creates a client for the given event hub, backed by its own single-thread scheduled executor.
     * The executor is remembered so {@link #destroy()} can shut it down, and is shut down right
     * away if the client cannot be created.
     *
     * @throws EventHubRuntimeException if the SDK reports an {@link EventHubException} or {@link IOException}
     */
    private EventHubClient createEventHubClient(String eventHubName) {
        // Resolve the connection string first so that no executor is created if the lookup fails.
        String connectionString = this.connectionStringProvider.getConnectionString(eventHubName);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        boolean created = false;
        try {
            EventHubClient client = EventHubClient.createSync(connectionString, executor);
            this.clientExecutors.put(eventHubName, executor);
            created = true;
            return client;
        } catch (EventHubException | IOException e) {
            throw new EventHubRuntimeException("Error when creating event hub client", e);
        } finally {
            if (!created) {
                // Nothing owns the executor on failure; release it instead of leaking it.
                executor.shutdown();
            }
        }
    }

    /**
     * Creates a sender bound to one partition of the given client.
     *
     * @throws EventHubRuntimeException if the SDK reports an {@link EventHubException}
     */
    private PartitionSender createPartitionSender(EventHubClient client, String partitionId) {
        try {
            return client.createPartitionSenderSync(partitionId);
        } catch (EventHubException e) {
            throw new EventHubRuntimeException("Error when creating event hub partition sender", e);
        }
    }

    /**
     * Builds a processor host for the event hub and consumer group. The event hub name is also
     * passed as the last constructor argument (presumably the checkpoint storage container name
     * — confirm against the EventProcessorHost constructor).
     */
    private EventProcessorHost createEventProcessorHost(String name, String consumerGroup) {
        String hostName = EventProcessorHost.createHostName(HostnameHelper.getHostname());
        String connectionString = this.connectionStringProvider.getConnectionString(name);
        return new EventProcessorHost(hostName, name, consumerGroup, connectionString,
                this.checkpointStorageConnectionString, name);
    }

    /**
     * Starts closing every value of {@code map} and returns a future that completes, always
     * normally, once all of them have finished. A failure is logged as a warning, not propagated.
     */
    private <K, V> CompletableFuture<Void> closeAll(Map<K, V> map, Function<V, CompletableFuture<Void>> closer) {
        CompletableFuture<?>[] pending = map.values().stream()
                .map(resource -> this.closeSafely(resource, closer))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending).exceptionally(ex -> {
            log.warn("Failed to clean event hub client factory", ex);
            return null;
        });
    }

    /**
     * Applies {@code closer} to one resource, turning a synchronous failure into a failed future
     * so that the remaining resources of the same batch are still closed.
     */
    private <V> CompletableFuture<Void> closeSafely(V resource, Function<V, CompletableFuture<Void>> closer) {
        try {
            CompletableFuture<Void> closing = closer.apply(resource);
            return closing != null ? closing : CompletableFuture.<Void>completedFuture(null);
        } catch (RuntimeException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    /**
     * Releases everything this factory created. The teardown is asynchronous and does not block
     * the caller: partition senders and processor hosts are closed first, then the clients, and
     * finally the executors that were handed to the clients are shut down.
     */
    @Override
    public void destroy() throws Exception {
        CompletableFuture<Void> dependentsClosed = CompletableFuture.allOf(
                this.closeAll(this.partitionSenderMap, PartitionSender::close),
                this.closeAll(this.processorHostMap, EventProcessorHost::unregisterEventProcessor));

        dependentsClosed
                // Senders are created from a client, so close the clients only after the senders are done.
                .thenCompose(ignored -> this.closeAll(this.clientsByName, EventHubClient::close))
                // shutdown() does not wait, so it is safe even if this callback runs on one of these executors.
                .whenComplete((ignored, ex) -> this.clientExecutors.values().forEach(ScheduledExecutorService::shutdown));
    }

    /** Returns the cached client for the event hub, creating it on first request. */
    @Override
    public EventHubClient getOrCreateClient(String name) {
        return this.eventHubClientCreator.apply(name);
    }

    /** Returns the cached sender for the event hub partition, creating client and sender as needed. */
    @Override
    public PartitionSender getOrCreatePartitionSender(String eventhub, String partition) {
        return this.partitionSenderCreator.apply(this.getOrCreateClient(eventhub), partition);
    }

    /** Returns the cached processor host for the event hub and consumer group, creating it on first request. */
    @Override
    public EventProcessorHost getOrCreateEventProcessorHost(String name, String consumerGroup) {
        return this.processorHostCreator.apply(name, consumerGroup);
    }

    /** Looks up an already created processor host without creating one. */
    @Override
    public Optional<EventProcessorHost> getEventProcessorHost(String name, String consumerGroup) {
        return Optional.ofNullable(this.processorHostMap.get(Tuple.of(name, consumerGroup)));
    }

    /**
     * Removes the processor host from the cache without unregistering or closing it.
     *
     * @return the removed host, or {@code null} if none was cached for the key
     */
    @Override
    public EventProcessorHost removeEventProcessorHost(String name, String consumerGroup) {
        return this.processorHostMap.remove(Tuple.of(name, consumerGroup));
    }
}

