/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils;
import org.apache.paimon.flink.action.cdc.mysql.MySqlDebeziumJsonEventParser;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

public class MySqlSyncTableAction
extends ActionBase {
    private final String database;
    private final String table;
    private final Configuration mySqlConfig;
    private final List<String> partitionKeys = new ArrayList<String>();
    private final List<String> primaryKeys = new ArrayList<String>();
    private Map<String, String> tableConfig = new HashMap<String, String>();
    private List<String> computedColumnArgs = new ArrayList<String>();
    private TypeMapping typeMapping = TypeMapping.defaultMapping();

    public MySqlSyncTableAction(String warehouse, String database, String table, Map<String, String> mySqlConfig) {
        this(warehouse, database, table, Collections.emptyMap(), mySqlConfig);
    }

    public MySqlSyncTableAction(String warehouse, String database, String table, Map<String, String> catalogConfig, Map<String, String> mySqlConfig) {
        super(warehouse, catalogConfig);
        this.database = database;
        this.table = table;
        this.mySqlConfig = Configuration.fromMap(mySqlConfig);
    }

    public MySqlSyncTableAction withPartitionKeys(String ... partitionKeys) {
        return this.withPartitionKeys(Arrays.asList(partitionKeys));
    }

    public MySqlSyncTableAction withPartitionKeys(List<String> partitionKeys) {
        this.partitionKeys.addAll(partitionKeys);
        return this;
    }

    public MySqlSyncTableAction withPrimaryKeys(String ... primaryKeys) {
        return this.withPrimaryKeys(Arrays.asList(primaryKeys));
    }

    public MySqlSyncTableAction withPrimaryKeys(List<String> primaryKeys) {
        this.primaryKeys.addAll(primaryKeys);
        return this;
    }

    public MySqlSyncTableAction withTableConfig(Map<String, String> tableConfig) {
        this.tableConfig = tableConfig;
        return this;
    }

    public MySqlSyncTableAction withComputedColumnArgs(List<String> computedColumnArgs) {
        this.computedColumnArgs = computedColumnArgs;
        return this;
    }

    public MySqlSyncTableAction withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    public void build(StreamExecutionEnvironment env) throws Exception {
        FileStoreTable table;
        Preconditions.checkArgument(this.mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME), String.format("mysql-conf [%s] must be specified.", MySqlSourceOptions.TABLE_NAME.key()));
        boolean caseSensitive = this.catalog.caseSensitive();
        if (!caseSensitive) {
            this.validateCaseInsensitive();
        }
        MySqlSchemasInfo mySqlSchemasInfo = MySqlActionUtils.getMySqlTableInfos(this.mySqlConfig, this.monitorTablePredication(), new ArrayList<Identifier>(), this.typeMapping);
        this.validateMySqlTableInfos(mySqlSchemasInfo);
        this.catalog.createDatabase(this.database, true);
        MySqlTableInfo tableInfo = mySqlSchemasInfo.mergeAll();
        Identifier identifier = new Identifier(this.database, this.table);
        List<ComputedColumn> computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, tableInfo.schema().typeMapping());
        Schema fromMySql = MySqlActionUtils.buildPaimonSchema(tableInfo, this.partitionKeys, this.primaryKeys, computedColumns, this.tableConfig, caseSensitive);
        try {
            table = (FileStoreTable)this.catalog.getTable(identifier);
            if (computedColumns.size() > 0) {
                List computedFields = computedColumns.stream().map(ComputedColumn::columnName).collect(Collectors.toList());
                List<String> fieldNames = table.schema().fieldNames();
                Preconditions.checkArgument(new HashSet<String>(fieldNames).containsAll(computedFields), " Exists Table should contain all computed columns %s, but are %s.", computedFields, fieldNames);
            }
            MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
        }
        catch (Catalog.TableNotExistException e) {
            this.catalog.createTable(identifier, fromMySql, false);
            table = (FileStoreTable)this.catalog.getTable(identifier);
        }
        String tableList = mySqlSchemasInfo.pkTables().stream().map(i -> i.getDatabaseName() + "\\." + i.getObjectName()).collect(Collectors.joining("|"));
        MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(this.mySqlConfig, tableList);
        String serverTimeZone = (String)this.mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
        ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(serverTimeZone);
        TypeMapping typeMapping = this.typeMapping;
        EventParser.Factory parserFactory = () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive, computedColumns, typeMapping);
        CdcSinkBuilder sinkBuilder = new CdcSinkBuilder().withInput(env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")).withParserFactory(parserFactory).withTable(table).withIdentifier(identifier).withCatalogLoader(this.catalogLoader());
        String sinkParallelism = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
        }
        sinkBuilder.build();
    }

    private void validateCaseInsensitive() {
        Preconditions.checkArgument(this.database.equals(this.database.toLowerCase()), String.format("Database name [%s] cannot contain upper case in case-insensitive catalog.", this.database));
        Preconditions.checkArgument(this.table.equals(this.table.toLowerCase()), String.format("Table name [%s] cannot contain upper case in case-insensitive catalog.", this.table));
        for (String part : this.partitionKeys) {
            Preconditions.checkArgument(part.equals(part.toLowerCase()), String.format("Partition keys [%s] cannot contain upper case in case-insensitive catalog.", this.partitionKeys));
        }
        for (String pk : this.primaryKeys) {
            Preconditions.checkArgument(pk.equals(pk.toLowerCase()), String.format("Primary keys [%s] cannot contain upper case in case-insensitive catalog.", this.primaryKeys));
        }
    }

    private void validateMySqlTableInfos(MySqlSchemasInfo mySqlSchemasInfo) {
        List<Identifier> nonPkTables = mySqlSchemasInfo.nonPkTables();
        Preconditions.checkArgument(nonPkTables.isEmpty(), "Source tables of MySQL table synchronization job cannot contain table which doesn't have primary keys.\nThey are: %s", nonPkTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
        Preconditions.checkArgument(!mySqlSchemasInfo.pkTables().isEmpty(), "No table satisfies the given database name and table name.");
    }

    private Predicate<String> monitorTablePredication() {
        return tableName -> {
            Pattern tableNamePattern = Pattern.compile((String)this.mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
            return tableNamePattern.matcher((CharSequence)tableName).matches();
        };
    }

    @Override
    public void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.build(env);
        this.execute(env, String.format("MySQL-Paimon Table Sync: %s.%s", this.database, this.table));
    }
}

