/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;

import io.debezium.relational.Tables;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser.CustomMySqlAntlrDdlParser;

/**
 * Turns the DDL statement carried by a MySQL CDC {@link SourceRecord} into a SeaTunnel
 * {@link SchemaChangeEvent}.
 *
 * <p>The DDL text is fed through a {@link CustomMySqlAntlrDdlParser}; the column-level events the
 * parser collects are wrapped in a single {@link AlterTableColumnsEvent}.
 *
 * <p>The parser and the {@link Tables} registry are {@code transient} and created on the first
 * call to {@link #resolve(SourceRecord, SeaTunnelDataType)}, so an instance whose fields are
 * {@code null} (for example after Java deserialization, which skips transient fields) rebuilds
 * them on demand. No synchronization is applied to that lazy initialization.
 */
public class MySqlSchemaChangeResolver extends AbstractSchemaChangeResolver {
    /** Dialect name stamped on every event produced by this resolver. */
    private static final String SOURCE_DIALECT_NAME = "MySQL";

    /** Table registry handed to the parser on every parse call; created lazily. */
    private transient Tables tables;

    /** DDL parser reused across calls; created lazily from the first record seen. */
    private transient CustomMySqlAntlrDdlParser customMySqlAntlrDdlParser;

    /**
     * Creates a resolver backed by the configuration produced by the given factory.
     *
     * @param sourceConfigFactory factory for the JDBC source configuration; it is invoked as
     *     {@code create(0)} (NOTE(review): the argument is presumably a subtask index — confirm
     *     against {@code SourceConfig.Factory})
     */
    public MySqlSchemaChangeResolver(SourceConfig.Factory<JdbcSourceConfig> sourceConfigFactory) {
        super(sourceConfigFactory.create(0));
    }

    /**
     * Parses the DDL of {@code record} and converts the resulting column changes into one
     * {@link AlterTableColumnsEvent}.
     *
     * @param record schema-change record; its table path and DDL text are extracted through
     *     {@link SourceRecordUtils}
     * @param dataType not used by this implementation
     * @return an {@link AlterTableColumnsEvent} carrying the parsed column events and the original
     *     DDL statement, or {@code null} when the parser produced no column events
     */
    @Override
    public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
        TablePath tablePath = SourceRecordUtils.getTablePath(record);
        String ddl = SourceRecordUtils.getDdl(record);

        if (this.customMySqlAntlrDdlParser == null) {
            // NOTE(review): the parser is constructed with the table path of the first record
            // only and then reused for every later record — confirm that
            // CustomMySqlAntlrDdlParser does not depend on that path for other tables.
            this.customMySqlAntlrDdlParser =
                    new CustomMySqlAntlrDdlParser(
                            tablePath, this.jdbcSourceConfig.getDbzConnectorConfig());
        }
        if (this.tables == null) {
            this.tables = new Tables();
        }

        // Point the parser at the record's database/schema before parsing its DDL.
        this.customMySqlAntlrDdlParser.setCurrentDatabase(tablePath.getDatabaseName());
        this.customMySqlAntlrDdlParser.setCurrentSchema(tablePath.getSchemaName());
        this.customMySqlAntlrDdlParser.parse(ddl, this.tables);

        List<AlterTableColumnEvent> parsedEvents =
                this.customMySqlAntlrDdlParser.getAndClearParsedEvents();
        if (parsedEvents.isEmpty()) {
            // Nothing column-related was parsed: skip building an event that would be discarded.
            return null;
        }

        for (AlterTableColumnEvent columnEvent : parsedEvents) {
            columnEvent.setSourceDialectName(SOURCE_DIALECT_NAME);
        }

        // The catalog name is left empty; database, schema and table come from the record.
        TableIdentifier tableIdentifier =
                TableIdentifier.of(
                        "",
                        tablePath.getDatabaseName(),
                        tablePath.getSchemaName(),
                        tablePath.getTableName());
        AlterTableColumnsEvent alterTableColumnsEvent =
                new AlterTableColumnsEvent(tableIdentifier, parsedEvents);
        alterTableColumnsEvent.setStatement(ddl);
        alterTableColumnsEvent.setSourceDialectName(SOURCE_DIALECT_NAME);
        return alterTableColumnsEvent;
    }
}

