/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSyncDatabaseAction
extends SyncDatabaseActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSyncDatabaseAction.class);
    private boolean ignoreIncompatible = false;
    private final List<Identifier> monitoredTables = new ArrayList<Identifier>();
    private final List<Identifier> excludedTables = new ArrayList<Identifier>();

    public MySqlSyncDatabaseAction(String database, Map<String, String> catalogConfig, Map<String, String> mySqlConfig) {
        super(database, catalogConfig, mySqlConfig, SyncJobHandler.SourceType.MYSQL);
        this.mode = MultiTablesSinkMode.DIVIDED;
    }

    public MySqlSyncDatabaseAction ignoreIncompatible(boolean ignoreIncompatible) {
        this.ignoreIncompatible = ignoreIncompatible;
        return this;
    }

    @Override
    protected void beforeBuildingSourceSink() throws Exception {
        Pattern includingPattern = Pattern.compile(this.includingTables);
        Pattern excludingPattern = this.excludingTables == null ? null : Pattern.compile(this.excludingTables);
        JdbcSchemasInfo mySqlSchemasInfo = MySqlActionUtils.getMySqlTableInfos(this.cdcSourceConfig, tableName -> this.shouldMonitorTable((String)tableName, includingPattern, excludingPattern), this.excludedTables, this.typeMapping);
        this.logNonPkTables(mySqlSchemasInfo.nonPkTables());
        List<JdbcTableInfo> jdbcTableInfos = mySqlSchemasInfo.toMySqlTableInfos(this.mergeShards);
        Preconditions.checkArgument(!jdbcTableInfos.isEmpty(), "No tables found in MySQL database " + (String)this.cdcSourceConfig.get(MySqlSourceOptions.DATABASE_NAME) + ", or MySQL database does not exist.");
        TableNameConverter tableNameConverter = new TableNameConverter(this.caseSensitive, this.mergeShards, this.tablePrefix, this.tableSuffix, this.tableMapping);
        for (JdbcTableInfo tableInfo : jdbcTableInfos) {
            FileStoreTable table;
            Identifier identifier = Identifier.create(this.database, tableNameConverter.convert("", tableInfo.toPaimonTableName()));
            Schema fromMySql = CdcActionCommonUtils.buildPaimonSchema(identifier.getFullName(), this.partitionKeys, this.primaryKeys, Collections.emptyList(), this.tableConfig, tableInfo.schema(), this.metadataConverters, this.caseSensitive, false, true, this.syncPKeysFromSourceSchema);
            try {
                table = (FileStoreTable)this.catalog.getTable(identifier);
                Supplier<String> errMsg = this.incompatibleMessage(table.schema(), tableInfo, identifier);
                if (this.shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
                    table = this.alterTableOptions(identifier, table);
                    this.tables.add(table);
                    this.monitoredTables.addAll(tableInfo.identifiers());
                    continue;
                }
                this.excludedTables.addAll(tableInfo.identifiers());
            }
            catch (Catalog.TableNotExistException e) {
                this.catalog.createTable(identifier, fromMySql, false);
                table = (FileStoreTable)this.catalog.getTable(identifier);
                this.tables.add(table);
                this.monitoredTables.addAll(tableInfo.identifiers());
            }
        }
        Preconditions.checkState(!this.monitoredTables.isEmpty(), "No tables to be synchronized. Possible cause is the schemas of all tables in specified MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
    }

    @Override
    protected CdcTimestampExtractor createCdcTimestampExtractor() {
        return MySqlActionUtils.createCdcTimestampExtractor();
    }

    protected MySqlSource<CdcSourceRecord> buildSource() {
        this.validateRuntimeExecutionMode();
        return MySqlActionUtils.buildMySqlSource(this.cdcSourceConfig, CdcActionCommonUtils.tableList(this.mode, (String)this.cdcSourceConfig.get(MySqlSourceOptions.DATABASE_NAME), this.includingTables, this.monitoredTables, this.excludedTables), this.typeMapping);
    }

    private void logNonPkTables(List<Identifier> nonPkTables) {
        if (!nonPkTables.isEmpty()) {
            LOG.debug("Didn't find primary keys for tables '{}'. These tables won't be synchronized.", (Object)nonPkTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
            this.excludedTables.addAll(nonPkTables);
        }
    }

    private boolean shouldMonitorTable(String mySqlTableName, Pattern includingPattern, @Nullable Pattern excludingPattern) {
        boolean shouldMonitor = includingPattern.matcher(mySqlTableName).matches();
        if (excludingPattern != null) {
            boolean bl = shouldMonitor = shouldMonitor && !excludingPattern.matcher(mySqlTableName).matches();
        }
        if (!shouldMonitor) {
            LOG.debug("Source table '{}' is excluded.", (Object)mySqlTableName);
        }
        return shouldMonitor;
    }

    private boolean shouldMonitorTable(TableSchema tableSchema, Schema mySqlSchema, Supplier<String> errMsg) {
        if (CdcActionCommonUtils.schemaCompatible(tableSchema, mySqlSchema.fields())) {
            return true;
        }
        if (this.ignoreIncompatible) {
            LOG.warn(errMsg.get() + "This table will be ignored.");
            return false;
        }
        throw new IllegalArgumentException(errMsg.get() + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
    }

    private Supplier<String> incompatibleMessage(TableSchema paimonSchema, JdbcTableInfo jdbcTableInfo, Identifier identifier) {
        return () -> String.format("Incompatible schema found.\nPaimon table is: %s, fields are: %s.\nMySQL table is: %s, fields are: %s.\n", identifier.getFullName(), paimonSchema.fields(), jdbcTableInfo.location(), jdbcTableInfo.schema().fields());
    }

    @VisibleForTesting
    public List<Identifier> monitoredTables() {
        return this.monitoredTables;
    }

    @VisibleForTesting
    public List<Identifier> excludedTables() {
        return this.excludedTables;
    }

    @Override
    protected boolean requirePrimaryKeys() {
        return true;
    }
}

