/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;

import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.SchemaChangeEvent;
import java.sql.SQLException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlDdlBuilder;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSchema
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(MySqlSchema.class);
    private static final String SHOW_CREATE_TABLE = "SHOW CREATE TABLE ";
    private static final String DESC_TABLE = "DESC ";
    private final MySqlConnectorConfig connectorConfig;
    private final MySqlDatabaseSchema databaseSchema;
    private final Map<TableId, TableChanges.TableChange> schemasByTableId;
    private final Map<TableId, CatalogTable> tableMap;

    public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive, Map<TableId, CatalogTable> tableMap) {
        this.connectorConfig = sourceConfig.getDbzConnectorConfig();
        this.databaseSchema = MySqlConnectionUtils.createMySqlDatabaseSchema(this.connectorConfig, isTableIdCaseSensitive);
        this.schemasByTableId = new HashMap<TableId, TableChanges.TableChange>();
        this.tableMap = tableMap;
    }

    public TableChanges.TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
        TableChanges.TableChange schema = this.schemasByTableId.get(tableId);
        if (schema == null) {
            schema = this.readTableSchema(jdbc, tableId);
            this.schemasByTableId.put(tableId, schema);
        }
        return schema;
    }

    private TableChanges.TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
        Map<Object, Object> tableChangeMap = new HashMap();
        try {
            tableChangeMap = this.getTableSchemaByShowCreateTable(jdbc, tableId);
            if (tableChangeMap.isEmpty()) {
                log.debug("Load schema is empty for table {}", (Object)tableId);
            }
        }
        catch (Exception e) {
            log.debug("Ignore exception when execute `SHOW CREATE TABLE {}` failed", (Object)tableId, (Object)e);
        }
        if (tableChangeMap.isEmpty()) {
            try {
                log.info("Fallback to use `DESC {}` load schema", (Object)tableId);
                tableChangeMap = this.getTableSchemaByDescTable(jdbc, tableId);
            }
            catch (SQLException ex) {
                throw new SeaTunnelException(String.format("Failed to read schema for table %s", tableId), (Throwable)ex);
            }
        }
        if (!tableChangeMap.containsKey(tableId)) {
            throw new RuntimeException(String.format("Can't obtain schema for table %s", tableId));
        }
        return (TableChanges.TableChange)tableChangeMap.get(tableId);
    }

    public TableChanges.TableChange readTableSchemaByDesc(JdbcConnection jdbc, TableId tableId) {
        try {
            return this.getTableSchemaByDescTable(jdbc, tableId).get(tableId);
        }
        catch (SQLException ex) {
            throw new SeaTunnelException(String.format("Failed to read schema for table %s", tableId), (Throwable)ex);
        }
    }

    private Map<TableId, TableChanges.TableChange> getTableSchemaByShowCreateTable(JdbcConnection jdbc, TableId tableId) throws SQLException {
        AtomicReference ddl = new AtomicReference();
        String sql = SHOW_CREATE_TABLE + MySqlUtils.quote(tableId);
        jdbc.query(sql, rs -> {
            rs.next();
            ddl.set(rs.getString(2));
        });
        return this.parseSnapshotDdl(tableId, (String)ddl.get());
    }

    private Map<TableId, TableChanges.TableChange> getTableSchemaByDescTable(JdbcConnection jdbc, TableId tableId) throws SQLException {
        MySqlDdlBuilder ddlBuilder = new MySqlDdlBuilder(tableId);
        String sql = DESC_TABLE + MySqlUtils.quote(tableId);
        jdbc.query(sql, rs -> {
            while (rs.next()) {
                ddlBuilder.addColumn(MySqlDdlBuilder.Column.builder().columnName(rs.getString("Field")).columnType(rs.getString("Type")).nullable(rs.getString("Null").equalsIgnoreCase("YES")).primaryKey("PRI".equals(rs.getString("Key"))).uniqueKey("UNI".equals(rs.getString("Key"))).defaultValue(rs.getString("Default")).extra(rs.getString("Extra")).build());
            }
        });
        return this.parseSnapshotDdl(tableId, ddlBuilder.generateDdl());
    }

    private Map<TableId, TableChanges.TableChange> parseSnapshotDdl(TableId tableId, String ddl) {
        HashMap<TableId, TableChanges.TableChange> tableChangeMap = new HashMap<TableId, TableChanges.TableChange>();
        MySqlOffsetContext offsetContext = MySqlOffsetContext.initial(this.connectorConfig);
        MySqlPartition partition = new MySqlPartition(this.connectorConfig.getLogicalName());
        List<SchemaChangeEvent> schemaChangeEvents = this.databaseSchema.parseSnapshotDdl(partition, ddl, tableId.catalog(), offsetContext, Instant.now());
        for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
            for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
                Table table = CatalogTableUtils.mergeCatalogTableConfig(tableChange.getTable(), this.tableMap.get(tableId));
                TableChanges.TableChange newTableChange = new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
                tableChangeMap.put(tableId, newTableChange);
            }
        }
        return tableChangeMap;
    }

    @Override
    public void close() throws Exception {
        this.databaseSchema.close();
    }
}

