/*
 * Decompiled with CFR 0.152.
 */
package com.datasqrl.flinkrunner;

import com.datasqrl.flinkrunner.SqlUtils;
import com.datasqrl.flinkrunner.stdlib.utils.AutoRegisterSystemFunction;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes SQL scripts and compiled plans against a Flink {@link TableEnvironment}.
 *
 * <p>An instance wraps a single table environment. It can register the system functions
 * discovered through {@link ServiceLoader}, make UDF JARs from a directory available, and run
 * a script statement by statement, with {@code SET 'key' = 'value'} statements applied directly
 * to the table configuration.
 */
class SqlExecutor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SqlExecutor.class);

    /**
     * Matches a complete {@code SET 'key' = 'value'} statement (optional trailing semicolon),
     * case-insensitively. Group 1 is the key, group 2 is the value.
     */
    private static final Pattern SET_PATTERN =
            Pattern.compile("SET\\s+'(\\S+)'\\s*=\\s*'(.+)';?", Pattern.CASE_INSENSITIVE);

    /** Finds {@code EXECUTE STATEMENT SET} anywhere in a statement, case-insensitively. */
    private static final Pattern STATEMENT_SET_PATTERN =
            Pattern.compile("EXECUTE\\s+STATEMENT\\s+SET", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    private final TableEnvironment tEnv;

    /**
     * Creates an executor with a stream table environment built from {@code config}.
     *
     * @param config Flink configuration used for both the execution and the table environment
     * @param udfPath directory containing UDF JARs; when non-blank, each {@code *.jar} in it is
     *     added with an {@code ADD JAR} statement
     */
    SqlExecutor(Configuration config, @Nullable String udfPath) {
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(config);
        EnvironmentSettings tEnvConfig = EnvironmentSettings.newInstance().withConfiguration(config).build();
        this.tEnv = StreamTableEnvironment.create(sEnv, tEnvConfig);
        if (StringUtils.isNotBlank(udfPath)) {
            this.setupUdfPath(udfPath);
        }
    }

    /**
     * Creates an executor whose environments use a class loader built from the JARs in
     * {@code udfPath}, and records those JAR URLs in {@code config} under
     * {@link PipelineOptions#CLASSPATHS}.
     *
     * @param config Flink configuration; modified in place when UDF JARs are found
     * @param udfPath directory containing UDF JARs, may be {@code null} or blank
     * @return a new executor
     * @throws IOException if a JAR file cannot be converted to a URL
     */
    static SqlExecutor withUdfClassLoader(Configuration config, @Nullable String udfPath) throws IOException {
        // May be null when there is no usable UDF directory; it is passed through unchanged.
        URLClassLoader udfClassLoader = SqlExecutor.buildUdfClassLoader(udfPath);
        SqlExecutor.setConfigClassPaths(config, udfClassLoader);
        StreamExecutionEnvironment sEnv = new StreamExecutionEnvironment(config, udfClassLoader);
        EnvironmentSettings tEnvConfig = EnvironmentSettings.newInstance()
                .withConfiguration(config)
                .withClassLoader(udfClassLoader)
                .build();
        return new SqlExecutor(StreamTableEnvironment.create(sEnv, tEnvConfig));
    }

    /**
     * Registers every {@link AutoRegisterSystemFunction} found via {@link ServiceLoader} as a
     * temporary system function. Implementations that do not extend
     * {@link UserDefinedFunction} are skipped.
     *
     * @throws RuntimeException wrapping any exception raised while loading or registering
     */
    void setupSystemFunctions() {
        log.debug("Setting up automatically registered system functions");
        try {
            for (AutoRegisterSystemFunction function : ServiceLoader.load(AutoRegisterSystemFunction.class)) {
                SqlExecutor.getFunctionNameAndClass(function.getClass())
                        .ifPresent(nameAndClass ->
                                this.tEnv.createTemporarySystemFunction(nameAndClass.f0, nameAndClass.f1));
            }
        }
        catch (Exception ex) {
            throw new RuntimeException("Failed to register system functions", ex);
        }
        log.debug("Completed auto function registered system functions");
    }

    /**
     * Splits {@code script} into statements and executes them in order.
     *
     * @param script SQL script to run
     * @return the result of the last statement, or {@code null} if the script contained no
     *     statements or the last statement was a {@code SET} statement
     * @throws RuntimeException if any statement fails
     */
    TableResult executeScript(String script) {
        List<String> statements = SqlUtils.parseStatements(script);
        TableResult tableResult = null;
        Iterator<String> it = statements.iterator();
        while (it.hasNext()) {
            String statement = it.next();
            // A statement is "intermediate" when at least one more statement follows it.
            tableResult = this.executeStatement(statement, it.hasNext());
        }
        return tableResult;
    }

    /**
     * Executes a compiled plan given as a JSON string.
     *
     * @param planJson compiled plan in JSON form
     * @return the result of executing the plan
     */
    TableResult executeCompiledPlan(String planJson) {
        log.info("Executing compiled plan from JSON.");
        try {
            PlanReference planReference = PlanReference.fromJsonString(planJson);
            TableResult result = this.tEnv.executePlan(planReference);
            log.info("Compiled plan executed.");
            return result;
        }
        catch (Exception e) {
            // Log for visibility, then propagate the original exception untouched.
            log.error("Failed to execute compiled plan", e);
            throw e;
        }
    }

    /**
     * Executes a single statement.
     *
     * <p>A {@code SET 'key' = 'value'} statement is applied to the table configuration instead
     * of being sent to the table environment. Any other statement is executed as SQL; if it
     * contains {@code EXECUTE STATEMENT SET} and more statements follow, this method blocks
     * until its result completes.
     *
     * @param statement the statement text
     * @param intermediate {@code true} if more statements follow this one in the script
     * @return the statement's result, or {@code null} for a {@code SET} statement
     * @throws RuntimeException wrapping any failure; if the failure is an
     *     {@link InterruptedException} the thread's interrupt flag is restored first
     */
    private TableResult executeStatement(String statement, boolean intermediate) {
        TableResult tableResult = null;
        try {
            String trimmed = statement.trim();
            Matcher setMatcher = SET_PATTERN.matcher(trimmed);
            if (setMatcher.matches()) {
                String key = setMatcher.group(1);
                String value = setMatcher.group(2);
                this.tEnv.getConfig().getConfiguration().setString(key, value);
                log.info("Set configuration: {} = {}", key, value);
            } else {
                log.info("Executing statement:\n{}", statement);
                tableResult = this.tEnv.executeSql(statement);
                if (intermediate && STATEMENT_SET_PATTERN.matcher(trimmed).find()) {
                    log.debug("Make sure to wait intermediate statement set to finish...");
                    tableResult.await();
                }
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from callers up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Error while executing stmt: " + statement, e);
        }
        return tableResult;
    }

    /**
     * Adds every {@code *.jar} in {@code udfPath} to the table environment with
     * {@code ADD JAR}. This is best effort: a missing directory is logged as a warning and any
     * exception is logged as an error, without being propagated.
     *
     * @param udfPath directory containing UDF JARs
     */
    private void setupUdfPath(String udfPath) {
        log.info("Setting up UDF path: {}", udfPath);
        try {
            File udfDir = new File(udfPath);
            // isDirectory() is false for non-existent paths, so it covers the exists() check too.
            if (!udfDir.isDirectory()) {
                log.warn("UDF path does not exist or is not a directory: {}", udfPath);
                return;
            }
            for (File jarFile : SqlExecutor.listJarFiles(udfDir)) {
                this.tEnv.executeSql("ADD JAR 'file://" + jarFile.getAbsolutePath() + "'");
                log.info("Added UDF JAR: {}", jarFile.getAbsolutePath());
            }
        }
        catch (Exception e) {
            log.error("Failed to set up UDF path", e);
        }
    }

    /**
     * Builds a class loader over the {@code *.jar} files in {@code udfPath}, with the current
     * thread's context class loader as parent.
     *
     * @param udfPath directory containing UDF JARs, may be {@code null} or blank
     * @return the class loader, or {@code null} if the path is blank, is not an existing
     *     directory, or contains no JAR files
     * @throws IOException if a JAR file cannot be converted to a URL
     */
    @Nullable
    static URLClassLoader buildUdfClassLoader(String udfPath) throws IOException {
        if (StringUtils.isBlank(udfPath)) {
            return null;
        }
        File udfDir = new File(udfPath);
        if (!udfDir.isDirectory()) {
            return null;
        }
        File[] jarFiles = SqlExecutor.listJarFiles(udfDir);
        if (jarFiles.length == 0) {
            return null;
        }
        URL[] jarUrls = new URL[jarFiles.length];
        for (int i = 0; i < jarFiles.length; ++i) {
            jarUrls[i] = jarFiles[i].toURI().toURL();
        }
        return new URLClassLoader(jarUrls, Thread.currentThread().getContextClassLoader());
    }

    /**
     * Stores the URLs of {@code udfClassLoader} in {@code config} under
     * {@link PipelineOptions#CLASSPATHS}. Does nothing when the class loader is {@code null} or
     * has no URLs.
     *
     * @param config configuration to update in place
     * @param udfClassLoader class loader whose URLs are recorded, may be {@code null}
     */
    static void setConfigClassPaths(Configuration config, @Nullable URLClassLoader udfClassLoader) {
        if (udfClassLoader == null || udfClassLoader.getURLs().length < 1) {
            return;
        }
        List<String> classPaths = Arrays.stream(udfClassLoader.getURLs())
                .map(URL::toString)
                .collect(Collectors.toList());
        config.set(PipelineOptions.CLASSPATHS, classPaths);
    }

    /**
     * Derives the registration name for a function class.
     *
     * @param clazz candidate function class
     * @return the lower-cased simple class name paired with the class, or an empty optional if
     *     {@code clazz} does not extend {@link UserDefinedFunction}
     */
    @VisibleForTesting
    static Optional<Tuple2<String, Class<? extends UserDefinedFunction>>> getFunctionNameAndClass(Class<?> clazz) {
        if (!UserDefinedFunction.class.isAssignableFrom(clazz)) {
            log.warn("Skip registering '{}' as it does not extend '{}'", clazz, UserDefinedFunction.class);
            return Optional.empty();
        }
        // Locale.ROOT keeps the derived name independent of the JVM's default locale.
        String funcName = clazz.getSimpleName().toLowerCase(Locale.ROOT);
        Class<? extends UserDefinedFunction> udfClass = clazz.asSubclass(UserDefinedFunction.class);
        Tuple2<String, Class<? extends UserDefinedFunction>> nameAndClass = Tuple2.of(funcName, udfClass);
        log.info("Registering '{}' ...", funcName);
        return Optional.of(nameAndClass);
    }

    /**
     * Lists the entries of {@code dir} whose names end in {@code .jar}.
     *
     * @param dir directory to scan
     * @return the matching files; an empty array if the directory cannot be listed
     */
    private static File[] listJarFiles(File dir) {
        File[] jarFiles = dir.listFiles((parent, name) -> name.endsWith(".jar"));
        // listFiles returns null when the path is not a directory or an I/O error occurs.
        return jarFiles != null ? jarFiles : new File[0];
    }

    /**
     * Creates an executor around an existing table environment.
     *
     * @param tEnv table environment to execute against
     */
    @Generated
    SqlExecutor(TableEnvironment tEnv) {
        this.tEnv = tEnv;
    }
}

