/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;

public abstract class MessageQueueSyncDatabaseActionBase
extends ActionBase {
    protected final String database;
    protected final Configuration cdcSourceConfig;
    private Map<String, String> tableConfig = new HashMap<String, String>();
    private String tablePrefix = "";
    private String tableSuffix = "";
    private String includingTables = ".*";
    @Nullable
    String excludingTables;
    private TypeMapping typeMapping = TypeMapping.defaultMapping();

    public MessageQueueSyncDatabaseActionBase(String warehouse, String database, Map<String, String> catalogConfig, Map<String, String> mqConfig) {
        super(warehouse, catalogConfig);
        this.database = database;
        this.cdcSourceConfig = Configuration.fromMap(mqConfig);
    }

    public MessageQueueSyncDatabaseActionBase withTableConfig(Map<String, String> tableConfig) {
        this.tableConfig = tableConfig;
        return this;
    }

    public MessageQueueSyncDatabaseActionBase withTablePrefix(@Nullable String tablePrefix) {
        if (tablePrefix != null) {
            this.tablePrefix = tablePrefix;
        }
        return this;
    }

    public MessageQueueSyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
        if (tableSuffix != null) {
            this.tableSuffix = tableSuffix;
        }
        return this;
    }

    public MessageQueueSyncDatabaseActionBase includingTables(@Nullable String includingTables) {
        if (includingTables != null) {
            this.includingTables = includingTables;
        }
        return this;
    }

    public MessageQueueSyncDatabaseActionBase excludingTables(@Nullable String excludingTables) {
        this.excludingTables = excludingTables;
        return this;
    }

    public MessageQueueSyncDatabaseActionBase withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    protected abstract DataStreamSource<String> buildSource() throws Exception;

    protected abstract String sourceName();

    protected abstract DataFormat getDataFormat();

    protected abstract String jobName();

    @Override
    public void build() throws Exception {
        boolean caseSensitive = this.catalog.caseSensitive();
        this.validateCaseInsensitive(caseSensitive);
        this.catalog.createDatabase(this.database, true);
        DataFormat format = this.getDataFormat();
        RecordParser recordParser = format.createParser(caseSensitive, this.typeMapping, Collections.emptyList());
        NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(this.tableConfig, caseSensitive);
        Pattern includingPattern = Pattern.compile(this.includingTables);
        Pattern excludingPattern = this.excludingTables == null ? null : Pattern.compile(this.excludingTables);
        TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, true, this.tablePrefix, this.tableSuffix);
        EventParser.Factory parserFactory = () -> new RichCdcMultiplexRecordEventParser(schemaBuilder, includingPattern, excludingPattern, tableNameConverter);
        new FlinkCdcSyncDatabaseSinkBuilder().withInput(this.buildSource().flatMap((FlatMapFunction)recordParser).name("Parse")).withParserFactory(parserFactory).withCatalogLoader(this.catalogLoader()).withDatabase(this.database).withMode(MultiTablesSinkMode.COMBINED).withTableOptions(this.tableConfig).build();
    }

    private void validateCaseInsensitive(boolean caseSensitive) {
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", this.database);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", this.tablePrefix);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", this.tableSuffix);
    }

    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return this.tableConfig;
    }

    @Override
    public void run() throws Exception {
        this.build();
        this.execute(this.jobName());
    }
}

