/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql.schema;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.function.Function;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that build a Paimon {@link Schema} from MySQL JDBC metadata and merge the
 * schemas of two tables into one.
 */
public class MySqlSchemaUtils {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSchemaUtils.class);

    /**
     * Builds a {@link Schema} for a single table from JDBC metadata.
     *
     * <p>Columns are read through {@link DatabaseMetaData#getColumns} and converted with
     * {@code MySqlTypeUtils.toDataType}; primary keys are read through
     * {@link DatabaseMetaData#getPrimaryKeys}.
     *
     * @param metaData JDBC metadata of the source connection
     * @param databaseName passed to the metadata calls as the catalog argument
     * @param tableName exact name of the table (not a pattern)
     * @param tableComment comment stored on the resulting schema
     * @param typeMapping type mapping options; when it contains
     *     {@code TypeMappingMode.TO_NULLABLE} every column is made nullable
     * @param caseSensitive forwarded to the column/primary-key name conversion helpers
     * @return the schema assembled from the reported columns and primary keys
     * @throws SQLException if any metadata call fails
     */
    public static Schema buildSchema(DatabaseMetaData metaData, String databaseName, String tableName, String tableComment, TypeMapping typeMapping, boolean caseSensitive) throws SQLException {
        Schema.Builder builder = Schema.newBuilder();

        // getColumns() interprets its table argument as a LIKE-style pattern, so '_' and '%'
        // in a real table name would also match other tables. Escape them to match exactly.
        String tableNamePattern = escapeSearchWildcards(tableName, metaData.getSearchStringEscape());
        try (ResultSet rs = metaData.getColumns(databaseName, null, tableNamePattern, null)) {
            HashSet<String> existedFields = new HashSet<>();
            Function<String, String> columnDuplicateErrMsg = CdcActionCommonUtils.columnDuplicateErrMsg(tableName);
            while (rs.next()) {
                String fieldName = rs.getString("COLUMN_NAME");
                String fieldType = rs.getString("TYPE_NAME");
                String fieldComment = rs.getString("REMARKS");

                // getInt() returns 0 for SQL NULL; wasNull() distinguishes "absent" from 0.
                Integer precision = rs.getInt("COLUMN_SIZE");
                if (rs.wasNull()) {
                    precision = null;
                }
                Integer scale = rs.getInt("DECIMAL_DIGITS");
                if (rs.wasNull()) {
                    scale = null;
                }

                boolean isNullable = typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_NULLABLE)
                        || MySqlSchemaUtils.isNullableColumn(rs.getString("IS_NULLABLE"));
                DataType paimonType = MySqlTypeUtils.toDataType(fieldType, precision, scale, typeMapping).copy(isNullable);

                fieldName = CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck(fieldName, existedFields, caseSensitive, columnDuplicateErrMsg);
                builder.column(fieldName, paimonType, fieldComment);
            }
        }

        // getPrimaryKeys() takes an exact table name (not a pattern), so no escaping here.
        ArrayList<String> primaryKeys = new ArrayList<>();
        try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
            while (rs.next()) {
                String fieldName = rs.getString("COLUMN_NAME");
                primaryKeys.add(StringUtils.caseSensitiveConversion(fieldName, caseSensitive));
            }
        }

        builder.primaryKey(primaryKeys);
        builder.comment(tableComment);
        return builder.build();
    }

    /**
     * Merges the fields of {@code other} into {@code current} and returns the combined schema.
     *
     * <p>Fields are keyed by name and keep the order of {@code current}, with fields that exist
     * only in {@code other} appended. For a field present in both, the result of
     * {@code UpdatedDataFieldsProcessFunction.canConvert(oldType, newType)} decides: on
     * {@code CONVERT} the field from {@code other} replaces the current one, on
     * {@code EXCEPTION} the merge fails, and on any other result the current field is kept.
     *
     * <p>Comment, options and partition keys are taken from {@code current}. Primary keys are
     * set only when both schemas declare equal primary keys.
     *
     * @param currentTable name of the table owning {@code current}; used in the error message
     * @param current the base schema
     * @param otherTable name of the table owning {@code other}; used in the error message
     * @param other the schema to merge in
     * @return the merged schema
     * @throws IllegalArgumentException if a shared column has types reported as {@code EXCEPTION}
     */
    public static Schema mergeSchema(String currentTable, Schema current, String otherTable, Schema other) {
        // LinkedHashMap keeps the column order stable across merges.
        LinkedHashMap<String, DataField> currentFields = new LinkedHashMap<>();
        for (DataField field : current.fields()) {
            currentFields.put(field.name(), field);
        }

        for (DataField newField : other.fields()) {
            DataField existing = currentFields.get(newField.name());
            if (existing == null) {
                // Column only exists in the other table: append it.
                currentFields.put(newField.name(), newField);
            } else {
                switch (UpdatedDataFieldsProcessFunction.canConvert(existing.type(), newField.type())) {
                    case CONVERT: {
                        currentFields.put(newField.name(), newField);
                        break;
                    }
                    case EXCEPTION: {
                        throw new IllegalArgumentException(String.format("Column %s have different types when merging schemas.\nCurrent table '%s' field: %s\nTo be merged table '%s' field: %s", newField.name(), currentTable, existing, otherTable, newField));
                    }
                }
            }
        }

        Schema.Builder builder = Schema.newBuilder();
        // Primary keys are only carried over when both tables agree on them.
        if (current.primaryKeys().equals(other.primaryKeys())) {
            builder.primaryKey(current.primaryKeys());
        }
        builder.comment(current.comment());
        builder.options(current.options());
        builder.partitionKeys(current.partitionKeys());
        for (DataField field : currentFields.values()) {
            builder.column(field.name(), field.type(), field.description());
        }
        return builder.build();
    }

    /**
     * Interprets the JDBC {@code IS_NULLABLE} column value.
     *
     * @param value the raw metadata value
     * @return {@code true} for {@code "YES"}, {@code false} for {@code "NO"}; any other value is
     *     logged as an error and treated as nullable
     */
    private static boolean isNullableColumn(String value) {
        if ("YES".equals(value)) {
            return true;
        }
        if ("NO".equals(value)) {
            return false;
        }
        LOG.error("Unrecognized nullable value: {}", value);
        return true;
    }

    /**
     * Escapes the JDBC search wildcards {@code _} and {@code %} in an identifier so it can be
     * used as an exact-match argument where the metadata API expects a pattern.
     *
     * <p>Only the two wildcard characters are prefixed; the escape string itself is left
     * untouched.
     *
     * @param name identifier to escape; may be {@code null}
     * @param escape the driver's escape string, as returned by
     *     {@link DatabaseMetaData#getSearchStringEscape()}
     * @return the escaped identifier, or {@code name} unchanged when either argument is
     *     {@code null} or {@code escape} is empty
     */
    private static String escapeSearchWildcards(String name, String escape) {
        if (name == null || escape == null || escape.isEmpty()) {
            return name;
        }
        return name.replace("_", escape + "_").replace("%", escape + "%");
    }
}

