/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.thread;

import java.util.Optional;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.io.netty.util.internal.PlatformDependent;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RuntimeFactory} that hands out {@link ThreadRuntime} instances which all share
 * this factory's thread group, function cache, Pulsar client (and its builder), optional
 * Pulsar admin client, state storage URL and NAR extraction directory.
 *
 * <p>A factory is usable once one of the {@code initialize} paths has run: either through
 * the multi-argument constructor, or through the no-arg constructor followed by
 * {@link #initialize(WorkerConfig, AuthenticationConfig, SecretsProviderConfigurator,
 * ConnectorsManager, Optional, Optional)}.
 */
public class ThreadRuntimeFactory implements RuntimeFactory {
    private static final Logger log = LoggerFactory.getLogger(ThreadRuntimeFactory.class);

    private ThreadGroup threadGroup;
    private FunctionCacheManager fnCache;
    private ClientBuilder clientBuilder;
    private PulsarClient pulsarClient;
    /** Only non-null when the admin client was explicitly enabled at initialization. */
    private PulsarAdmin pulsarAdmin;
    private String storageServiceUrl;
    /** When non-null, used for every container instead of asking the configurator. */
    private SecretsProvider defaultSecretsProvider;
    private FunctionCollectorRegistry collectorRegistry;
    private String narExtractionDirectory;
    private volatile boolean closed;
    private SecretsProviderConfigurator secretsProviderConfigurator;
    private ClassLoader rootClassLoader;
    private Optional<ConnectorsManager> connectorsManager;

    /**
     * Creates an uninitialized factory. One of the {@code initialize} paths must run before
     * {@link #createContainer} is used; {@link #close()} is safe to call either way.
     */
    public ThreadRuntimeFactory() {
    }

