/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.DataCatalogTable;
import org.apache.paimon.flink.FlinkCatalogOptions;
import org.apache.paimon.flink.FlinkTableFactory;
import org.apache.paimon.flink.FormatCatalogTable;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.SystemCatalogTable;
import org.apache.paimon.flink.log.LogStoreRegister;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkCatalog
extends AbstractCatalog {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class);
    public static final String NUM_ROWS_KEY = "numRows";
    public static final String LAST_UPDATE_TIME_KEY = "lastUpdateTime";
    public static final String TOTAL_SIZE_KEY = "totalSize";
    public static final String NUM_FILES_KEY = "numFiles";
    private final ClassLoader classLoader;
    private final Catalog catalog;
    private final String name;
    private final boolean logStoreAutoRegister;
    private final Duration logStoreAutoRegisterTimeout;
    private final boolean disableCreateTableInDefaultDatabase;

    public FlinkCatalog(Catalog catalog, String name, String defaultDatabase, ClassLoader classLoader, Options options) {
        super(name, defaultDatabase);
        this.catalog = catalog;
        this.name = name;
        this.classLoader = classLoader;
        this.logStoreAutoRegister = options.get(FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER);
        this.logStoreAutoRegisterTimeout = options.get(FlinkCatalogOptions.REGISTER_TIMEOUT);
        this.disableCreateTableInDefaultDatabase = options.get(FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
        if (!this.disableCreateTableInDefaultDatabase && !catalog.databaseExists(defaultDatabase)) {
            try {
                catalog.createDatabase(defaultDatabase, true);
            }
            catch (Catalog.DatabaseAlreadyExistException databaseAlreadyExistException) {
                // empty catch block
            }
        }
    }

    public Catalog catalog() {
        return this.catalog;
    }

    public Optional<Factory> getFactory() {
        return Optional.of(new FlinkTableFactory());
    }

    public List<String> listDatabases() throws CatalogException {
        return this.catalog.listDatabases();
    }

    public boolean databaseExists(String databaseName) throws CatalogException {
        return this.catalog.databaseExists(databaseName);
    }

    public CatalogDatabase getDatabase(String databaseName) throws CatalogException, DatabaseNotExistException {
        if (this.databaseExists(databaseName)) {
            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
        }
        throw new DatabaseNotExistException(this.getName(), databaseName);
    }

    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
        if (database != null && database.getDescription().isPresent() && !((String)database.getDescription().get()).equals("")) {
            throw new UnsupportedOperationException("Create database with description is unsupported.");
        }
        try {
            this.catalog.createDatabase(name, ignoreIfExists, database == null ? Collections.emptyMap() : database.getProperties());
        }
        catch (Catalog.DatabaseAlreadyExistException e) {
            throw new DatabaseAlreadyExistException(this.getName(), e.database());
        }
    }

    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotEmptyException, DatabaseNotExistException, CatalogException {
        try {
            this.catalog.dropDatabase(name, ignoreIfNotExists, cascade);
        }
        catch (Catalog.DatabaseNotEmptyException e) {
            throw new DatabaseNotEmptyException(this.getName(), e.database());
        }
        catch (Catalog.DatabaseNotExistException e) {
            throw new DatabaseNotExistException(this.getName(), e.database());
        }
    }

    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
        try {
            return this.catalog.listTables(databaseName);
        }
        catch (Catalog.DatabaseNotExistException e) {
            throw new DatabaseNotExistException(this.getName(), e.database());
        }
    }

    public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        return this.getTable(tablePath, null);
    }

    public CatalogTable getTable(ObjectPath tablePath, long timestamp) throws TableNotExistException, CatalogException {
        return this.getTable(tablePath, (Long)timestamp);
    }

    private CatalogTable getTable(ObjectPath tablePath, @Nullable Long timestamp) throws TableNotExistException {
        Table table;
        try {
            table = this.catalog.getTable(FlinkCatalog.toIdentifier(tablePath));
        }
        catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
        if (table instanceof FormatTable) {
            if (timestamp != null) {
                throw new UnsupportedOperationException(String.format("Format table %s cannot support as of timestamp.", tablePath));
            }
            return new FormatCatalogTable((FormatTable)table);
        }
        if (timestamp != null) {
            Options options = new Options();
            options.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
            table = table.copy(options.toMap());
        }
        if (table instanceof FileStoreTable) {
            return this.toCatalogTable(table);
        }
        return new SystemCatalogTable(table);
    }

    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        return this.catalog.tableExists(FlinkCatalog.toIdentifier(tablePath));
    }

    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        Identifier identifier = FlinkCatalog.toIdentifier(tablePath);
        Table table = null;
        try {
            if (this.logStoreAutoRegister && this.catalog.tableExists(identifier)) {
                table = this.catalog.getTable(identifier);
            }
            this.catalog.dropTable(FlinkCatalog.toIdentifier(tablePath), ignoreIfNotExists);
            if (this.logStoreAutoRegister && table != null) {
                LogStoreRegister.unRegisterLogSystem(identifier, table.options(), this.classLoader);
            }
        }
        catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
    }

    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        if (!(table instanceof CatalogTable)) {
            throw new UnsupportedOperationException("Only support CatalogTable, but is: " + table.getClass());
        }
        if (Objects.equals(this.getDefaultDatabase(), tablePath.getDatabaseName()) && this.disableCreateTableInDefaultDatabase) {
            throw new UnsupportedOperationException("Creating table in default database is disabled, please specify a database name.");
        }
        Identifier identifier = FlinkCatalog.toIdentifier(tablePath);
        HashMap<String, String> options = new HashMap<String, String>(table.getOptions());
        Schema paimonSchema = this.buildPaimonSchema(identifier, (CatalogTable)table, options);
        boolean unRegisterLogSystem = false;
        try {
            this.catalog.createTable(identifier, paimonSchema, ignoreIfExists);
        }
        catch (Catalog.TableAlreadyExistException e) {
            unRegisterLogSystem = true;
            throw new TableAlreadyExistException(this.getName(), tablePath);
        }
        catch (Catalog.DatabaseNotExistException e) {
            unRegisterLogSystem = true;
            throw new DatabaseNotExistException(this.getName(), e.database());
        }
        finally {
            if (this.logStoreAutoRegister && unRegisterLogSystem) {
                LogStoreRegister.unRegisterLogSystem(identifier, options, this.classLoader);
            }
        }
    }

    protected Schema buildPaimonSchema(Identifier identifier, CatalogTable catalogTable, Map<String, String> options) {
        Path expectedPath;
        String path;
        String connector = options.get(FactoryUtil.CONNECTOR.key());
        options.remove(FactoryUtil.CONNECTOR.key());
        if (!StringUtils.isNullOrWhitespaceOnly(connector) && !"paimon".equals(connector)) {
            throw new CatalogException("Paimon Catalog only supports paimon tables, but you specify  'connector'= '" + connector + "' when using Paimon Catalog\n You can create TEMPORARY table instead if you want to create the table of other connector.");
        }
        if (this.logStoreAutoRegister) {
            Catalog.tableDefaultOptions(this.catalog.options()).forEach(options::putIfAbsent);
            options.put(FlinkCatalogOptions.REGISTER_TIMEOUT.key(), this.logStoreAutoRegisterTimeout.toString());
            LogStoreRegister.registerLogSystem(this.catalog, identifier, options, this.classLoader);
        }
        if ((path = options.remove(CoreOptions.PATH.key())) != null && !new Path(path).equals(expectedPath = this.catalog.getTableLocation(identifier))) {
            throw new CatalogException(String.format("You specified the Path when creating the table, but the Path '%s' is different from where it should be '%s'. Please remove the Path.", path, expectedPath));
        }
        return FlinkCatalog.fromCatalogTable(catalogTable.copy(options));
    }

    private List<SchemaChange> toSchemaChange(TableChange change, Map<String, Integer> oldTableNonPhysicalColumnIndex) {
        ArrayList<SchemaChange> schemaChanges = new ArrayList<SchemaChange>();
        if (change instanceof TableChange.AddColumn) {
            if (((TableChange.AddColumn)change).getColumn().isPhysical()) {
                TableChange.AddColumn add = (TableChange.AddColumn)change;
                String comment = add.getColumn().getComment().orElse(null);
                SchemaChange.Move move = this.getMove(add.getPosition(), add.getColumn().getName());
                schemaChanges.add(SchemaChange.addColumn(add.getColumn().getName(), LogicalTypeConversion.toDataType(add.getColumn().getDataType().getLogicalType()), comment, move));
            }
            return schemaChanges;
        }
        if (change instanceof TableChange.AddWatermark) {
            TableChange.AddWatermark add = (TableChange.AddWatermark)change;
            this.setWatermarkOptions(add.getWatermark(), schemaChanges);
            return schemaChanges;
        }
        if (change instanceof TableChange.DropColumn) {
            if (!oldTableNonPhysicalColumnIndex.containsKey(((TableChange.DropColumn)change).getColumnName())) {
                TableChange.DropColumn drop = (TableChange.DropColumn)change;
                schemaChanges.add(SchemaChange.dropColumn(drop.getColumnName()));
            }
            return schemaChanges;
        }
        if (change instanceof TableChange.DropWatermark) {
            String watermarkPrefix = FlinkCatalogPropertiesUtil.compoundKey("schema", "watermark", 0);
            schemaChanges.add(SchemaChange.removeOption(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "rowtime")));
            schemaChanges.add(SchemaChange.removeOption(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "strategy.expr")));
            schemaChanges.add(SchemaChange.removeOption(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "strategy.data-type")));
            return schemaChanges;
        }
        if (change instanceof TableChange.ModifyColumnName) {
            if (!oldTableNonPhysicalColumnIndex.containsKey(((TableChange.ModifyColumnName)change).getOldColumnName())) {
                TableChange.ModifyColumnName modify = (TableChange.ModifyColumnName)change;
                schemaChanges.add(SchemaChange.renameColumn(modify.getOldColumnName(), modify.getNewColumnName()));
            }
            return schemaChanges;
        }
        if (change instanceof TableChange.ModifyPhysicalColumnType) {
            if (!oldTableNonPhysicalColumnIndex.containsKey(((TableChange.ModifyPhysicalColumnType)change).getOldColumn().getName())) {
                TableChange.ModifyPhysicalColumnType modify = (TableChange.ModifyPhysicalColumnType)change;
                LogicalType newColumnType = modify.getNewType().getLogicalType();
                LogicalType oldColumnType = modify.getOldColumn().getDataType().getLogicalType();
                if (newColumnType.isNullable() != oldColumnType.isNullable()) {
                    schemaChanges.add(SchemaChange.updateColumnNullability(modify.getNewColumn().getName(), newColumnType.isNullable()));
                }
                schemaChanges.add(SchemaChange.updateColumnType(modify.getOldColumn().getName(), LogicalTypeConversion.toDataType(newColumnType)));
            }
            return schemaChanges;
        }
        if (change instanceof TableChange.ModifyColumnPosition) {
            if (!oldTableNonPhysicalColumnIndex.containsKey(((TableChange.ModifyColumnPosition)change).getOldColumn().getName())) {
                TableChange.ModifyColumnPosition modify = (TableChange.ModifyColumnPosition)change;
                SchemaChange.Move move = this.getMove(modify.getNewPosition(), modify.getNewColumn().getName());
                schemaChanges.add(SchemaChange.updateColumnPosition(move));
            }
            return schemaChanges;
        }
        if (change instanceof TableChange.ModifyColumnComment) {
            if (!oldTableNonPhysicalColumnIndex.containsKey(((TableChange.ModifyColumnComment)change).getOldColumn().getName())) {
                TableChange.ModifyColumnComment modify = (TableChange.ModifyColumnComment)change;
                schemaChanges.add(SchemaChange.updateColumnComment(modify.getNewColumn().getName(), modify.getNewComment()));
            }
            return schemaChanges;
        }
        if (change instanceof TableChange.ModifyWatermark) {
            TableChange.ModifyWatermark modify = (TableChange.ModifyWatermark)change;
            this.setWatermarkOptions(modify.getNewWatermark(), schemaChanges);
            return schemaChanges;
        }
        if (change instanceof TableChange.SetOption) {
            TableChange.SetOption setOption = (TableChange.SetOption)change;
            String key = setOption.getKey();
            String value = setOption.getValue();
            SchemaManager.checkAlterTablePath(key);
            if ("comment".equals(key)) {
                schemaChanges.add(SchemaChange.updateComment(value));
            } else {
                schemaChanges.add(SchemaChange.setOption(key, value));
            }
            return schemaChanges;
        }
        if (change instanceof TableChange.ResetOption) {
            TableChange.ResetOption resetOption = (TableChange.ResetOption)change;
            String key = resetOption.getKey();
            if ("comment".equals(key)) {
                schemaChanges.add(SchemaChange.updateComment(null));
            } else {
                schemaChanges.add(SchemaChange.removeOption(resetOption.getKey()));
            }
            return schemaChanges;
        }
        if (change instanceof TableChange.ModifyColumn) {
            if (oldTableNonPhysicalColumnIndex.containsKey(((TableChange.ModifyColumn)change).getOldColumn().getName()) && !(((TableChange.ModifyColumn)change).getNewColumn() instanceof Column.PhysicalColumn)) {
                return schemaChanges;
            }
            throw new UnsupportedOperationException("Change is not supported: " + change.getClass());
        }
        throw new UnsupportedOperationException("Change is not supported: " + change.getClass());
    }

    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        if (ignoreIfNotExists && !this.tableExists(tablePath)) {
            return;
        }
        CatalogTable table = this.getTable(tablePath);
        FlinkCatalog.validateAlterTable(table, (CatalogTable)newTable);
        ArrayList<SchemaChange> changes = new ArrayList<SchemaChange>();
        Map oldProperties = table.getOptions();
        for (Map.Entry entry : newTable.getOptions().entrySet()) {
            String key = (String)entry.getKey();
            String value = (String)entry.getValue();
            if (Objects.equals(value, oldProperties.get(key))) continue;
            if (CoreOptions.PATH.key().equalsIgnoreCase(key)) {
                throw new IllegalArgumentException("Illegal table path in table options: " + value);
            }
            changes.add(SchemaChange.setOption(key, value));
        }
        oldProperties.keySet().forEach(k -> {
            if (!newTable.getOptions().containsKey(k)) {
                changes.add(SchemaChange.removeOption(k));
            }
        });
        try {
            this.catalog.alterTable(FlinkCatalog.toIdentifier(tablePath), changes, ignoreIfNotExists);
        }
        catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
        catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
            throw new CatalogException((Throwable)e);
        }
    }

    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, List<TableChange> tableChanges, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        Table table;
        if (ignoreIfNotExists && !this.tableExists(tablePath)) {
            return;
        }
        try {
            table = this.catalog.getTable(FlinkCatalog.toIdentifier(tablePath));
        }
        catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
        Preconditions.checkArgument(table instanceof FileStoreTable, "Can't alter system table.");
        FlinkCatalog.validateAlterTable((CatalogTable)this.toCatalogTable(table), (CatalogTable)newTable);
        Map<String, Integer> oldTableNonPhysicalColumnIndex = FlinkCatalogPropertiesUtil.nonPhysicalColumns(table.options(), table.rowType().getFieldNames());
        ArrayList<SchemaChange> changes = new ArrayList<SchemaChange>();
        Map<String, String> schemaOptions = FlinkCatalogPropertiesUtil.serializeNonPhysicalNewColumns(((ResolvedCatalogBaseTable)newTable).getResolvedSchema());
        table.options().forEach((k, v) -> {
            if (FlinkCatalogPropertiesUtil.isNonPhysicalColumnKey(k) && !schemaOptions.containsKey(k)) {
                changes.add(SchemaChange.removeOption(k));
            }
        });
        schemaOptions.forEach((k, v) -> {
            if (!table.options().containsKey(k) || !table.options().get(k).equals(v)) {
                changes.add(SchemaChange.setOption(k, v));
            }
        });
        if (null != tableChanges) {
            List schemaChanges = tableChanges.stream().flatMap(tableChange -> this.toSchemaChange((TableChange)tableChange, oldTableNonPhysicalColumnIndex).stream()).collect(Collectors.toList());
            changes.addAll(schemaChanges);
        }
        try {
            this.catalog.alterTable(FlinkCatalog.toIdentifier(tablePath), changes, ignoreIfNotExists);
        }
        catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException | Catalog.TableNotExistException e) {
            throw new CatalogException((Throwable)e);
        }
    }

    private SchemaChange.Move getMove(TableChange.ColumnPosition columnPosition, String fieldName) {
        SchemaChange.Move move = null;
        if (columnPosition instanceof TableChange.First) {
            move = SchemaChange.Move.first(fieldName);
        } else if (columnPosition instanceof TableChange.After) {
            move = SchemaChange.Move.after(fieldName, ((TableChange.After)columnPosition).column());
        }
        return move;
    }

    private String getWatermarkKeyPrefix() {
        return FlinkCatalogPropertiesUtil.compoundKey("schema", "watermark", 0);
    }

    private String getWatermarkRowTimeKey(String watermarkPrefix) {
        return FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "rowtime");
    }

    private String getWatermarkExprKey(String watermarkPrefix) {
        return FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "strategy.expr");
    }

    private String getWatermarkExprDataTypeKey(String watermarkPrefix) {
        return FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "strategy.data-type");
    }

    private void setWatermarkOptions(WatermarkSpec wms, List<SchemaChange> schemaChanges) {
        String watermarkPrefix = this.getWatermarkKeyPrefix();
        schemaChanges.add(SchemaChange.setOption(this.getWatermarkRowTimeKey(watermarkPrefix), wms.getRowtimeAttribute()));
        schemaChanges.add(SchemaChange.setOption(this.getWatermarkExprKey(watermarkPrefix), wms.getWatermarkExpression().asSerializableString()));
        schemaChanges.add(SchemaChange.setOption(this.getWatermarkExprDataTypeKey(watermarkPrefix), wms.getWatermarkExpression().getOutputDataType().getLogicalType().asSerializableString()));
    }

    private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) {
        if (ct1 instanceof SystemCatalogTable) {
            throw new UnsupportedOperationException("Can't alter system table.");
        }
        TableSchema ts1 = ct1.getSchema();
        TableSchema ts2 = ct2.getSchema();
        boolean pkEquality = false;
        if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
            pkEquality = Objects.equals(((UniqueConstraint)ts1.getPrimaryKey().get()).getType(), ((UniqueConstraint)ts2.getPrimaryKey().get()).getType()) && Objects.equals(((UniqueConstraint)ts1.getPrimaryKey().get()).getColumns(), ((UniqueConstraint)ts2.getPrimaryKey().get()).getColumns());
        } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
            pkEquality = true;
        }
        if (!pkEquality) {
            throw new UnsupportedOperationException("Altering primary key is not supported yet.");
        }
        if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
            throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
        }
    }

    public final void open() throws CatalogException {
    }

    public final void close() throws CatalogException {
        try {
            this.catalog.close();
        }
        catch (Exception e) {
            throw new CatalogException("Failed to close catalog " + this.catalog.toString(), (Throwable)e);
        }
    }

    private CatalogTableImpl toCatalogTable(Table table) {
        HashMap<String, String> newOptions = new HashMap<String, String>(table.options());
        TableSchema.Builder builder = TableSchema.builder();
        HashMap<String, String> nonPhysicalColumnComments = new HashMap<String, String>();
        List physicalRowFields = LogicalTypeConversion.toLogicalType(table.rowType()).getFields();
        List<String> physicalColumns = table.rowType().getFieldNames();
        int columnCount = physicalRowFields.size() + FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount(newOptions, physicalColumns);
        int physicalColumnIndex = 0;
        for (int i = 0; i < columnCount; ++i) {
            String optionalName = (String)newOptions.get(FlinkCatalogPropertiesUtil.compoundKey("schema", i, "name"));
            if (optionalName == null || physicalColumns.contains(optionalName)) {
                RowType.RowField field = (RowType.RowField)physicalRowFields.get(physicalColumnIndex++);
                builder.field(field.getName(), TypeConversions.fromLogicalToDataType((LogicalType)field.getType()));
                continue;
            }
            builder.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(newOptions, i));
            if (!newOptions.containsKey(FlinkCatalogPropertiesUtil.compoundKey("schema", i, "comment"))) continue;
            nonPhysicalColumnComments.put(optionalName, (String)newOptions.get(FlinkCatalogPropertiesUtil.compoundKey("schema", i, "comment")));
            newOptions.remove(FlinkCatalogPropertiesUtil.compoundKey("schema", i, "comment"));
        }
        if (newOptions.keySet().stream().anyMatch(key -> key.startsWith(FlinkCatalogPropertiesUtil.compoundKey("schema", "watermark")))) {
            builder.watermark(FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(newOptions));
        }
        if (table.primaryKeys().size() > 0) {
            builder.primaryKey(table.primaryKeys().stream().collect(Collectors.joining("_", "PK_", "")), table.primaryKeys().toArray(new String[0]));
        }
        TableSchema schema = builder.build();
        DescriptorProperties removeProperties = new DescriptorProperties(false);
        removeProperties.putTableSchema("schema", schema);
        removeProperties.asMap().keySet().forEach(newOptions::remove);
        return new DataCatalogTable(table, schema, table.partitionKeys(), newOptions, table.comment().orElse(""), nonPhysicalColumnComments);
    }

    public static Schema fromCatalogTable(CatalogTable table) {
        ResolvedCatalogTable catalogTable = (ResolvedCatalogTable)table;
        ResolvedSchema schema = catalogTable.getResolvedSchema();
        org.apache.flink.table.types.logical.RowType rowType = (org.apache.flink.table.types.logical.RowType)schema.toPhysicalRowDataType().getLogicalType();
        HashMap<String, String> options = new HashMap<String, String>(catalogTable.getOptions());
        options.putAll(FlinkCatalog.columnOptions(schema));
        Schema.Builder schemaBuilder = Schema.newBuilder().comment(catalogTable.getComment()).options(options).primaryKey(schema.getPrimaryKey().map(pk -> pk.getColumns()).orElse(Collections.emptyList())).partitionKeys(catalogTable.getPartitionKeys());
        Map<String, String> columnComments = FlinkCatalog.getColumnComments((CatalogTable)catalogTable);
        rowType.getFields().forEach(field -> schemaBuilder.column(field.getName(), LogicalTypeConversion.toDataType(field.getType()), (String)columnComments.get(field.getName())));
        return schemaBuilder.build();
    }

    private static Map<String, String> getColumnComments(CatalogTable catalogTable) {
        return catalogTable.getUnresolvedSchema().getColumns().stream().filter(c -> c.getComment().isPresent()).collect(Collectors.toMap(Schema.UnresolvedColumn::getName, c -> (String)c.getComment().get()));
    }

    private static Map<String, String> columnOptions(ResolvedSchema schema) {
        HashMap<String, String> columnOptions = new HashMap<String, String>(FlinkCatalogPropertiesUtil.serializeNonPhysicalNewColumns(schema));
        List watermarkSpecs = schema.getWatermarkSpecs();
        if (!watermarkSpecs.isEmpty()) {
            Preconditions.checkArgument(watermarkSpecs.size() == 1);
            columnOptions.putAll(FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec((WatermarkSpec)watermarkSpecs.get(0)));
        }
        return columnOptions;
    }

    public static Identifier toIdentifier(ObjectPath path) {
        return new Identifier(path.getDatabaseName(), path.getObjectName());
    }

    public final void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public final void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws CatalogException, TableNotExistException, TableAlreadyExistException {
        ObjectPath toTable = new ObjectPath(tablePath.getDatabaseName(), newTableName);
        try {
            this.catalog.renameTable(FlinkCatalog.toIdentifier(tablePath), FlinkCatalog.toIdentifier(toTable), ignoreIfNotExists);
        }
        catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
        catch (Catalog.TableAlreadyExistException e) {
            throw new TableAlreadyExistException(this.getName(), toTable);
        }
    }

    public final List<String> listViews(String databaseName) throws CatalogException {
        return Collections.emptyList();
    }

    public final List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
        if (this.isCalledFromFlinkRecomputeStatisticsProgram()) {
            LOG.info("Skipping listPartitions method due to detection of FlinkRecomputeStatisticsProgram call.");
            return Collections.emptyList();
        }
        return this.getPartitionSpecs(tablePath, null);
    }

    private Table getPaimonTable(ObjectPath tablePath) throws TableNotExistException {
        try {
            Identifier identifier = FlinkCatalog.toIdentifier(tablePath);
            return this.catalog.getTable(identifier);
        }
        catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
    }

    private List<PartitionEntry> getPartitionEntries(Table table, ObjectPath tablePath, @Nullable CatalogPartitionSpec partitionSpec) throws TableNotPartitionedException {
        if (table.partitionKeys() == null || table.partitionKeys().size() == 0) {
            throw new TableNotPartitionedException(this.getName(), tablePath);
        }
        ReadBuilder readBuilder = table.newReadBuilder();
        if (partitionSpec != null && partitionSpec.getPartitionSpec() != null) {
            readBuilder.withPartitionFilter(partitionSpec.getPartitionSpec());
        }
        return readBuilder.newScan().listPartitionEntries();
    }

    private List<CatalogPartitionSpec> getPartitionSpecs(ObjectPath tablePath, @Nullable CatalogPartitionSpec partitionSpec) throws CatalogException, TableNotPartitionedException, TableNotExistException {
        FileStoreTable table = (FileStoreTable)this.getPaimonTable(tablePath);
        List<PartitionEntry> partitionEntries = this.getPartitionEntries(table, tablePath, partitionSpec);
        RowType partitionRowType = table.schema().logicalPartitionType();
        InternalRowPartitionComputer partitionComputer = FileStorePathFactory.getPartitionComputer(partitionRowType, new CoreOptions(table.options()).partitionDefaultName());
        return partitionEntries.stream().map(e -> {
            LinkedHashMap<String, String> partValues = partitionComputer.generatePartValues(Preconditions.checkNotNull(e.partition(), "Partition row data is null. This is unexpected."));
            return new CatalogPartitionSpec(partValues);
        }).collect(Collectors.toList());
    }

    public final List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return this.getPartitionSpecs(tablePath, partitionSpec);
    }

    public final List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws CatalogException {
        return Collections.emptyList();
    }

    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
        Preconditions.checkNotNull(partitionSpec, "partition spec shouldn't be null");
        try {
            List<PartitionEntry> partitionEntries = this.getPartitionEntries(this.getPaimonTable(tablePath), tablePath, partitionSpec);
            if (partitionEntries.isEmpty()) {
                throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec);
            }
            PartitionEntry partitionEntry = partitionEntries.get(0);
            HashMap<String, String> properties = new HashMap<String, String>();
            properties.put(NUM_ROWS_KEY, String.valueOf(partitionEntry.recordCount()));
            properties.put(LAST_UPDATE_TIME_KEY, String.valueOf(partitionEntry.lastFileCreationTime()));
            properties.put(NUM_FILES_KEY, String.valueOf(partitionEntry.fileCount()));
            properties.put(TOTAL_SIZE_KEY, String.valueOf(partitionEntry.fileSizeInBytes()));
            return new CatalogPartitionImpl(properties, "");
        }
        catch (TableNotExistException | TableNotPartitionedException e) {
            throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec);
        }
    }

    public final boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
        try {
            List<CatalogPartitionSpec> partitionSpecs = this.getPartitionSpecs(tablePath, partitionSpec);
            return partitionSpecs.size() > 0;
        }
        catch (TableNotExistException | TableNotPartitionedException e) {
            throw new CatalogException(e);
        }
    }

    public final void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public final void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
        if (!this.partitionExists(tablePath, partitionSpec) && !ignoreIfNotExists) {
            throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec);
        }
        try {
            Identifier identifier = FlinkCatalog.toIdentifier(tablePath);
            this.catalog.dropPartition(identifier, partitionSpec.getPartitionSpec());
        }
        catch (Catalog.TableNotExistException e) {
            throw new CatalogException((Throwable)e);
        }
        catch (Catalog.PartitionNotExistException e) {
            throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec);
        }
    }

    public final void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public final List<String> listFunctions(String dbName) throws CatalogException {
        return Collections.emptyList();
    }

    public final CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
        throw new FunctionNotExistException(this.getName(), functionPath);
    }

    public final boolean functionExists(ObjectPath functionPath) throws CatalogException {
        return false;
    }

    public final void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws CatalogException {
        throw new UnsupportedOperationException("Create function is not supported, maybe you can use 'CREATE TEMPORARY FUNCTION' instead.");
    }

    public final void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public final void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public final CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    public final CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    public final CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    public final CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    public final void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public final void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public final void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public final void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws CatalogException {
        throw new UnsupportedOperationException();
    }

    public List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException {
        if (!this.databaseExists(dbName)) {
            throw new DatabaseNotExistException(this.name, dbName);
        }
        return ProcedureUtil.listProcedures();
    }

    public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
        return ProcedureUtil.getProcedure(this.catalog, procedurePath).orElseThrow(() -> new ProcedureNotExistException(this.name, procedurePath));
    }

    private boolean isCalledFromFlinkRecomputeStatisticsProgram() {
        StackTraceElement[] stackTrace;
        for (StackTraceElement stackTraceElement : stackTrace = Thread.currentThread().getStackTrace()) {
            if (!stackTraceElement.getClassName().contains("FlinkRecomputeStatisticsProgram")) continue;
            return true;
        }
        return false;
    }
}

