/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.thread;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadRuntime
implements Runtime {
    private static final Logger log = LoggerFactory.getLogger(ThreadRuntime.class);
    private Thread fnThread;
    private static final int THREAD_SHUTDOWN_TIMEOUT_MILLIS = 10000;
    private final InstanceConfig instanceConfig;
    private JavaInstanceRunnable javaInstanceRunnable;
    private final ThreadGroup threadGroup;
    private final FunctionCacheManager fnCache;
    private final String jarFile;
    private final ClientBuilder clientBuilder;
    private final PulsarClient pulsarClient;
    private final PulsarAdmin pulsarAdmin;
    private final String stateStorageImplClass;
    private final String stateStorageServiceUrl;
    private final SecretsProvider secretsProvider;
    private final FunctionCollectorRegistry collectorRegistry;
    private final String narExtractionDirectory;
    private final Optional<ConnectorsManager> connectorsManager;

    ThreadRuntime(InstanceConfig instanceConfig, FunctionCacheManager fnCache, ThreadGroup threadGroup, String jarFile, PulsarClient client, ClientBuilder clientBuilder, PulsarAdmin pulsarAdmin, String stateStorageImplClass, String stateStorageServiceUrl, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory, Optional<ConnectorsManager> connectorsManager) {
        this.instanceConfig = instanceConfig;
        if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
            throw new RuntimeException("Thread Container only supports Java Runtime");
        }
        this.threadGroup = threadGroup;
        this.fnCache = fnCache;
        this.jarFile = jarFile;
        this.clientBuilder = clientBuilder;
        this.pulsarClient = client;
        this.pulsarAdmin = pulsarAdmin;
        this.stateStorageImplClass = stateStorageImplClass;
        this.stateStorageServiceUrl = stateStorageServiceUrl;
        this.secretsProvider = secretsProvider;
        this.collectorRegistry = collectorRegistry;
        this.narExtractionDirectory = narExtractionDirectory;
        this.connectorsManager = connectorsManager;
    }

    private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig, String jarFile, String narExtractionDirectory, FunctionCacheManager fnCache, Optional<ConnectorsManager> connectorsManager) throws Exception {
        if (FunctionCommon.isFunctionCodeBuiltin((Function.FunctionDetailsOrBuilder)instanceConfig.getFunctionDetails()) && connectorsManager.isPresent()) {
            switch (InstanceUtils.calculateSubjectType((Function.FunctionDetails)instanceConfig.getFunctionDetails())) {
                case SOURCE: {
                    return connectorsManager.get().getConnector(instanceConfig.getFunctionDetails().getSource().getBuiltin()).getClassLoader();
                }
                case SINK: {
                    return connectorsManager.get().getConnector(instanceConfig.getFunctionDetails().getSink().getBuiltin()).getClassLoader();
                }
            }
            return ThreadRuntime.loadJars(jarFile, instanceConfig, narExtractionDirectory, fnCache);
        }
        return ThreadRuntime.loadJars(jarFile, instanceConfig, narExtractionDirectory, fnCache);
    }

    private static ClassLoader loadJars(String jarFile, InstanceConfig instanceConfig, String narExtractionDirectory, FunctionCacheManager fnCache) throws Exception {
        if (jarFile == null) {
            return Thread.currentThread().getContextClassLoader();
        }
        boolean loadedAsNar = false;
        if (FileUtils.mayBeANarArchive((File)new File(jarFile))) {
            try {
                log.info("Trying Loading file as NAR file: {}", (Object)jarFile);
                fnCache.registerFunctionInstanceWithArchive(instanceConfig.getFunctionId(), instanceConfig.getInstanceName(), jarFile, narExtractionDirectory);
                loadedAsNar = true;
            }
            catch (FileNotFoundException e) {
                log.error("The file {} does not look like a .nar file {}", (Object)jarFile, (Object)e.toString());
            }
        }
        if (!loadedAsNar) {
            log.info("Load file as simple JAR file: {}", (Object)jarFile);
            fnCache.registerFunctionInstance(instanceConfig.getFunctionId(), instanceConfig.getInstanceName(), Arrays.asList(jarFile), Collections.emptyList());
        }
        log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}", (Object)instanceConfig.getFunctionDetails().getName(), (Object)fnCache.getClassLoader(instanceConfig.getFunctionId()));
        ClassLoader fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
        if (null == fnClassLoader) {
            throw new Exception("No function class loader available.");
        }
        return fnClassLoader;
    }

    @Override
    public void start() throws Exception {
        ClassLoader functionClassLoader = ThreadRuntime.getFunctionClassLoader(this.instanceConfig, this.jarFile, this.narExtractionDirectory, this.fnCache, this.connectorsManager);
        this.javaInstanceRunnable = new JavaInstanceRunnable(this.instanceConfig, this.clientBuilder, this.pulsarClient, this.pulsarAdmin, this.stateStorageImplClass, this.stateStorageServiceUrl, this.secretsProvider, this.collectorRegistry, functionClassLoader);
        log.info("ThreadContainer starting function with instanceId {} functionId {} namespace {}", new Object[]{this.instanceConfig.getInstanceId(), this.instanceConfig.getFunctionId(), this.instanceConfig.getFunctionDetails().getNamespace()});
        this.fnThread = new Thread(this.threadGroup, (Runnable)this.javaInstanceRunnable, String.format("%s-%s", FunctionCommon.getFullyQualifiedName((Function.FunctionDetails)this.instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceId()));
        this.fnThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.error("Uncaught exception in thread {}", (Object)t, (Object)e);
            }
        });
        this.fnThread.start();
    }

    @Override
    public void join() throws Exception {
        if (this.fnThread != null) {
            this.fnThread.join();
        }
    }

    @Override
    public void stop() {
        if (this.fnThread != null) {
            this.fnThread.interrupt();
            try {
                this.fnThread.join(10000L, 0);
                if (this.fnThread.isAlive()) {
                    log.warn("The function instance thread is still alive after {} milliseconds. Giving up waiting and moving forward to close function.", (Object)10000);
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.javaInstanceRunnable.close();
            log.info("Unloading JAR files for instanceId {} functionId {} namespace {}", new Object[]{this.instanceConfig.getInstanceId(), this.instanceConfig.getFunctionId(), this.instanceConfig.getFunctionDetails().getNamespace()});
            this.fnCache.unregisterFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName());
        }
    }

    @Override
    public CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int instanceId) {
        CompletableFuture<InstanceCommunication.FunctionStatus> statsFuture = new CompletableFuture<InstanceCommunication.FunctionStatus>();
        if (!this.isAlive()) {
            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            functionStatusBuilder.setRunning(false);
            Throwable ex = this.getDeathException();
            if (ex != null && ex.getMessage() != null) {
                functionStatusBuilder.setFailureException(ex.getMessage());
            }
            statsFuture.complete(functionStatusBuilder.build());
            return statsFuture;
        }
        InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = this.javaInstanceRunnable.getFunctionStatus();
        functionStatusBuilder.setRunning(true);
        statsFuture.complete(functionStatusBuilder.build());
        return statsFuture;
    }

    @Override
    public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
        return CompletableFuture.completedFuture(this.javaInstanceRunnable.getAndResetMetrics());
    }

    @Override
    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
        return CompletableFuture.completedFuture(this.javaInstanceRunnable.getMetrics());
    }

    @Override
    public String getPrometheusMetrics() throws IOException {
        if (this.javaInstanceRunnable == null) {
            throw new PulsarServerException("javaInstanceRunnable is not initialized");
        }
        return this.javaInstanceRunnable.getStatsAsString();
    }

    @Override
    public CompletableFuture<Void> resetMetrics() {
        this.javaInstanceRunnable.resetMetrics();
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public boolean isAlive() {
        if (this.fnThread != null) {
            return this.fnThread.isAlive();
        }
        return false;
    }

    @Override
    public Throwable getDeathException() {
        if (this.isAlive()) {
            return null;
        }
        if (null != this.javaInstanceRunnable) {
            return this.javaInstanceRunnable.getDeathException();
        }
        return null;
    }

    public InstanceConfig getInstanceConfig() {
        return this.instanceConfig;
    }
}

