/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mongodb;

import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils;
import org.apache.paimon.flink.action.cdc.mongodb.MongoDBRecordParser;
import org.apache.paimon.flink.action.cdc.mongodb.MongodbSchemaUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.utils.Preconditions;

/**
 * Table-synchronization action whose CDC source is a MongoDB collection.
 *
 * <p>Provides the MongoDB-specific hooks of {@link SyncTableActionBase}: argument validation,
 * schema retrieval, source construction, record parsing and the source/job display names.
 */
public class MongoDBSyncTableAction extends SyncTableActionBase {

    /**
     * Creates the action. Every argument is forwarded unchanged to the base-class constructor.
     *
     * @param warehouse forwarded to {@link SyncTableActionBase}
     * @param database forwarded to {@link SyncTableActionBase}; also used in {@link #jobName()}
     * @param table forwarded to {@link SyncTableActionBase}; also used in {@link #jobName()}
     * @param catalogConfig forwarded to {@link SyncTableActionBase}
     * @param mongodbConfig forwarded to {@link SyncTableActionBase} as the CDC source settings
     */
    public MongoDBSyncTableAction(
            String warehouse,
            String database,
            String table,
            Map<String, String> catalogConfig,
            Map<String, String> mongodbConfig) {
        super(warehouse, database, table, catalogConfig, mongodbConfig);
    }

    /**
     * Verifies that the collection option is present in the CDC source configuration.
     *
     * <p>The check is delegated to {@code Preconditions.checkArgument}; the failure message names
     * the missing option key.
     */
    @Override
    protected void checkCdcSourceArgument() {
        final boolean collectionConfigured =
                this.cdcSourceConfig.contains(MongoDBSourceOptions.COLLECTION);
        final String failureMessage =
                String.format(
                        "mongodb-conf [%s] must be specified.",
                        MongoDBSourceOptions.COLLECTION.key());
        Preconditions.checkArgument(collectionConfigured, failureMessage);
    }

    /**
     * Obtains the schema through {@code MongodbSchemaUtils.getMongodbSchema}, passing the CDC
     * source configuration and the catalog's case-sensitivity flag.
     *
     * @return the schema produced by {@code MongodbSchemaUtils}
     * @throws Exception if schema retrieval fails
     */
    @Override
    protected Schema retrieveSchema() throws Exception {
        return MongodbSchemaUtils.getMongodbSchema(
                this.cdcSourceConfig, this.catalog.caseSensitive());
    }

    /**
     * Builds the data stream source for the configured database and collection.
     *
     * @return the stream source created by {@code buildDataStreamSource}
     * @throws Exception if the source cannot be built
     */
    @Override
    protected DataStreamSource<String> buildSource() throws Exception {
        final String databaseName = this.cdcSourceConfig.get(MongoDBSourceOptions.DATABASE);
        final String collectionName = this.cdcSourceConfig.get(MongoDBSourceOptions.COLLECTION);
        // The separator is a literal backslash followed by a dot.
        // NOTE(review): presumably a regex-escaped dot expected by buildMongodbSource - confirm.
        final String tableList = databaseName + "\\." + collectionName;
        return this.buildDataStreamSource(
                MongoDBActionUtils.buildMongodbSource(this.cdcSourceConfig, tableList));
    }

    /** Returns the fixed display name of the source. */
    @Override
    protected String sourceName() {
        return "MongoDB Source";
    }

    /**
     * Creates the parser that turns raw source strings into {@link RichCdcMultiplexRecord}s.
     *
     * @return a new {@link MongoDBRecordParser} built from the catalog's case-sensitivity flag,
     *     the computed columns and the CDC source configuration
     */
    @Override
    protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
        return new MongoDBRecordParser(
                this.catalog.caseSensitive(), this.computedColumns, this.cdcSourceConfig);
    }

    /** Returns the job name, which embeds the target database and table. */
    @Override
    protected String jobName() {
        final String nameTemplate = "MongoDB-Paimon Table Sync: %s.%s";
        return String.format(nameTemplate, this.database, this.table);
    }
}

