/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SynchronizationActionBase;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.TableFilter;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.table.FileStoreTable;

public abstract class SyncDatabaseActionBase
extends SynchronizationActionBase {
    protected boolean eagerInit = false;
    protected boolean mergeShards = true;
    protected MultiTablesSinkMode mode = MultiTablesSinkMode.COMBINED;
    protected String tablePrefix = "";
    protected String tableSuffix = "";
    protected Map<String, String> tableMapping = new HashMap<String, String>();
    protected Map<String, String> dbPrefix = new HashMap<String, String>();
    protected Map<String, String> dbSuffix = new HashMap<String, String>();
    protected String includingTables = ".*";
    protected List<String> partitionKeys = new ArrayList<String>();
    protected List<String> primaryKeys = new ArrayList<String>();
    protected List<ComputedColumn> computedColumns = new ArrayList<ComputedColumn>();
    @Nullable
    protected String excludingTables;
    protected String includingDbs = ".*";
    @Nullable
    protected String excludingDbs;
    protected List<FileStoreTable> tables = new ArrayList<FileStoreTable>();
    protected Map<String, List<String>> partitionKeyMultiple = new HashMap<String, List<String>>();

    public SyncDatabaseActionBase(String database, Map<String, String> catalogConfig, Map<String, String> cdcSourceConfig, SyncJobHandler.SourceType sourceType) {
        super(database, catalogConfig, cdcSourceConfig, new SyncJobHandler(sourceType, cdcSourceConfig, database));
    }

    public SyncDatabaseActionBase mergeShards(boolean mergeShards) {
        this.mergeShards = mergeShards;
        return this;
    }

    public SyncDatabaseActionBase eagerInit(boolean eagerInit) {
        this.eagerInit = eagerInit;
        return this;
    }

    public SyncDatabaseActionBase withMode(MultiTablesSinkMode mode) {
        this.mode = mode;
        return this;
    }

    public SyncDatabaseActionBase withTablePrefix(@Nullable String tablePrefix) {
        if (tablePrefix != null) {
            this.tablePrefix = tablePrefix;
        }
        return this;
    }

    public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
        if (tableSuffix != null) {
            this.tableSuffix = tableSuffix;
        }
        return this;
    }

    public SyncDatabaseActionBase withDbPrefix(Map<String, String> dbPrefix) {
        if (dbPrefix != null) {
            this.dbPrefix = dbPrefix.entrySet().stream().collect(HashMap::new, (m, e) -> {
                String cfr_ignored_0 = (String)m.put(((String)e.getKey()).toLowerCase(), e.getValue());
            }, HashMap::putAll);
        }
        return this;
    }

    public SyncDatabaseActionBase withDbSuffix(Map<String, String> dbSuffix) {
        if (dbSuffix != null) {
            this.dbSuffix = dbSuffix.entrySet().stream().collect(HashMap::new, (m, e) -> {
                String cfr_ignored_0 = (String)m.put(((String)e.getKey()).toLowerCase(), e.getValue());
            }, HashMap::putAll);
        }
        return this;
    }

    public SyncDatabaseActionBase withTableMapping(Map<String, String> tableMapping) {
        if (tableMapping != null) {
            this.tableMapping = tableMapping;
        }
        return this;
    }

    public SyncDatabaseActionBase includingTables(@Nullable String includingTables) {
        if (includingTables != null) {
            this.includingTables = includingTables;
        }
        return this;
    }

    public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables) {
        this.excludingTables = excludingTables;
        return this;
    }

    public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) {
        if (includingDbs != null) {
            this.includingDbs = includingDbs;
        }
        return this;
    }

    public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
        this.excludingDbs = excludingDbs;
        return this;
    }

    public SyncDatabaseActionBase withPartitionKeys(String ... partitionKeys) {
        this.partitionKeys.addAll(Arrays.asList(partitionKeys));
        return this;
    }

    public SyncDatabaseActionBase withPrimaryKeys(String ... primaryKeys) {
        this.primaryKeys.addAll(Arrays.asList(primaryKeys));
        return this;
    }

    public SyncDatabaseActionBase withComputedColumnArgs(List<String> computedColumnArgs) {
        this.computedColumns = ComputedColumnUtils.buildComputedColumns(computedColumnArgs, Collections.emptyList());
        return this;
    }

    @Override
    protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
        return this.syncJobHandler.provideRecordParser(this.computedColumns, this.typeMapping, this.metadataConverters);
    }

    public SyncDatabaseActionBase withPartitionKeyMultiple(Map<String, List<String>> partitionKeyMultiple) {
        if (partitionKeyMultiple != null) {
            this.partitionKeyMultiple = partitionKeyMultiple;
        }
        return this;
    }

    @Override
    protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
        HashSet<String> createdTables;
        NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(this.tableConfig, this.caseSensitive, this.partitionKeys, this.primaryKeys, this.requirePrimaryKeys(), this.syncPKeysFromSourceSchema, this.partitionKeyMultiple, this.metadataConverters);
        Pattern tblIncludingPattern = Pattern.compile(this.includingTables);
        Pattern tblExcludingPattern = this.excludingTables == null ? null : Pattern.compile(this.excludingTables);
        Pattern dbIncludingPattern = Pattern.compile(this.includingDbs);
        Pattern dbExcludingPattern = this.excludingDbs == null ? null : Pattern.compile(this.excludingDbs);
        TableNameConverter tableNameConverter = new TableNameConverter(this.caseSensitive, this.mergeShards, this.dbPrefix, this.dbSuffix, this.tablePrefix, this.tableSuffix, this.tableMapping);
        try {
            createdTables = new HashSet<String>(this.catalog.listTables(this.database));
        }
        catch (Catalog.DatabaseNotExistException e) {
            throw new RuntimeException(e);
        }
        return () -> new RichCdcMultiplexRecordEventParser(schemaBuilder, tblIncludingPattern, tblExcludingPattern, dbIncludingPattern, dbExcludingPattern, tableNameConverter, createdTables);
    }

    protected abstract boolean requirePrimaryKeys();

    @Override
    protected void buildSink(DataStream<RichCdcMultiplexRecord> input, EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
        ArrayList<String> whiteList = new ArrayList<String>(this.tableMapping.values());
        ArrayList<String> prefixList = new ArrayList<String>(this.dbPrefix.values());
        prefixList.add(this.tablePrefix);
        ArrayList<String> suffixList = new ArrayList<String>(this.dbSuffix.values());
        suffixList.add(this.tableSuffix);
        new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>().withInput(input).withParserFactory(parserFactory).withCatalogLoader(this.catalogLoader()).withTypeMapping(this.typeMapping).withDatabase(this.database).withTables(this.tables).withMode(this.mode).withTableOptions(this.tableConfig).withEagerInit(this.eagerInit).withTableFilter(new TableFilter(this.database, whiteList, prefixList, suffixList, this.includingTables, this.excludingTables)).build();
    }
}

