/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordSchemaBuilder;
import org.apache.paimon.utils.Preconditions;

/**
 * Action that wires a Kafka source into a Paimon database-sync sink and runs the
 * resulting Flink job.
 *
 * <p>The action reads string records from Kafka, turns them into CDC records with
 * the parser selected by the configured {@link DataFormat}, and hands them to a
 * {@link FlinkCdcSyncDatabaseSinkBuilder} targeting {@link #database}.
 */
public class KafkaSyncDatabaseAction extends ActionBase {
    /** Kafka options, converted from the string map given to the constructor. */
    private final Configuration kafkaConfig;
    /** Name of the target database; created in {@link #build} via the catalog. */
    private final String database;
    /** Table-name prefix; never {@code null} (empty string when not supplied). */
    private final String tablePrefix;
    /** Table-name suffix; never {@code null} (empty string when not supplied). */
    private final String tableSuffix;
    /** Compiled {@code includingTables} regex, or {@code null} when none was given. */
    @Nullable
    private final Pattern includingPattern;
    /** Compiled {@code excludingTables} regex, or {@code null} when none was given. */
    @Nullable
    private final Pattern excludingPattern;
    /** Table options; also consulted for the sink parallelism in {@link #build}. */
    private final Map<String, String> tableConfig;

    /**
     * Creates an action without table prefix/suffix and without including/excluding
     * patterns; delegates to the full constructor with {@code null} for those.
     *
     * @param kafkaConfig Kafka options as string key/value pairs
     * @param warehouse warehouse location, forwarded to {@link ActionBase}
     * @param database name of the target database
     * @param catalogConfig catalog options, forwarded to {@link ActionBase}
     * @param tableConfig table options
     */
    public KafkaSyncDatabaseAction(Map<String, String> kafkaConfig, String warehouse, String database, Map<String, String> catalogConfig, Map<String, String> tableConfig) {
        this(kafkaConfig, warehouse, database, null, null, null, null, catalogConfig, tableConfig);
    }

    /**
     * Creates an action.
     *
     * @param kafkaConfig Kafka options as string key/value pairs
     * @param warehouse warehouse location, forwarded to {@link ActionBase}
     * @param database name of the target database
     * @param tablePrefix prefix handed to the {@link TableNameConverter}; {@code null} is
     *     treated as the empty string
     * @param tableSuffix suffix handed to the {@link TableNameConverter}; {@code null} is
     *     treated as the empty string
     * @param includingTables regular expression compiled with {@link Pattern#compile(String)}
     *     and passed to the event parser; {@code null} for none
     *     (NOTE(review): presumably selects which tables are synchronized - confirm in
     *     {@code RichCdcMultiplexRecordEventParser})
     * @param excludingTables regular expression compiled with {@link Pattern#compile(String)}
     *     and passed to the event parser; {@code null} for none
     * @param catalogConfig catalog options, forwarded to {@link ActionBase}
     * @param tableConfig table options
     * @throws java.util.regex.PatternSyntaxException if {@code includingTables} or
     *     {@code excludingTables} is not a valid regular expression
     */
    public KafkaSyncDatabaseAction(Map<String, String> kafkaConfig, String warehouse, String database, @Nullable String tablePrefix, @Nullable String tableSuffix, @Nullable String includingTables, @Nullable String excludingTables, Map<String, String> catalogConfig, Map<String, String> tableConfig) {
        super(warehouse, catalogConfig);
        this.kafkaConfig = Configuration.fromMap(kafkaConfig);
        this.database = database;
        this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
        this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
        this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
        this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
        this.tableConfig = tableConfig;
    }

    /**
     * Assembles the Kafka-to-Paimon pipeline on the given environment without executing it.
     *
     * <p>When the catalog is case-insensitive, the database name and the table prefix/suffix
     * are validated first. The target database is then created through the catalog, the
     * Kafka source and record parser are built from the Kafka configuration, and the sink
     * is built in {@link DatabaseSyncMode#COMBINED} mode.
     *
     * @param env the Flink environment the source and sink are attached to
     * @throws NumberFormatException if the sink parallelism option is present in the table
     *     options but is not a parsable integer
     * @throws Exception if any of the catalog, source or sink calls fails
     */
    public void build(StreamExecutionEnvironment env) throws Exception {
        boolean caseSensitive = this.catalog.caseSensitive();
        if (!caseSensitive) {
            this.validateCaseInsensitive();
        }
        // NOTE(review): the `true` flag is presumably "ignore if exists" - confirm against Catalog.
        this.catalog.createDatabase(this.database, true);
        TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, true, this.tablePrefix, this.tableSuffix);
        KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(this.kafkaConfig);
        DataFormat format = DataFormat.getDataFormat(this.kafkaConfig);
        RecordParser recordParser = format.createParser(caseSensitive, tableNameConverter, Collections.emptyList());
        RichCdcMultiplexRecordSchemaBuilder schemaBuilder = new RichCdcMultiplexRecordSchemaBuilder(this.tableConfig);
        // Copy the fields into locals so the lambda below captures these values rather than
        // `this` (presumably because the factory is shipped to the cluster - keep as is).
        Pattern includingPattern = this.includingPattern;
        Pattern excludingPattern = this.excludingPattern;
        EventParser.Factory parserFactory = () -> new RichCdcMultiplexRecordEventParser(schemaBuilder, includingPattern, excludingPattern);
        FlinkCdcSyncDatabaseSinkBuilder sinkBuilder = new FlinkCdcSyncDatabaseSinkBuilder()
                .withInput(env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").flatMap((FlatMapFunction)recordParser))
                .withParserFactory(parserFactory)
                .withCatalogLoader(this.catalogLoader())
                .withDatabase(this.database)
                .withMode(DatabaseSyncMode.COMBINED);
        // The sink parallelism is optional: only override it when the table options carry it.
        String sinkParallelism = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
        }
        sinkBuilder.build();
    }

    /**
     * Checks, via {@link Preconditions#checkArgument}, that the database name, table prefix
     * and table suffix contain no upper-case characters.
     *
     * <p>Lower-casing uses {@link Locale#ROOT} so the outcome does not depend on the JVM's
     * default locale.
     */
    private void validateCaseInsensitive() {
        Preconditions.checkArgument(this.database.equals(this.database.toLowerCase(Locale.ROOT)), String.format("Database name [%s] cannot contain upper case in case-insensitive catalog.", this.database));
        Preconditions.checkArgument(this.tablePrefix.equals(this.tablePrefix.toLowerCase(Locale.ROOT)), String.format("Table prefix [%s] cannot contain upper case in case-insensitive catalog.", this.tablePrefix));
        Preconditions.checkArgument(this.tableSuffix.equals(this.tableSuffix.toLowerCase(Locale.ROOT)), String.format("Table suffix [%s] cannot contain upper case in case-insensitive catalog.", this.tableSuffix));
    }

    /**
     * Obtains the execution environment, builds the pipeline on it and executes the job
     * under the name {@code "KAFKA-Paimon Database Sync: <database>"}.
     *
     * @throws Exception if building or executing the job fails
     */
    @Override
    public void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.build(env);
        env.execute(String.format("KAFKA-Paimon Database Sync: %s", this.database));
    }
}

