/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SynchronizationActionBase;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.table.FileStoreTable;

/**
 * Abstract synchronization action that carries the database-level settings (table name
 * prefix/suffix, including/excluding table patterns, shard-merging flag and sink mode) and
 * uses them to build the record parser, the event parser factory and the sink.
 *
 * <p>All configuration methods return {@code this} so calls can be chained.
 */
public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
    /** Handed to the {@link TableNameConverter}; {@code true} unless changed. */
    protected boolean mergeShards = true;
    /** Sink mode handed to the sink builder; {@code COMBINED} unless changed. */
    protected MultiTablesSinkMode mode = MultiTablesSinkMode.COMBINED;
    /** Table name prefix handed to the {@link TableNameConverter}; empty unless changed. */
    protected String tablePrefix = "";
    /** Table name suffix handed to the {@link TableNameConverter}; empty unless changed. */
    protected String tableSuffix = "";
    /** Regular expression compiled into the including pattern; {@code ".*"} unless changed. */
    protected String includingTables = ".*";
    /** Regular expression compiled into the excluding pattern; {@code null} means no pattern. */
    @Nullable
    protected String excludingTables;
    /** Tables handed to the sink builder; starts out as an empty, mutable list. */
    protected List<FileStoreTable> tables = new ArrayList<>();

    /**
     * Creates the action and a {@link SyncJobHandler} built from the source type, the CDC
     * source configuration and the database name.
     *
     * @param warehouse forwarded to the superclass constructor
     * @param database forwarded to the superclass constructor and to the job handler
     * @param catalogConfig forwarded to the superclass constructor
     * @param cdcSourceConfig forwarded to the superclass constructor and to the job handler
     * @param sourceType source type used to create the job handler
     */
    public SyncDatabaseActionBase(String warehouse, String database, Map<String, String> catalogConfig, Map<String, String> cdcSourceConfig, SyncJobHandler.SourceType sourceType) {
        super(warehouse, database, catalogConfig, cdcSourceConfig, new SyncJobHandler(sourceType, cdcSourceConfig, database));
    }

    /**
     * Sets the shard-merging flag.
     *
     * @param mergeShards new flag value
     * @return this action
     */
    public SyncDatabaseActionBase mergeShards(boolean mergeShards) {
        this.mergeShards = mergeShards;
        return this;
    }

    /**
     * Sets the sink mode.
     *
     * @param mode new sink mode; stored as given
     * @return this action
     */
    public SyncDatabaseActionBase withMode(MultiTablesSinkMode mode) {
        this.mode = mode;
        return this;
    }

    /**
     * Sets the table name prefix; a {@code null} argument leaves the current value untouched.
     *
     * @param tablePrefix new prefix, or {@code null} to keep the current one
     * @return this action
     */
    public SyncDatabaseActionBase withTablePrefix(@Nullable String tablePrefix) {
        if (tablePrefix == null) {
            return this;
        }
        this.tablePrefix = tablePrefix;
        return this;
    }

    /**
     * Sets the table name suffix; a {@code null} argument leaves the current value untouched.
     *
     * @param tableSuffix new suffix, or {@code null} to keep the current one
     * @return this action
     */
    public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
        if (tableSuffix == null) {
            return this;
        }
        this.tableSuffix = tableSuffix;
        return this;
    }

    /**
     * Sets the including-tables expression; a {@code null} argument leaves the current value
     * untouched.
     *
     * @param includingTables new expression, or {@code null} to keep the current one
     * @return this action
     */
    public SyncDatabaseActionBase includingTables(@Nullable String includingTables) {
        if (includingTables == null) {
            return this;
        }
        this.includingTables = includingTables;
        return this;
    }

    /**
     * Sets the excluding-tables expression. Unlike the other nullable setters, {@code null} is
     * stored as-is and therefore clears any previously set expression.
     *
     * @param excludingTables new expression, or {@code null} for none
     * @return this action
     */
    public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables) {
        this.excludingTables = excludingTables;
        return this;
    }

    /**
     * Runs {@link AbstractCatalog#validateCaseInsensitive} on the database name, the table
     * prefix and the table suffix, in that order.
     */
    @Override
    protected void validateCaseSensitivity() {
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
    }

    /**
     * Asks the job handler for a record parser, passing the case-sensitivity flag, an empty
     * list, the type mapping and the metadata converters.
     *
     * @return the parser supplied by the job handler
     */
    @Override
    protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
        return syncJobHandler.provideRecordParser(
                caseSensitive, Collections.emptyList(), typeMapping, metadataConverters);
    }

    /**
     * Builds a factory whose products are {@link RichCdcMultiplexRecordEventParser}s sharing
     * one schema builder, the compiled including/excluding patterns and one table name
     * converter.
     *
     * <p>The patterns are compiled here, once, rather than on every factory invocation.
     *
     * @return the event parser factory
     */
    @Override
    protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
        NewTableSchemaBuilder newTableSchemaBuilder =
                new NewTableSchemaBuilder(tableConfig, caseSensitive, metadataConverters);
        Pattern includeRegex = Pattern.compile(includingTables);

        // Declared final and assigned exactly once so the lambda below may capture it.
        final Pattern excludeRegex;
        if (excludingTables != null) {
            excludeRegex = Pattern.compile(excludingTables);
        } else {
            excludeRegex = null;
        }

        TableNameConverter nameConverter =
                new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix);
        return () ->
                new RichCdcMultiplexRecordEventParser(
                        newTableSchemaBuilder, includeRegex, excludeRegex, nameConverter);
    }

    /**
     * Configures a {@link FlinkCdcSyncDatabaseSinkBuilder} with the input stream, parser
     * factory, catalog loader, database, tables, sink mode and table options, then builds it.
     *
     * @param input stream of parsed records to sink
     * @param parserFactory factory handed to the sink builder
     */
    @Override
    protected void buildSink(DataStream<RichCdcMultiplexRecord> input, EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
        new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
                .withInput(input)
                .withParserFactory(parserFactory)
                .withCatalogLoader(catalogLoader())
                .withDatabase(database)
                .withTables(tables)
                .withMode(mode)
                .withTableOptions(tableConfig)
                .build();
    }
}