    /**
     * Creates and immediately initializes a factory with an explicit secrets provider,
     * no client memory limit, no secrets provider configurator and no connectors manager.
     *
     * @param threadGroupName                name of the thread group created for this factory
     * @param pulsarServiceUrl               service URL used to build the Pulsar client
     * @param storageServiceUrl              state storage URL passed on to each runtime
     * @param authConfig                     authentication settings for the client(s)
     * @param secretsProvider                secrets provider shared by every created runtime
     * @param collectorRegistry              metrics registry passed on to each runtime
     * @param narExtractionDirectory         NAR extraction directory passed on to each runtime
     * @param rootClassLoader                root class loader; the current thread's context
     *                                       class loader is used when {@code null}
     * @param exposePulsarAdminClientEnabled whether an admin client should be created
     * @param pulsarWebServiceUrl            web service URL used for the admin client
     * @throws Exception if a Pulsar client cannot be created
     */
    public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl, AuthenticationConfig authConfig, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws Exception {
        this.initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig, storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty());
    }

    /**
     * Shared initialization used by both public entry points.
     *
     * <p>The memory limit is validated before any client is created, and the admin client is
     * closed again if building the Pulsar client fails, so a failed initialization does not
     * leave an open admin client behind.
     *
     * @throws PulsarClientException    if the admin client or the Pulsar client cannot be created
     * @throws IllegalArgumentException if the configured memory limit is out of range
     */
    private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl, SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager) throws PulsarClientException {
        // Pure validation first: nothing has been allocated yet if this throws.
        Optional<Long> clientMemoryLimit = this.calculateClientMemoryLimit(memoryLimit);
        ClassLoader effectiveRootClassLoader = rootClassLoader != null
                ? rootClassLoader
                : Thread.currentThread().getContextClassLoader();

        PulsarAdmin admin = exposePulsarAdminClientEnabled
                ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig)
                : null;
        ClientBuilder builder;
        PulsarClient client;
        try {
            builder = InstanceUtils.createPulsarClientBuilder(pulsarServiceUrl, authConfig, clientMemoryLimit);
            client = builder.build();
        } catch (PulsarClientException | RuntimeException e) {
            // The admin client was already opened; release it before propagating.
            closeAfterFailedInit(admin, e);
            throw e;
        }

        this.rootClassLoader = effectiveRootClassLoader;
        this.secretsProviderConfigurator = secretsProviderConfigurator;
        this.defaultSecretsProvider = secretsProvider;
        this.fnCache = new FunctionCacheManagerImpl(effectiveRootClassLoader);
        this.threadGroup = new ThreadGroup(threadGroupName);
        this.pulsarAdmin = admin;
        this.clientBuilder = builder;
        this.pulsarClient = client;
        this.storageServiceUrl = storageServiceUrl;
        this.collectorRegistry = collectorRegistry;
        this.narExtractionDirectory = narExtractionDirectory;
        this.connectorsManager = connectorsManager;
    }

    /**
     * Closes an admin client that was opened during an initialization that then failed.
     * A failure while closing is attached to {@code failure} as a suppressed exception
     * so the original cause is the one that propagates.
     */
    private static void closeAfterFailedInit(PulsarAdmin admin, Throwable failure) {
        if (admin == null) {
            return;
        }
        try {
            admin.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /**
     * Resolves the configured memory limit into a byte count for the Pulsar client.
     *
     * <p>If both an absolute value and a percentage of max direct memory are set, the smaller
     * of the two resulting byte counts wins. If only one is set, that one is used. If the
     * limit is absent, or present with neither value set, the result is empty.
     *
     * @throws IllegalArgumentException if the absolute value is not positive, or the
     *                                  percentage is not in the range (0, 100]
     */
    private Optional<Long> calculateClientMemoryLimit(Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit) {
        if (!memoryLimit.isPresent()) {
            return Optional.empty();
        }
        ThreadRuntimeFactoryConfig.MemoryLimit limit = memoryLimit.get();
        Long absolute = limit.getAbsoluteValue();
        Double percentOfDirectMem = limit.getPercentOfMaxDirectMemory();

        if (absolute != null) {
            Preconditions.checkArgument(absolute > 0L, "Absolute memory limit for Pulsar client has to be positive");
        }
        if (percentOfDirectMem != null) {
            Preconditions.checkArgument(percentOfDirectMem > 0.0 && percentOfDirectMem <= 100.0, "Percent of max direct memory limit for Pulsar client must be between 0 and 100");
        }

        if (absolute == null && percentOfDirectMem == null) {
            return Optional.empty();
        }
        if (percentOfDirectMem == null) {
            return Optional.of(absolute);
        }
        long percentBytes = this.getBytesPercentDirectMem(percentOfDirectMem);
        return Optional.of(absolute == null ? percentBytes : Math.min(absolute, percentBytes));
    }

    /**
     * Converts a percentage of the max direct memory reported by {@link PlatformDependent}
     * into a byte count, truncated towards zero.
     */
    private long getBytesPercentDirectMem(double percent) {
        return (long)((double)PlatformDependent.maxDirectMemory() * (percent / 100.0));
    }

    /**
     * Initializes the factory from worker configuration.
     *
     * <p>No default secrets provider, collector registry or root class loader is set on this
     * path; secrets providers are instead created per container through
     * {@code secretsProviderConfigurator}. {@code functionAuthProvider} and
     * {@code runtimeCustomizer} are not used by this factory.
     *
     * @param connectorsManager may be {@code null}, in which case runtimes receive an empty
     *                          {@link Optional}
     * @throws Exception if the runtime factory config cannot be read or a client cannot be created
     */
    @Override
    public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig, SecretsProviderConfigurator secretsProviderConfigurator, ConnectorsManager connectorsManager, Optional<FunctionAuthProvider> functionAuthProvider, Optional<RuntimeCustomizer> runtimeCustomizer) throws Exception {
        ThreadRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(workerConfig.getFunctionRuntimeFactoryConfigs(), ThreadRuntimeFactoryConfig.class);
        // ofNullable: a missing connectors manager is treated like the constructor path,
        // which already runs with Optional.empty().
        this.initialize(factoryConfig.getThreadGroupName(), Optional.ofNullable(factoryConfig.getPulsarClientMemoryLimit()), workerConfig.getPulsarServiceUrl(), authenticationConfig, workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null, null, workerConfig.getNarExtractionDirectory(), null, workerConfig.isExposeAdminClientEnabled(), workerConfig.getPulsarWebServiceUrl(), Optional.ofNullable(connectorsManager));
    }

    /**
     * Creates a {@link ThreadRuntime} for the given instance using this factory's shared
     * resources.
     *
     * <p>When no default secrets provider was supplied, a new one is instantiated from the
     * class name returned by the configurator and initialized with the configurator's
     * config for this function. {@code originalCodeFileName} and
     * {@code expectedHealthCheckInterval} are not used here.
     *
     * @throws IllegalStateException if neither a default secrets provider nor a secrets
     *                               provider configurator is available
     */
    @Override
    public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile, String originalCodeFileName, Long expectedHealthCheckInterval) {
        SecretsProvider secretsProvider = this.defaultSecretsProvider;
        if (secretsProvider == null) {
            Preconditions.checkState(this.secretsProviderConfigurator != null, "Neither a default secrets provider nor a secrets provider configurator is available");
            String secretsProviderClassName = this.secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
            secretsProvider = (SecretsProvider)Reflections.createInstance(secretsProviderClassName, this.rootClassLoader);
            log.info("Initializing secrets provider {} with configs: {}", secretsProvider.getClass().getName(), this.secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
            secretsProvider.init(this.secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
        }
        return new ThreadRuntime(instanceConfig, this.fnCache, this.threadGroup, jarFile, this.pulsarClient, this.clientBuilder, this.pulsarAdmin, this.storageServiceUrl, secretsProvider, this.collectorRegistry, this.narExtractionDirectory, this.connectorsManager);
    }

    /**
     * Releases the factory's shared resources: interrupts the thread group, closes the
     * function cache, the Pulsar client and the admin client (if any), then shuts down the
     * {@link InstanceCache}.
     *
     * <p>Only the first call does any work. Resources that were never created (for example
     * when the factory was never initialized) are skipped, and a failure in one step does
     * not prevent the remaining steps from running.
     */
    @Override
    public void close() {
        // Flip the flag under a lock so two concurrent callers cannot both run the teardown.
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        try {
            if (this.threadGroup != null) {
                this.threadGroup.interrupt();
            }
            if (this.fnCache != null) {
                this.fnCache.close();
            }
            if (this.pulsarClient != null) {
                try {
                    this.pulsarClient.close();
                }
                catch (PulsarClientException e) {
                    log.warn("Failed to close pulsar client when closing function container factory", e);
                }
            }
        }
        finally {
            try {
                if (this.pulsarAdmin != null) {
                    this.pulsarAdmin.close();
                }
            }
            finally {
                InstanceCache.shutdown();
            }
        }
    }

    /**
     * Returns the thread group created at initialization, or {@code null} if the factory
     * has not been initialized.
     */
    public ThreadGroup getThreadGroup() {
        return this.threadGroup;
    }
}

