/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent.util;

import com.google.common.base.CharMatcher;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.client.utils.JdbcUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.sql.ColumnLineage;
import io.openlineage.sql.ColumnMeta;
import io.openlineage.sql.DbTableMeta;
import io.openlineage.sql.ExtractionError;
import io.openlineage.sql.OpenLineageSql;
import io.openlineage.sql.SqlMeta;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSparkUtils {
    private static final Logger log = LoggerFactory.getLogger(JdbcSparkUtils.class);

    public static DatasetIdentifier getDatasetIdentifierFromJdbcUrl(String jdbcUrl, List<String> parts) {
        String namespace = JdbcUtils.sanitizeJdbcUrl(jdbcUrl);
        String urlDatabase = null;
        try {
            URI uri = new URI(namespace);
            String path = uri.getPath();
            if (path != null) {
                namespace = String.format("%s://%s", uri.getScheme(), uri.getAuthority());
                if (path.startsWith("/")) {
                    path = path.substring(1);
                }
                if (path.length() > 1 && CharMatcher.forPredicate(Character::isAlphabetic).matchesAllOf((CharSequence)path)) {
                    urlDatabase = path;
                }
            }
        }
        catch (URISyntaxException uri) {
            // empty catch block
        }
        if (urlDatabase != null && parts.size() <= 3) {
            parts.add(0, urlDatabase);
        }
        String name = String.join((CharSequence)".", parts);
        return new DatasetIdentifier(name, namespace);
    }

    public static <D extends OpenLineage.Dataset> List<D> getDatasets(DatasetFactory<D> datasetFactory, SqlMeta meta, StructType schema, String url) {
        if (meta.columnLineage().isEmpty()) {
            DatasetIdentifier di = JdbcUtils.getDatasetIdentifierFromJdbcUrl(url, meta.inTables().get(0).qualifiedName());
            return Collections.singletonList(datasetFactory.getDataset(di.getName(), di.getNamespace(), schema));
        }
        return meta.inTables().stream().map(dbtm -> {
            DatasetIdentifier di = JdbcUtils.getDatasetIdentifierFromJdbcUrl(url, dbtm.qualifiedName());
            return datasetFactory.getDataset(di.getName(), di.getNamespace(), JdbcSparkUtils.generateJDBCSchema(dbtm, schema, meta));
        }).collect(Collectors.toList());
    }

    private static StructType generateJDBCSchema(DbTableMeta origin, StructType schema, SqlMeta sqlMeta) {
        StructType originSchema = new StructType();
        for (StructField f : schema.fields()) {
            List fields = sqlMeta.columnLineage().stream().filter(cl -> cl.descendant().name().equals(f.name())).flatMap(cl -> cl.lineage().stream().filter(cm -> cm.origin().isPresent() && cm.origin().get().equals(origin))).collect(Collectors.toList());
            for (ColumnMeta cm : fields) {
                originSchema = originSchema.add(cm.name(), f.dataType());
            }
        }
        return originSchema;
    }

    public static Optional<SqlMeta> extractQueryFromSpark(JDBCRelation relation) {
        Optional table = ScalaConversionUtils.asJavaOptional(relation.jdbcOptions().parameters().get(JDBCOptions$.MODULE$.JDBC_TABLE_NAME()));
        if (table.isPresent() && !((String)table.get()).startsWith("(")) {
            DbTableMeta origin = new DbTableMeta(null, null, (String)table.get());
            return Optional.of(new SqlMeta(Collections.singletonList(origin), Collections.emptyList(), Arrays.stream(relation.schema().fields()).map(field -> new ColumnLineage(new ColumnMeta(null, field.name()), Collections.singletonList(new ColumnMeta(origin, field.name())))).collect(Collectors.toList()), Collections.emptyList()));
        }
        String tableOrQuery = relation.jdbcOptions().tableOrQuery();
        String query = tableOrQuery.substring(0, tableOrQuery.lastIndexOf(")")).replaceFirst("\\(", "");
        String dialect = JdbcSparkUtils.extractDialectFromJdbcUrl(relation.jdbcOptions().url());
        Optional<SqlMeta> sqlMeta = OpenLineageSql.parse(Collections.singletonList(query), dialect);
        if (!sqlMeta.isPresent()) {
            return sqlMeta;
        }
        if (!sqlMeta.get().errors().isEmpty()) {
            log.error(String.format("error while parsing query: %s", sqlMeta.get().errors().stream().map(ExtractionError::toString).collect(Collectors.joining(","))));
            return Optional.empty();
        }
        if (sqlMeta.get().inTables().isEmpty()) {
            log.error("no tables defined in query, this should not happen");
            return Optional.empty();
        }
        return sqlMeta;
    }

    private static String extractDialectFromJdbcUrl(String jdbcUrl) {
        Pattern pattern = Pattern.compile("^jdbc:([^:]+):.*");
        Matcher matcher = pattern.matcher(jdbcUrl);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return null;
    }
}

