/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.spark;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.spark.SparkCatalogOptions;
import org.apache.paimon.spark.SparkTable;
import org.apache.paimon.spark.SparkTypeUtils;
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
import org.apache.paimon.utils.Preconditions;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkCatalog
extends SparkBaseCatalog {
    private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
    private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
    protected Catalog catalog = null;
    private String defaultDatabase;

    public void initialize(String name, CaseInsensitiveStringMap options) {
        this.catalogName = name;
        CatalogContext catalogContext = CatalogContext.create(Options.fromMap((Map<String, String>)options), SparkSession.active().sessionState().newHadoopConf());
        this.catalog = CatalogFactory.createCatalog(catalogContext);
        this.defaultDatabase = (String)options.getOrDefault((Object)SparkCatalogOptions.DEFAULT_DATABASE.key(), (Object)SparkCatalogOptions.DEFAULT_DATABASE.defaultValue());
        if (!this.catalog.databaseExists(this.defaultNamespace()[0])) {
            try {
                this.createNamespace(this.defaultNamespace(), new HashMap<String, String>());
            }
            catch (NamespaceAlreadyExistsException namespaceAlreadyExistsException) {
                // empty catch block
            }
        }
    }

    @Override
    public Catalog paimonCatalog() {
        return this.catalog;
    }

    public String[] defaultNamespace() {
        return new String[]{this.defaultDatabase};
    }

    public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException {
        Preconditions.checkArgument(this.isValidateNamespace(namespace), "Namespace %s is not valid", Arrays.toString(namespace));
        try {
            this.catalog.createDatabase(namespace[0], false, metadata);
        }
        catch (Catalog.DatabaseAlreadyExistException e) {
            throw new NamespaceAlreadyExistsException(namespace);
        }
    }

    public String[][] listNamespaces() {
        List<String> databases = this.catalog.listDatabases();
        String[][] namespaces = new String[databases.size()][];
        for (int i = 0; i < databases.size(); ++i) {
            namespaces[i] = new String[]{databases.get(i)};
        }
        return namespaces;
    }

    public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
        if (namespace.length == 0) {
            return this.listNamespaces();
        }
        if (!this.isValidateNamespace(namespace)) {
            throw new NoSuchNamespaceException(namespace);
        }
        if (this.catalog.databaseExists(namespace[0])) {
            return new String[0][];
        }
        throw new NoSuchNamespaceException(namespace);
    }

    public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException {
        Preconditions.checkArgument(this.isValidateNamespace(namespace), "Namespace %s is not valid", Arrays.toString(namespace));
        String dataBaseName = namespace[0];
        try {
            return this.catalog.loadDatabaseProperties(dataBaseName);
        }
        catch (Catalog.DatabaseNotExistException e) {
            throw new NoSuchNamespaceException(namespace);
        }
    }

    public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
        return this.dropNamespace(namespace, false);
    }

    public boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException {
        Preconditions.checkArgument(this.isValidateNamespace(namespace), "Namespace %s is not valid", Arrays.toString(namespace));
        try {
            this.catalog.dropDatabase(namespace[0], false, cascade);
            return true;
        }
        catch (Catalog.DatabaseNotExistException e) {
            throw new NoSuchNamespaceException(namespace);
        }
        catch (Catalog.DatabaseNotEmptyException e) {
            throw new UnsupportedOperationException(String.format("Namespace %s is not empty", Arrays.toString(namespace)));
        }
    }

    public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
        Preconditions.checkArgument(this.isValidateNamespace(namespace), "Missing database in namespace: %s", Arrays.toString(namespace));
        try {
            return (Identifier[])this.catalog.listTables(namespace[0]).stream().map(table -> Identifier.of((String[])namespace, (String)table)).toArray(Identifier[]::new);
        }
        catch (Catalog.DatabaseNotExistException e) {
            throw new NoSuchNamespaceException(namespace);
        }
    }

    public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
        try {
            return new SparkTable(this.load(ident));
        }
        catch (Catalog.TableNotExistException e) {
            throw new NoSuchTableException(ident);
        }
    }

    public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
        org.apache.paimon.table.Table table = this.loadPaimonTable(ident);
        LOG.info("Time travel to version '{}'.", (Object)version);
        return new SparkTable(table.copy(Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version)));
    }

    public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
        org.apache.paimon.table.Table table = this.loadPaimonTable(ident);
        LOG.info("Time travel target timestamp is {} milliseconds.", (Object)(timestamp /= 1000L));
        Options option = new Options().set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
        return new SparkTable(table.copy(option.toMap()));
    }

    private org.apache.paimon.table.Table loadPaimonTable(Identifier ident) throws NoSuchTableException {
        try {
            return this.load(ident);
        }
        catch (Catalog.TableNotExistException e) {
            throw new NoSuchTableException(ident);
        }
    }

    public boolean tableExists(Identifier ident) {
        try {
            return this.catalog.tableExists(this.toIdentifier(ident));
        }
        catch (NoSuchTableException e) {
            return false;
        }
    }

    public Table alterTable(Identifier ident, TableChange ... changes) throws NoSuchTableException {
        List<SchemaChange> schemaChanges = Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
        try {
            this.catalog.alterTable(this.toIdentifier(ident), schemaChanges, false);
            return this.loadTable(ident);
        }
        catch (Catalog.TableNotExistException e) {
            throw new NoSuchTableException(ident);
        }
        catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
            throw new RuntimeException(e);
        }
    }

    public SparkTable createTable(Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
        try {
            String provider = properties.get("provider");
            Preconditions.checkArgument(this.usePaimon(provider), "SparkCatalog can only create paimon table, but current provider is %s", provider);
            this.catalog.createTable(this.toIdentifier(ident), this.toInitialSchema(schema, partitions, properties), false);
            return this.loadTable(ident);
        }
        catch (Catalog.TableAlreadyExistException e) {
            throw new TableAlreadyExistsException(ident);
        }
        catch (Catalog.DatabaseNotExistException e) {
            throw new NoSuchNamespaceException(ident.namespace());
        }
        catch (NoSuchTableException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean dropTable(Identifier ident) {
        try {
            this.catalog.dropTable(this.toIdentifier(ident), false);
            return true;
        }
        catch (Catalog.TableNotExistException | NoSuchTableException e) {
            return false;
        }
    }

    private SchemaChange toSchemaChange(TableChange change) {
        if (change instanceof TableChange.SetProperty) {
            TableChange.SetProperty set = (TableChange.SetProperty)change;
            this.validateAlterProperty(set.property());
            if (set.property().equals("comment")) {
                return SchemaChange.updateComment(set.value());
            }
            return SchemaChange.setOption(set.property(), set.value());
        }
        if (change instanceof TableChange.RemoveProperty) {
            TableChange.RemoveProperty remove = (TableChange.RemoveProperty)change;
            this.validateAlterProperty(remove.property());
            if (remove.property().equals("comment")) {
                return SchemaChange.updateComment(null);
            }
            return SchemaChange.removeOption(remove.property());
        }
        if (change instanceof TableChange.AddColumn) {
            TableChange.AddColumn add = (TableChange.AddColumn)change;
            this.validateAlterNestedField(add.fieldNames());
            SchemaChange.Move move = SparkCatalog.getMove(add.position(), add.fieldNames());
            return SchemaChange.addColumn(add.fieldNames()[0], SparkTypeUtils.toPaimonType(add.dataType()).copy(add.isNullable()), add.comment(), move);
        }
        if (change instanceof TableChange.RenameColumn) {
            TableChange.RenameColumn rename = (TableChange.RenameColumn)change;
            this.validateAlterNestedField(rename.fieldNames());
            return SchemaChange.renameColumn(rename.fieldNames()[0], rename.newName());
        }
        if (change instanceof TableChange.DeleteColumn) {
            TableChange.DeleteColumn delete = (TableChange.DeleteColumn)change;
            this.validateAlterNestedField(delete.fieldNames());
            return SchemaChange.dropColumn(delete.fieldNames()[0]);
        }
        if (change instanceof TableChange.UpdateColumnType) {
            TableChange.UpdateColumnType update = (TableChange.UpdateColumnType)change;
            this.validateAlterNestedField(update.fieldNames());
            return SchemaChange.updateColumnType(update.fieldNames()[0], SparkTypeUtils.toPaimonType(update.newDataType()));
        }
        if (change instanceof TableChange.UpdateColumnNullability) {
            TableChange.UpdateColumnNullability update = (TableChange.UpdateColumnNullability)change;
            return SchemaChange.updateColumnNullability(update.fieldNames(), update.nullable());
        }
        if (change instanceof TableChange.UpdateColumnComment) {
            TableChange.UpdateColumnComment update = (TableChange.UpdateColumnComment)change;
            return SchemaChange.updateColumnComment(update.fieldNames(), update.newComment());
        }
        if (change instanceof TableChange.UpdateColumnPosition) {
            TableChange.UpdateColumnPosition update = (TableChange.UpdateColumnPosition)change;
            SchemaChange.Move move = SparkCatalog.getMove(update.position(), update.fieldNames());
            return SchemaChange.updateColumnPosition(move);
        }
        throw new UnsupportedOperationException("Change is not supported: " + change.getClass());
    }

    private static SchemaChange.Move getMove(TableChange.ColumnPosition columnPosition, String[] fieldNames) {
        SchemaChange.Move move = null;
        if (columnPosition instanceof TableChange.First) {
            move = SchemaChange.Move.first(fieldNames[0]);
        } else if (columnPosition instanceof TableChange.After) {
            move = SchemaChange.Move.after(fieldNames[0], ((TableChange.After)columnPosition).column());
        }
        return move;
    }

    private Schema toInitialSchema(StructType schema, Transform[] partitions, Map<String, String> properties) {
        Preconditions.checkArgument(Arrays.stream(partitions).allMatch(partition -> {
            NamedReference[] references = partition.references();
            return references.length == 1 && references[0] instanceof FieldReference;
        }));
        HashMap<String, String> normalizedProperties = new HashMap<String, String>(properties);
        normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
        normalizedProperties.remove("comment");
        String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
        List<String> primaryKeys = pkAsString == null ? Collections.emptyList() : Arrays.stream(pkAsString.split(",")).map(String::trim).collect(Collectors.toList());
        Schema.Builder schemaBuilder = Schema.newBuilder().options(normalizedProperties).primaryKey(primaryKeys).partitionKeys(Arrays.stream(partitions).map(partition -> partition.references()[0].describe()).collect(Collectors.toList())).comment(properties.getOrDefault("comment", null));
        for (StructField field : schema.fields()) {
            schemaBuilder.column(field.name(), SparkTypeUtils.toPaimonType(field.dataType()).copy(field.nullable()), (String)field.getComment().getOrElse(() -> null));
        }
        return schemaBuilder.build();
    }

    private void validateAlterNestedField(String[] fieldNames) {
        if (fieldNames.length > 1) {
            throw new UnsupportedOperationException("Alter nested column is not supported: " + Arrays.toString(fieldNames));
        }
    }

    private void validateAlterProperty(String alterKey) {
        if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) {
            throw new UnsupportedOperationException("Alter primary key is not supported");
        }
    }

    private boolean isValidateNamespace(String[] namespace) {
        return namespace.length == 1;
    }

    public void renameTable(Identifier oldIdent, Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException {
        try {
            this.catalog.renameTable(this.toIdentifier(oldIdent), this.toIdentifier(newIdent), false);
        }
        catch (Catalog.TableNotExistException e) {
            throw new NoSuchTableException(oldIdent);
        }
        catch (Catalog.TableAlreadyExistException e) {
            throw new TableAlreadyExistsException(newIdent);
        }
    }

    protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) throws NoSuchTableException {
        if (!this.isValidateNamespace(ident.namespace())) {
            throw new NoSuchTableException(ident);
        }
        return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
    }

    protected org.apache.paimon.table.Table load(Identifier ident) throws Catalog.TableNotExistException, NoSuchTableException {
        return this.catalog.getTable(this.toIdentifier(ident));
    }

    public void alterNamespace(String[] namespace, NamespaceChange ... changes) {
        throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
    }
}

