/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.postgres;

import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.action.cdc.postgres.PostgresActionUtils;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.utils.Preconditions;

public class PostgresSyncTableAction
extends SyncTableActionBase {
    private JdbcSchemasInfo postgresSchemasInfo;

    public PostgresSyncTableAction(String warehouse, String database, String table, Map<String, String> catalogConfig, Map<String, String> postgresConfig) {
        super(warehouse, database, table, catalogConfig, postgresConfig, SyncJobHandler.SourceType.POSTGRES);
    }

    @Override
    protected Schema retrieveSchema() throws Exception {
        this.postgresSchemasInfo = PostgresActionUtils.getPostgresTableInfos(this.cdcSourceConfig, this.monitorTablePredication(), new ArrayList<Identifier>(), this.typeMapping);
        this.validatePostgresTableInfos(this.postgresSchemasInfo);
        JdbcTableInfo tableInfo = this.postgresSchemasInfo.mergeAll();
        return tableInfo.schema();
    }

    protected JdbcIncrementalSource<String> buildSource() {
        List<JdbcSchemasInfo.JdbcSchemaInfo> pkTables = this.postgresSchemasInfo.pkTables();
        HashSet<String> schemaList = new HashSet<String>();
        String[] tableList = new String[pkTables.size()];
        for (int i = 0; i < pkTables.size(); ++i) {
            JdbcSchemasInfo.JdbcSchemaInfo pkTable = pkTables.get(i);
            tableList[i] = pkTable.schemaName() + "." + pkTable.identifier().getObjectName();
            schemaList.add(pkTable.schemaName());
        }
        return PostgresActionUtils.buildPostgresSource(this.cdcSourceConfig, schemaList.toArray(new String[0]), tableList);
    }

    private void validatePostgresTableInfos(JdbcSchemasInfo jdbcSchemasInfo) {
        List<Identifier> nonPkTables = jdbcSchemasInfo.nonPkTables();
        Preconditions.checkArgument(nonPkTables.isEmpty(), "Source tables of PostgreSQL table synchronization job cannot contain table which doesn't have primary keys.\nThey are: %s", nonPkTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
        Preconditions.checkArgument(!jdbcSchemasInfo.pkTables().isEmpty(), "No table satisfies the given database name and table name.");
    }

    private Predicate<String> monitorTablePredication() {
        return tableName -> {
            Pattern tableNamePattern = Pattern.compile((String)this.cdcSourceConfig.get(PostgresSourceOptions.TABLE_NAME));
            return tableNamePattern.matcher((CharSequence)tableName).matches();
        };
    }
}

