/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils;
import org.apache.paimon.flink.action.cdc.mysql.MySqlMetadataProcessor;
import org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSyncDatabaseAction
extends ActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSyncDatabaseAction.class);
    private final String database;
    private final Configuration mySqlConfig;
    private Map<String, String> tableConfig = new HashMap<String, String>();
    private boolean ignoreIncompatible = false;
    private boolean mergeShards = true;
    private String tablePrefix = "";
    private String tableSuffix = "";
    private String includingTables = ".*";
    @Nullable
    String excludingTables;
    private MultiTablesSinkMode mode = MultiTablesSinkMode.DIVIDED;
    private TypeMapping typeMapping = TypeMapping.defaultMapping();
    private final List<Identifier> monitoredTables = new ArrayList<Identifier>();
    private final List<Identifier> excludedTables = new ArrayList<Identifier>();
    private List<String> metadataColumn = new ArrayList<String>();

    public MySqlSyncDatabaseAction(String warehouse, String database, Map<String, String> catalogConfig, Map<String, String> mySqlConfig) {
        super(warehouse, catalogConfig);
        this.database = database;
        this.mySqlConfig = Configuration.fromMap(mySqlConfig);
        MySqlActionUtils.registerJdbcDriver();
    }

    public MySqlSyncDatabaseAction withTableConfig(Map<String, String> tableConfig) {
        this.tableConfig = tableConfig;
        return this;
    }

    public MySqlSyncDatabaseAction ignoreIncompatible(boolean ignoreIncompatible) {
        this.ignoreIncompatible = ignoreIncompatible;
        return this;
    }

    public MySqlSyncDatabaseAction mergeShards(boolean mergeShards) {
        this.mergeShards = mergeShards;
        return this;
    }

    public MySqlSyncDatabaseAction withTablePrefix(@Nullable String tablePrefix) {
        if (tablePrefix != null) {
            this.tablePrefix = tablePrefix;
        }
        return this;
    }

    public MySqlSyncDatabaseAction withTableSuffix(@Nullable String tableSuffix) {
        if (tableSuffix != null) {
            this.tableSuffix = tableSuffix;
        }
        return this;
    }

    public MySqlSyncDatabaseAction includingTables(@Nullable String includingTables) {
        if (includingTables != null) {
            this.includingTables = includingTables;
        }
        return this;
    }

    public MySqlSyncDatabaseAction excludingTables(@Nullable String excludingTables) {
        this.excludingTables = excludingTables;
        return this;
    }

    public MySqlSyncDatabaseAction withMode(MultiTablesSinkMode mode) {
        this.mode = mode;
        return this;
    }

    public MySqlSyncDatabaseAction withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    public MySqlSyncDatabaseAction withMetadataKeys(List<String> metadataKeys) {
        this.metadataColumn = metadataKeys;
        return this;
    }

    @Override
    public void build() throws Exception {
        Preconditions.checkArgument(!this.mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME), MySqlSourceOptions.TABLE_NAME.key() + " cannot be set for mysql-sync-database. If you want to sync several MySQL tables into one Paimon table, use mysql-sync-table instead.");
        boolean caseSensitive = this.catalog.caseSensitive();
        this.validateCaseInsensitive(caseSensitive);
        Pattern includingPattern = Pattern.compile(this.includingTables);
        Pattern excludingPattern = this.excludingTables == null ? null : Pattern.compile(this.excludingTables);
        MySqlSchemasInfo mySqlSchemasInfo = MySqlActionUtils.getMySqlTableInfos(this.mySqlConfig, tableName -> this.shouldMonitorTable((String)tableName, includingPattern, excludingPattern), this.excludedTables, this.typeMapping, caseSensitive);
        this.logNonPkTables(mySqlSchemasInfo.nonPkTables());
        List<MySqlTableInfo> mySqlTableInfos = mySqlSchemasInfo.toMySqlTableInfos(this.mergeShards);
        Preconditions.checkArgument(mySqlTableInfos.size() > 0, "No tables found in MySQL database " + (String)this.mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME) + ", or MySQL database does not exist.");
        this.catalog.createDatabase(this.database, true);
        TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, this.mergeShards, this.tablePrefix, this.tableSuffix);
        CdcMetadataConverter[] metadataConverters = (CdcMetadataConverter[])this.metadataColumn.stream().map(MySqlMetadataProcessor::converter).toArray(CdcMetadataConverter[]::new);
        ArrayList<FileStoreTable> fileStoreTables = new ArrayList<FileStoreTable>();
        for (MySqlTableInfo tableInfo : mySqlTableInfos) {
            Table table;
            Identifier identifier = Identifier.create(this.database, tableNameConverter.convert(tableInfo.toPaimonTableName()));
            Schema fromMySql = CdcActionCommonUtils.buildPaimonSchema(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), this.tableConfig, tableInfo.schema(), metadataConverters, true);
            try {
                table = (FileStoreTable)this.catalog.getTable(identifier);
                table = table.copy((Map)this.tableConfig);
                Supplier<String> errMsg = this.incompatibleMessage(table.schema(), tableInfo, identifier);
                if (this.shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
                    fileStoreTables.add((FileStoreTable)table);
                    this.monitoredTables.addAll(tableInfo.identifiers());
                    continue;
                }
                this.excludedTables.addAll(tableInfo.identifiers());
            }
            catch (Catalog.TableNotExistException e) {
                this.catalog.createTable(identifier, fromMySql, false);
                table = (FileStoreTable)this.catalog.getTable(identifier);
                fileStoreTables.add((FileStoreTable)table);
                this.monitoredTables.addAll(tableInfo.identifiers());
            }
        }
        Preconditions.checkState(!this.monitoredTables.isEmpty(), "No tables to be synchronized. Possible cause is the schemas of all tables in specified MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
        MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(this.mySqlConfig, CdcActionCommonUtils.tableList(this.mode, (String)this.mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME), this.includingTables, this.monitoredTables, this.excludedTables));
        NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(this.tableConfig, caseSensitive);
        TypeMapping typeMapping = this.typeMapping;
        MySqlRecordParser recordParser = new MySqlRecordParser(this.mySqlConfig, caseSensitive, typeMapping, metadataConverters);
        EventParser.Factory parserFactory = () -> new RichCdcMultiplexRecordEventParser(schemaBuilder, includingPattern, excludingPattern, tableNameConverter);
        String database = this.database;
        MultiTablesSinkMode mode = this.mode;
        new FlinkCdcSyncDatabaseSinkBuilder().withInput(this.env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").flatMap((FlatMapFunction)recordParser).name("Parse")).withParserFactory(parserFactory).withDatabase(database).withCatalogLoader(this.catalogLoader()).withTables(fileStoreTables).withMode(mode).withTableOptions(this.tableConfig).build();
    }

    private void validateCaseInsensitive(boolean caseSensitive) {
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", this.database);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", this.tablePrefix);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", this.tableSuffix);
    }

    private void logNonPkTables(List<Identifier> nonPkTables) {
        if (!nonPkTables.isEmpty()) {
            LOG.debug("Didn't find primary keys for tables '{}'. These tables won't be synchronized.", (Object)nonPkTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
            this.excludedTables.addAll(nonPkTables);
        }
    }

    private boolean shouldMonitorTable(String mySqlTableName, Pattern includingPattern, @Nullable Pattern excludingPattern) {
        boolean shouldMonitor = includingPattern.matcher(mySqlTableName).matches();
        if (excludingPattern != null) {
            boolean bl = shouldMonitor = shouldMonitor && !excludingPattern.matcher(mySqlTableName).matches();
        }
        if (!shouldMonitor) {
            LOG.debug("Source table '{}' is excluded.", (Object)mySqlTableName);
        }
        return shouldMonitor;
    }

    private boolean shouldMonitorTable(TableSchema tableSchema, Schema mySqlSchema, Supplier<String> errMsg) {
        if (CdcActionCommonUtils.schemaCompatible(tableSchema, mySqlSchema.fields())) {
            return true;
        }
        if (this.ignoreIncompatible) {
            LOG.warn(errMsg.get() + "This table will be ignored.");
            return false;
        }
        throw new IllegalArgumentException(errMsg.get() + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
    }

    private Supplier<String> incompatibleMessage(TableSchema paimonSchema, MySqlTableInfo mySqlTableInfo, Identifier identifier) {
        return () -> String.format("Incompatible schema found.\nPaimon table is: %s, fields are: %s.\nMySQL table is: %s, fields are: %s.\n", identifier.getFullName(), paimonSchema.fields(), mySqlTableInfo.location(), mySqlTableInfo.schema().fields());
    }

    @VisibleForTesting
    public List<Identifier> monitoredTables() {
        return this.monitoredTables;
    }

    @VisibleForTesting
    public List<Identifier> excludedTables() {
        return this.excludedTables;
    }

    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return this.tableConfig;
    }

    @Override
    public void run() throws Exception {
        this.build();
        this.execute(String.format("MySQL-Paimon Database Sync: %s", this.database));
    }
}

