/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CdcActionCommonUtils {
    private static final Logger LOG = LoggerFactory.getLogger(CdcActionCommonUtils.class);
    public static final String KAFKA_CONF = "kafka_conf";
    public static final String MONGODB_CONF = "mongodb_conf";
    public static final String MYSQL_CONF = "mysql_conf";
    public static final String POSTGRES_CONF = "postgres_conf";
    public static final String PULSAR_CONF = "pulsar_conf";
    public static final String TABLE_PREFIX = "table_prefix";
    public static final String TABLE_SUFFIX = "table_suffix";
    public static final String TABLE_PREFIX_DB = "table_prefix_db";
    public static final String TABLE_SUFFIX_DB = "table_suffix_db";
    public static final String TABLE_MAPPING = "table_mapping";
    public static final String INCLUDING_TABLES = "including_tables";
    public static final String EXCLUDING_TABLES = "excluding_tables";
    public static final String INCLUDING_DBS = "including_dbs";
    public static final String EXCLUDING_DBS = "excluding_dbs";
    public static final String TYPE_MAPPING = "type_mapping";
    public static final String PARTITION_KEYS = "partition_keys";
    public static final String PRIMARY_KEYS = "primary_keys";
    public static final String COMPUTED_COLUMN = "computed_column";
    public static final String METADATA_COLUMN = "metadata_column";
    public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys";
    public static final String EAGER_INIT = "eager_init";
    public static final String SYNC_PKEYS_FROM_SOURCE_SCHEMA = "sync_primary_keys_from_source_schema";

    /**
     * Fails fast when the source table fields are not compatible with the Paimon schema, as
     * decided by {@link #schemaCompatible(TableSchema, List)}.
     *
     * @param paimonSchema the Paimon table schema to compare against
     * @param sourceTableFields the fields of the source table
     * @throws IllegalArgumentException if the compatibility check fails; the message lists the
     *     Paimon fields and the source table fields
     */
    public static void assertSchemaCompatible(TableSchema paimonSchema, List<DataField> sourceTableFields) {
        if (schemaCompatible(paimonSchema, sourceTableFields)) {
            return;
        }
        String message = "Paimon schema and source table schema are not compatible.\nPaimon fields are: " + paimonSchema.fields() + ".\nSource table fields are: " + sourceTableFields;
        throw new IllegalArgumentException(message);
    }

    /**
     * Checks every source field against the Paimon schema: the field name must be present in the
     * Paimon field names, and {@code UpdatedDataFieldsProcessFunction.canConvert} (with the
     * default type mapping) must report {@code CONVERT} for the source type and the Paimon type.
     *
     * <p>The check stops at the first field that fails and logs the reason at INFO level.
     *
     * @param paimonSchema the Paimon table schema to compare against
     * @param sourceTableFields the fields of the source table
     * @return {@code true} if all source fields pass, {@code false} as soon as one does not
     */
    public static boolean schemaCompatible(TableSchema paimonSchema, List<DataField> sourceTableFields) {
        for (DataField sourceField : sourceTableFields) {
            int position = paimonSchema.fieldNames().indexOf(sourceField.name());
            if (position < 0) {
                LOG.info("Cannot find field '{}' in Paimon table.", sourceField.name());
                return false;
            }

            DataType paimonType = paimonSchema.fields().get(position).type();
            boolean convertible = UpdatedDataFieldsProcessFunction.canConvert(sourceField.type(), paimonType, TypeMapping.defaultMapping()) == UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT;
            if (!convertible) {
                LOG.info("Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", sourceField.name(), sourceField.type(), paimonType);
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the given names unchanged when the target is case sensitive; otherwise returns a
     * new list holding the lower-cased form of every name, in the same order.
     *
     * @param origin the names to convert
     * @param caseSensitive {@code true} to return {@code origin} itself, {@code false} to
     *     lower-case each element
     * @return {@code origin} or a new list of lower-cased names
     */
    public static List<String> listCaseConvert(List<String> origin, boolean caseSensitive) {
        if (caseSensitive) {
            return origin;
        }
        List<String> lowered = new ArrayList<>(origin.size());
        for (String name : origin) {
            lowered.add(name.toLowerCase());
        }
        return lowered;
    }

    /**
     * Builds the Paimon {@link Schema} for a sink table from a source schema plus the
     * user-supplied computed columns, metadata columns, primary keys and partition keys.
     *
     * <p>Columns are added in this order: source schema fields, computed columns, metadata
     * columns. Every column name is passed through {@code StringUtils.toLowerCaseIfNeed} with
     * {@code caseSensitive}, and the collected names are checked for duplicates before any keys
     * are set. Options from {@code tableConfig} are handed to the builder first, followed by the
     * options of {@code sourceSchema}.
     *
     * @param tableName sink table name, used in error and log messages
     * @param specifiedPartitionKeys user-specified partition keys; may be empty
     * @param specifiedPrimaryKeys user-specified primary keys; may be empty
     * @param computedColumns computed columns to append after the source fields
     * @param tableConfig table options passed to the schema builder
     * @param sourceSchema schema of the source table (fields, options, primary keys, comment)
     * @param metadataConverters converters that each contribute one metadata column
     * @param caseSensitive when {@code false}, specified and source primary/partition key names
     *     are lower-cased before they are matched against the column names
     * @param strictlyCheckSpecified when {@code true}, specified keys that are not among the
     *     columns cause an {@link IllegalArgumentException}; otherwise the mismatch is only logged
     * @param requirePrimaryKeys see {@code syncPKeysFromSourceSchema}
     * @param syncPKeysFromSourceSchema when {@code true}, the source schema's primary keys are
     *     used if no usable specified keys exist; if the source has none either and
     *     {@code requirePrimaryKeys} is {@code true}, an {@link IllegalArgumentException} is thrown
     * @return the built schema
     */
    public static Schema buildPaimonSchema(String tableName, List<String> specifiedPartitionKeys, List<String> specifiedPrimaryKeys, List<ComputedColumn> computedColumns, Map<String, String> tableConfig, Schema sourceSchema, CdcMetadataConverter[] metadataConverters, boolean caseSensitive, boolean strictlyCheckSpecified, boolean requirePrimaryKeys, boolean syncPKeysFromSourceSchema) {
        Schema.Builder builder = Schema.newBuilder();

        // Options: user table config first, then whatever the source schema carries.
        builder.options(tableConfig);
        builder.options(sourceSchema.options());

        // Columns: remember every name so keys can be validated against the full column set.
        List<String> allFieldNames = new ArrayList<>();
        for (DataField field : sourceSchema.fields()) {
            String fieldName = StringUtils.toLowerCaseIfNeed(field.name(), caseSensitive);
            allFieldNames.add(fieldName);
            builder.column(fieldName, field.type(), field.description());
        }
        for (ComputedColumn computedColumn : computedColumns) {
            String computedColumnName = StringUtils.toLowerCaseIfNeed(computedColumn.columnName(), caseSensitive);
            allFieldNames.add(computedColumnName);
            builder.column(computedColumnName, computedColumn.columnType());
        }
        // The array elements are CdcMetadataConverter instances; iterate with that type so that
        // columnName() and dataType() resolve.
        for (CdcMetadataConverter metadataConverter : metadataConverters) {
            String metadataColumnName = StringUtils.toLowerCaseIfNeed(metadataConverter.columnName(), caseSensitive);
            allFieldNames.add(metadataColumnName);
            builder.column(metadataColumnName, metadataConverter.dataType());
        }
        checkDuplicateFields(tableName, allFieldNames);

        // Primary keys: specified keys win, otherwise optionally fall back to the source keys.
        List<String> convertedPrimaryKeys = listCaseConvert(specifiedPrimaryKeys, caseSensitive);
        List<String> sourceSchemaPrimaryKeys = listCaseConvert(sourceSchema.primaryKeys(), caseSensitive);
        setPrimaryKeys(tableName, builder, convertedPrimaryKeys, sourceSchemaPrimaryKeys, allFieldNames, strictlyCheckSpecified, requirePrimaryKeys, syncPKeysFromSourceSchema);

        // Partition keys: only ever taken from the specified list.
        List<String> convertedPartitionKeys = listCaseConvert(specifiedPartitionKeys, caseSensitive);
        setPartitionKeys(tableName, builder, convertedPartitionKeys, allFieldNames, strictlyCheckSpecified);

        builder.comment(sourceSchema.comment());
        return builder.build();
    }

    /**
     * Chooses the primary keys for the sink table and sets them on the builder.
     *
     * <ol>
     *   <li>If primary keys were specified and all of them are among {@code allFieldNames}, they
     *       are used.
     *   <li>If some specified keys are missing, an {@link IllegalArgumentException} is thrown when
     *       {@code strictlyCheckSpecified} is set; otherwise the mismatch is logged and the method
     *       continues.
     *   <li>If {@code syncPKeysFromSourceSchema} is set and the source schema has primary keys,
     *       those are used.
     *   <li>If {@code syncPKeysFromSourceSchema} and {@code requirePrimaryKeys} are both set and
     *       nothing was applied, an {@link IllegalArgumentException} is thrown.
     * </ol>
     *
     * In every other case the builder is left without primary keys.
     */
    private static void setPrimaryKeys(String tableName, Schema.Builder builder, List<String> specifiedPrimaryKeys, List<String> sourceSchemaPrimaryKeys, List<String> allFieldNames, boolean strictlyCheckSpecified, boolean requirePrimaryKeys, boolean syncPKeysFromSourceSchema) {
        boolean hasSpecifiedKeys = !specifiedPrimaryKeys.isEmpty();
        if (hasSpecifiedKeys && allFieldNames.containsAll(specifiedPrimaryKeys)) {
            builder.primaryKey(specifiedPrimaryKeys);
            return;
        }

        if (hasSpecifiedKeys) {
            String message = String.format("For sink table %s, not all specified primary keys '%s' exist in source tables or computed columns '%s'.", tableName, specifiedPrimaryKeys, allFieldNames);
            if (strictlyCheckSpecified) {
                throw new IllegalArgumentException(message);
            }
            LOG.info("{} In this case at database-sync, we will set primary keys from source tables if exist, otherwise, primary keys are not set.", message);
        }

        // Without syncing from the source schema there is neither a fallback nor a failure.
        if (!syncPKeysFromSourceSchema) {
            return;
        }
        if (!sourceSchemaPrimaryKeys.isEmpty()) {
            builder.primaryKey(sourceSchemaPrimaryKeys);
            return;
        }
        if (requirePrimaryKeys) {
            throw new IllegalArgumentException("Failed to set specified primary keys for sink table " + tableName + ". Also, can't infer primary keys from source table schemas because source tables have no primary keys or have different primary keys.");
        }
    }

    /**
     * Sets the specified partition keys on the builder when all of them are among
     * {@code allFieldNames}.
     *
     * <p>Nothing happens when no partition keys were specified. When some specified keys are
     * missing, an {@link IllegalArgumentException} is thrown if {@code strictlyCheckSpecified} is
     * set; otherwise the mismatch is logged and no partition keys are set.
     */
    private static void setPartitionKeys(String tableName, Schema.Builder builder, List<String> specifiedPartitionKeys, List<String> allFieldNames, boolean strictlyCheckSpecified) {
        if (specifiedPartitionKeys.isEmpty()) {
            return;
        }
        if (allFieldNames.containsAll(specifiedPartitionKeys)) {
            builder.partitionKeys(specifiedPartitionKeys);
            return;
        }

        String message = String.format("For sink table %s, not all specified partition keys '%s' exist in source tables or computed columns '%s'.", tableName, specifiedPartitionKeys, allFieldNames);
        if (strictlyCheckSpecified) {
            throw new IllegalArgumentException(message);
        }
        LOG.info("{} In this case at database-sync, partition keys are not set.", message);
    }

    /**
     * Rejects a column list in which {@code Schema.duplicateFields} finds any duplicates.
     *
     * @param tableName table name used in the failure message
     * @param fieldNames all column names of the table, after any case conversion
     */
    public static void checkDuplicateFields(String tableName, List<String> fieldNames) {
        Set<String> repeatedNames = Schema.duplicateFields(fieldNames);
        boolean allUnique = repeatedNames.isEmpty();
        Preconditions.checkState(allUnique, "Table %s contains duplicate columns: %s.\nPossible causes are: 1. computed columns or metadata columns contain duplicate fields; 2. the catalog is case-insensitive and the table columns duplicate after they are all converted to lower-case.", tableName, repeatedNames);
    }

    /**
     * Builds the table-list pattern string for the given sink mode.
     *
     * <p>{@code DIVIDED} uses only {@code monitoredTables}; {@code COMBINED} uses the database
     * pattern, the including-table pattern and {@code excludedTables}.
     *
     * @throws UnsupportedOperationException for any other mode value
     */
    public static String tableList(MultiTablesSinkMode mode, String databasePattern, String includingTablePattern, List<Identifier> monitoredTables, List<Identifier> excludedTables) {
        if (MultiTablesSinkMode.DIVIDED == mode) {
            return dividedModeTableList(monitoredTables);
        }
        if (MultiTablesSinkMode.COMBINED == mode) {
            return combinedModeTableList(databasePattern, includingTablePattern, excludedTables);
        }
        throw new UnsupportedOperationException("Unknown MultiTablesSinkMode: " + mode);
    }

    /**
     * Joins the monitored tables into one alternation: each table becomes
     * {@code <database>\.<table>} and the entries are separated by {@code |}.
     */
    private static String dividedModeTableList(List<Identifier> monitoredTables) {
        List<String> tablePatterns = new ArrayList<>(monitoredTables.size());
        for (Identifier table : monitoredTables) {
            tablePatterns.add(table.getDatabaseName() + "\\." + table.getObjectName());
        }
        return String.join("|", tablePatterns);
    }

    /**
     * Builds the table-list pattern for combined mode.
     *
     * <p>The including part is {@code (<databasePattern>)\.(<includingTablePattern>)}. When
     * tables are excluded, it is prefixed with a negative lookahead group that alternates over
     * {@code (^<database>\.<table>$)} for every excluded table.
     *
     * @param databasePattern pattern for database names
     * @param includingTablePattern pattern for table names to include
     * @param excludedTables tables to exclude; when empty only the including part is returned
     * @return the combined pattern string
     */
    public static String combinedModeTableList(String databasePattern, String includingTablePattern, List<Identifier> excludedTables) {
        String includingPattern = "(" + databasePattern + ")\\.(" + includingTablePattern + ")";
        if (excludedTables.isEmpty()) {
            return includingPattern;
        }

        List<String> exclusions = new ArrayList<>(excludedTables.size());
        for (Identifier table : excludedTables) {
            exclusions.add("(^" + table.getDatabaseName() + "\\." + table.getObjectName() + "$)");
        }
        String negativeLookahead = "?!" + String.join("|", exclusions);
        return "(" + negativeLookahead + ")(" + includingPattern + ")";
    }

    /**
     * Verifies that the configuration contains every one of the given options, checking them in
     * the order given and failing on the first one that is missing.
     *
     * @param config the configuration to inspect
     * @param confName name of the configuration group, used in the failure message
     * @param configOptions the options that must all be present
     */
    public static void checkRequiredOptions(Configuration config, String confName, ConfigOption<?> ... configOptions) {
        for (ConfigOption<?> requiredOption : configOptions) {
            boolean present = config.contains(requiredOption);
            Preconditions.checkArgument(present, "%s [%s] must be specified.", confName, requiredOption.key());
        }
    }

    /**
     * Verifies that the configuration contains exactly one of the given options.
     *
     * @param config the configuration to inspect
     * @param confName name of the configuration group, used in the failure message
     * @param configOptions the mutually exclusive options, of which exactly one must be present
     */
    public static void checkOneRequiredOption(Configuration config, String confName, ConfigOption<?> ... configOptions) {
        long presentCount = 0L;
        for (ConfigOption<?> candidate : configOptions) {
            if (config.contains(candidate)) {
                presentCount++;
            }
        }
        String optionKeys = Arrays.stream(configOptions).map(ConfigOption::key).collect(Collectors.joining(","));
        Preconditions.checkArgument(presentCount == 1L, "%s must and can only set one of the following options: %s.", confName, optionKeys);
    }
}

