/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark3.agent.lifecycle.plan.catalog;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.spark.agent.util.PathUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.agent.util.SparkConfUtils;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler;
import io.openlineage.spark3.agent.lifecycle.plan.catalog.UnsupportedCatalogException;
import java.io.File;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergHandler
implements CatalogHandler {
    private static final Logger log = LoggerFactory.getLogger(IcebergHandler.class);
    private final OpenLineageContext context;
    private static final String TYPE = "type";
    private static final String CATALOG_IMPL = "catalog-impl";

    public IcebergHandler(OpenLineageContext context) {
        this.context = context;
    }

    @Override
    public boolean hasClasses() {
        try {
            IcebergHandler.class.getClassLoader().loadClass("org.apache.iceberg.catalog.Catalog");
            return true;
        }
        catch (Exception exception) {
            return false;
        }
    }

    @Override
    public boolean isClass(TableCatalog tableCatalog) {
        return tableCatalog instanceof SparkCatalog || tableCatalog instanceof SparkSessionCatalog;
    }

    @Override
    public DatasetIdentifier getDatasetIdentifier(SparkSession session, TableCatalog tableCatalog, Identifier identifier, Map<String, String> properties) {
        String catalogName = tableCatalog.name();
        String prefix = String.format("spark.sql.catalog.%s", catalogName);
        Map conf = ScalaConversionUtils.fromMap(session.conf().getAll());
        log.debug(conf.toString());
        Map<String, String> catalogConf = conf.entrySet().stream().filter(x -> ((String)x.getKey()).startsWith(prefix)).filter(x -> ((String)x.getKey()).length() > prefix.length()).collect(Collectors.toMap(x -> ((String)x.getKey()).substring(prefix.length() + 1), Map.Entry::getValue));
        log.debug(catalogConf.toString());
        String catalogType = this.getCatalogType(catalogConf);
        if (catalogType == null) {
            throw new UnsupportedCatalogException(catalogName);
        }
        log.debug(catalogConf.get(TYPE));
        String warehouse = catalogConf.get("warehouse");
        DatasetIdentifier di = PathUtils.fromPath(new Path(warehouse, identifier.toString()));
        if ("hive".equals(catalogType)) {
            di.withSymlink(this.getHiveIdentifier(session, catalogConf.get("uri"), identifier.toString()));
        } else if ("hadoop".equals(catalogType)) {
            di.withSymlink(identifier.toString(), StringUtils.substringBeforeLast((String)di.getName(), (String)File.separator), DatasetIdentifier.SymlinkType.TABLE);
        } else if ("rest".equals(catalogType)) {
            di.withSymlink(this.getRestIdentifier(catalogConf.get("uri"), identifier.toString()));
        } else if ("nessie".equals(catalogType)) {
            di.withSymlink(this.getNessieIdentifier(catalogConf.get("uri"), identifier.toString()));
        } else if ("glue".equals(catalogType)) {
            di.withSymlink(identifier.toString(), StringUtils.substringBeforeLast((String)di.getName(), (String)File.separator), DatasetIdentifier.SymlinkType.TABLE);
        }
        return di;
    }

    private DatasetIdentifier.Symlink getNessieIdentifier(@Nullable String confUri, String table) {
        String uri = new URI(confUri).toString();
        return new DatasetIdentifier.Symlink(table, uri, DatasetIdentifier.SymlinkType.TABLE);
    }

    private DatasetIdentifier.Symlink getHiveIdentifier(SparkSession session, @Nullable String confUri, String table) {
        String slashPrefixedTable = String.format("/%s", table);
        URI uri = confUri == null ? SparkConfUtils.getMetastoreUri(session.sparkContext().conf()).orElseThrow(() -> new UnsupportedCatalogException("hive")) : new URI(confUri);
        DatasetIdentifier metastoreIdentifier = PathUtils.fromPath(new Path(PathUtils.enrichHiveMetastoreURIWithTableName(uri, slashPrefixedTable)));
        return new DatasetIdentifier.Symlink(metastoreIdentifier.getName(), metastoreIdentifier.getNamespace(), DatasetIdentifier.SymlinkType.TABLE);
    }

    private DatasetIdentifier.Symlink getRestIdentifier(@Nullable String confUri, String table) {
        String uri = new URI(confUri).toString();
        return new DatasetIdentifier.Symlink(table, uri, DatasetIdentifier.SymlinkType.TABLE);
    }

    @Override
    public Optional<OpenLineage.StorageDatasetFacet> getStorageDatasetFacet(Map<String, String> properties) {
        String format = properties.getOrDefault("format", "");
        return Optional.of(this.context.getOpenLineage().newStorageDatasetFacet("iceberg", format.replace("iceberg/", "")));
    }

    @Override
    public Optional<String> getDatasetVersion(TableCatalog tableCatalog, Identifier identifier, Map<String, String> properties) {
        SparkTable table;
        try {
            table = (SparkTable)tableCatalog.loadTable(identifier);
        }
        catch (ClassCastException | NoSuchTableException e) {
            log.error("Failed to load table from catalog: {}", (Object)identifier, (Object)e);
            return Optional.empty();
        }
        if (table.table() != null && table.table().currentSnapshot() != null) {
            return Optional.of(Long.toString(table.table().currentSnapshot().snapshotId()));
        }
        return Optional.empty();
    }

    @Override
    public String getName() {
        return "iceberg";
    }

    private String getCatalogType(Map<String, String> catalogConf) {
        if (catalogConf.containsKey(TYPE)) {
            return catalogConf.get(TYPE);
        }
        if (catalogConf.containsKey(CATALOG_IMPL) && catalogConf.get(CATALOG_IMPL).endsWith("GlueCatalog")) {
            return "glue";
        }
        return null;
    }
}

