/*
 * Decompiled with CFR 0.152.
 */
package com.datasqrl.flinkrunner.stdlib.openai.utils;

import com.datasqrl.flinkrunner.stdlib.openai.utils.FunctionMetricTracker;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.table.functions.FunctionContext;

/**
 * Runs a {@link Callable} on behalf of a named function, either synchronously on the calling
 * thread or asynchronously on a lazily created fixed-size thread pool, and reports each
 * invocation to a {@link FunctionMetricTracker}.
 *
 * <p>The pool size is read from the {@code ASYNC_FUNCTION_THREAD_POOL_SIZE} environment variable
 * and defaults to 10. The pool is only created on the first asynchronous call. Lazy creation and
 * {@link #close()} are not synchronized, so both are expected to be driven from a single thread.
 *
 * <p>Call {@link #close()} when the executor is no longer needed to release the pool threads.
 */
public class FunctionExecutor implements AutoCloseable {
    private static final String POOL_SIZE = "ASYNC_FUNCTION_THREAD_POOL_SIZE";
    private static final String DEFAULT_POOL_SIZE = "10";
    private final String functionName;
    private final FunctionMetricTracker metricTracker;
    private ExecutorService executorService;

    /**
     * Creates an executor for the given function.
     *
     * @param context function context handed to the {@link FunctionMetricTracker}
     * @param functionName name used for the metric tracker and in failure messages
     */
    public FunctionExecutor(FunctionContext context, String functionName) {
        this.functionName = functionName;
        this.metricTracker = new FunctionMetricTracker(context, functionName);
    }

    /**
     * Executes {@code func} synchronously on the calling thread.
     *
     * @param func the work to run
     * @param <T> result type of {@code func}
     * @return the value returned by {@code func}
     * @throws RuntimeException wrapping any exception thrown by {@code func}
     */
    public <T> T execute(Callable<T> func) {
        return this.execute(func, null);
    }

    /**
     * Executes {@code func} either synchronously or asynchronously.
     *
     * <p>When {@code resultFuture} is {@code null}, {@code func} runs on the calling thread and
     * its result is returned. Otherwise {@code func} is submitted to the thread pool, the outcome
     * (value or failure) is delivered through {@code resultFuture}, and this method returns
     * {@code null} immediately.
     *
     * @param func the work to run
     * @param resultFuture future to complete with the outcome, or {@code null} to run synchronously
     * @param <T> result type of {@code func}
     * @return the result of {@code func} in synchronous mode; always {@code null} in async mode
     * @throws RuntimeException in synchronous mode, wrapping any exception thrown by {@code func}
     * @throws IllegalArgumentException in async mode, if the configured pool size is invalid
     */
    public <T> T execute(Callable<T> func, @Nullable CompletableFuture<T> resultFuture) {
        if (resultFuture == null) {
            return this.executeInternal(func);
        }
        // Only the async path needs the pool; synchronous callers never pay for idle threads.
        this.initExecutorService();
        this.executorService.submit(() -> {
            try {
                T result = this.executeInternal(func);
                resultFuture.complete(result);
            }
            catch (Throwable ex) {
                // Catch Throwable (not just RuntimeException) so that an Error can never leave
                // the future uncompleted and the caller waiting indefinitely.
                resultFuture.completeExceptionally(ex);
            }
        });
        return null;
    }

    /**
     * Shuts down the thread pool, if one was created. Tasks that were already submitted are
     * still executed; a later asynchronous call creates a fresh pool.
     */
    @Override
    public void close() {
        if (this.executorService != null) {
            this.executorService.shutdown();
            this.executorService = null;
        }
    }

    /**
     * Runs {@code func} on the current thread while updating the metric tracker: the call count
     * is increased up front, latency (in milliseconds) is recorded on success, and the error
     * count is increased on failure.
     *
     * @throws RuntimeException wrapping any exception thrown while executing {@code func}
     */
    private <T> T executeInternal(Callable<T> func) {
        try {
            this.metricTracker.increaseCallCount();
            long start = System.nanoTime();
            T result = func.call();
            long elapsedTime = System.nanoTime() - start;
            this.metricTracker.recordLatency(TimeUnit.NANOSECONDS.toMillis(elapsedTime));
            return result;
        }
        catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Preserve the interrupt status that the broad catch would otherwise swallow.
                Thread.currentThread().interrupt();
            }
            this.metricTracker.increaseErrorCount();
            throw new RuntimeException("Failure occurred executing function: " + this.functionName, ex);
        }
    }

    /**
     * Creates the fixed-size thread pool on first use, sized from the
     * {@code ASYNC_FUNCTION_THREAD_POOL_SIZE} environment variable (default 10).
     *
     * @throws IllegalArgumentException if the configured value is not a positive integer
     */
    private void initExecutorService() {
        if (this.executorService != null) {
            return;
        }
        String configured = System.getenv().getOrDefault(POOL_SIZE, DEFAULT_POOL_SIZE);
        int poolSize;
        try {
            poolSize = Integer.parseInt(configured.trim());
        }
        catch (NumberFormatException ex) {
            throw new IllegalArgumentException(
                    "Environment variable " + POOL_SIZE + " must be an integer but was: " + configured, ex);
        }
        if (poolSize <= 0) {
            throw new IllegalArgumentException(
                    "Environment variable " + POOL_SIZE + " must be positive but was: " + configured);
        }
        this.executorService = Executors.newFixedThreadPool(poolSize);
    }
}

