/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils;
import org.apache.paimon.flink.action.cdc.mysql.MySqlMetadataProcessor;
import org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.utils.Preconditions;

/**
 * Table synchronization action whose CDC source is MySQL.
 *
 * <p>The source tables are selected by matching table names against the regular expression
 * configured under {@link MySqlSourceOptions#TABLE_NAME}. Every matched table must have a
 * primary key; their schemas are merged into a single schema via
 * {@link MySqlSchemasInfo#mergeAll()}.
 */
public class MySqlSyncTableAction
extends SyncTableActionBase {
    /**
     * Schemas of the matched MySQL tables. Assigned by {@link #retrieveSchema()} and read by
     * {@link #buildSource()}, so it is {@code null} until {@link #retrieveSchema()} has run.
     */
    private MySqlSchemasInfo mySqlSchemasInfo;

    /**
     * Creates the action and registers the MySQL JDBC driver.
     *
     * @param warehouse passed through to {@link SyncTableActionBase}
     * @param database passed through to {@link SyncTableActionBase}
     * @param table passed through to {@link SyncTableActionBase}
     * @param catalogConfig passed through to {@link SyncTableActionBase}
     * @param mySqlConfig MySQL CDC source options, passed through to {@link SyncTableActionBase}
     */
    public MySqlSyncTableAction(String warehouse, String database, String table, Map<String, String> catalogConfig, Map<String, String> mySqlConfig) {
        super(warehouse, database, table, catalogConfig, mySqlConfig);
        MySqlActionUtils.registerJdbcDriver();
    }

    /**
     * Looks up the metadata converter for the given metadata column.
     *
     * @param column name of the metadata column
     * @return the converter supplied by {@link MySqlMetadataProcessor#converter(String)}
     */
    @Override
    protected Optional<CdcMetadataConverter<?>> metadataConverter(String column) {
        return Optional.of(MySqlMetadataProcessor.converter(column));
    }

    /**
     * Verifies that the table-name option is present in the MySQL source configuration.
     *
     * @throws IllegalArgumentException presumably, when the option is missing (thrown by
     *     {@link Preconditions#checkArgument}; confirm against that helper)
     */
    @Override
    protected void checkCdcSourceArgument() {
        // Template overload: the message is only assembled when the check fails.
        Preconditions.checkArgument(
                this.cdcSourceConfig.contains(MySqlSourceOptions.TABLE_NAME),
                "mysql-conf [%s] must be specified.",
                MySqlSourceOptions.TABLE_NAME.key());
    }

    /**
     * Loads the schemas of all MySQL tables accepted by {@link #monitorTablePredication()},
     * validates them and returns the merged schema.
     *
     * <p>Side effect: stores the loaded schema info in {@link #mySqlSchemasInfo} for later use
     * by {@link #buildSource()}.
     *
     * @return the schema of the merged table info
     * @throws Exception if loading the table information fails
     */
    @Override
    protected Schema retrieveSchema() throws Exception {
        this.mySqlSchemasInfo =
                MySqlActionUtils.getMySqlTableInfos(
                        this.cdcSourceConfig,
                        this.monitorTablePredication(),
                        new ArrayList<>(),
                        this.typeMapping,
                        this.catalog.caseSensitive());
        this.validateMySqlTableInfos(this.mySqlSchemasInfo);
        MySqlTableInfo mergedTable = this.mySqlSchemasInfo.mergeAll();
        return mergedTable.schema();
    }

    /**
     * Builds the MySQL CDC source over every primary-key table found by
     * {@link #retrieveSchema()}.
     *
     * @return the data stream source wrapping the MySQL source
     * @throws Exception if the source cannot be built
     */
    @Override
    protected DataStreamSource<String> buildSource() throws Exception {
        // Produces "db\.table|db\.table|...": the dot is escaped and entries are OR-ed,
        // so the list is evidently consumed as a regular expression downstream.
        String tableList =
                this.mySqlSchemasInfo.pkTables().stream()
                        .map(id -> id.getDatabaseName() + "\\." + id.getObjectName())
                        .collect(Collectors.joining("|"));
        return this.buildDataStreamSource(MySqlActionUtils.buildMySqlSource(this.cdcSourceConfig, tableList));
    }

    /** Returns the display name of the source: {@code "MySQL Source"}. */
    @Override
    protected String sourceName() {
        return "MySQL Source";
    }

    /**
     * Creates the parser that turns raw source strings into {@link RichCdcMultiplexRecord}s.
     *
     * @return a new {@link MySqlRecordParser} configured from this action's state
     */
    @Override
    protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
        return new MySqlRecordParser(
                this.cdcSourceConfig,
                this.catalog.caseSensitive(),
                this.computedColumns,
                this.typeMapping,
                this.metadataConverters);
    }

    /** Returns the job name, which embeds the target database and table. */
    @Override
    protected String jobName() {
        return String.format("MySQL-Paimon Table Sync: %s.%s", this.database, this.table);
    }

    /**
     * Rejects source table sets that contain a table without primary keys, or that contain no
     * primary-key table at all.
     *
     * @param mySqlSchemasInfo the schema info to validate
     */
    private void validateMySqlTableInfos(MySqlSchemasInfo mySqlSchemasInfo) {
        List<Identifier> nonPkTables = mySqlSchemasInfo.nonPkTables();
        Preconditions.checkArgument(
                nonPkTables.isEmpty(),
                "Source tables of MySQL table synchronization job cannot contain table which doesn't have primary keys.\nThey are: %s",
                nonPkTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
        Preconditions.checkArgument(
                !mySqlSchemasInfo.pkTables().isEmpty(),
                "No table satisfies the given database name and table name.");
    }

    /**
     * Builds the predicate that decides which table names are monitored: a name is accepted
     * when it matches, in full, the regular expression configured under
     * {@link MySqlSourceOptions#TABLE_NAME}.
     *
     * @return predicate over table names
     * @throws java.util.regex.PatternSyntaxException if the configured expression is invalid
     */
    private Predicate<String> monitorTablePredication() {
        // Compile once here rather than inside the lambda: the expression is the same for
        // every table name, so recompiling it per tested name is wasted work.
        Pattern tableNamePattern = Pattern.compile(this.cdcSourceConfig.get(MySqlSourceOptions.TABLE_NAME));
        return tableName -> tableNamePattern.matcher(tableName).matches();
    }
}

