/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mongodb;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils;
import org.apache.paimon.flink.action.cdc.mongodb.MongoDBRecordParser;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;

public class MongoDBSyncDatabaseAction
extends ActionBase {
    private final String database;
    private final Configuration mongodbConfig;
    private Map<String, String> tableConfig = new HashMap<String, String>();
    private String tablePrefix = "";
    private String tableSuffix = "";
    private String includingTables = ".*";
    @Nullable
    String excludingTables;

    public MongoDBSyncDatabaseAction(String warehouse, String database, Map<String, String> catalogConfig, Map<String, String> mongodbConfig) {
        super(warehouse, catalogConfig);
        this.database = database;
        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
    }

    public MongoDBSyncDatabaseAction withTableConfig(Map<String, String> tableConfig) {
        this.tableConfig = tableConfig;
        return this;
    }

    public MongoDBSyncDatabaseAction withTablePrefix(@Nullable String tablePrefix) {
        if (tablePrefix != null) {
            this.tablePrefix = tablePrefix;
        }
        return this;
    }

    public MongoDBSyncDatabaseAction withTableSuffix(@Nullable String tableSuffix) {
        if (tableSuffix != null) {
            this.tableSuffix = tableSuffix;
        }
        return this;
    }

    public MongoDBSyncDatabaseAction includingTables(@Nullable String includingTables) {
        if (includingTables != null) {
            this.includingTables = includingTables;
        }
        return this;
    }

    public MongoDBSyncDatabaseAction excludingTables(@Nullable String excludingTables) {
        this.excludingTables = excludingTables;
        return this;
    }

    @Override
    public void build() throws Exception {
        boolean caseSensitive = this.catalog.caseSensitive();
        this.validateCaseInsensitive(caseSensitive);
        this.catalog.createDatabase(this.database, true);
        ArrayList<Identifier> excludedTables = new ArrayList<Identifier>();
        MongoDBSource<String> source = MongoDBActionUtils.buildMongodbSource(this.mongodbConfig, CdcActionCommonUtils.combinedModeTableList((String)this.mongodbConfig.get(MongoDBSourceOptions.DATABASE), this.includingTables, excludedTables));
        NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(this.tableConfig, caseSensitive);
        Pattern includingPattern = Pattern.compile(this.includingTables);
        Pattern excludingPattern = this.excludingTables == null ? null : Pattern.compile(this.excludingTables);
        TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, true, this.tablePrefix, this.tableSuffix);
        EventParser.Factory parserFactory = () -> new RichCdcMultiplexRecordEventParser(schemaBuilder, includingPattern, excludingPattern, tableNameConverter);
        new FlinkCdcSyncDatabaseSinkBuilder().withInput(this.env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB Source").flatMap((FlatMapFunction)new MongoDBRecordParser(caseSensitive, this.mongodbConfig)).name("Parse")).withParserFactory(parserFactory).withCatalogLoader(this.catalogLoader()).withDatabase(this.database).withMode(MultiTablesSinkMode.COMBINED).withTableOptions(this.tableConfig).build();
    }

    private void validateCaseInsensitive(boolean caseSensitive) {
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", this.database);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", this.tablePrefix);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", this.tableSuffix);
    }

    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return this.tableConfig;
    }

    @Override
    public void run() throws Exception {
        this.build();
        this.execute(String.format("MongoDB-Paimon Database Sync: %s", this.database));
    }
}

