/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.utils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;

public class FlinkCatalogPropertiesUtil {
    private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX = Pattern.compile("\\d+\\.name");
    private static final Pattern SCHEMA_COLUMN_METADATA_SUFFIX = Pattern.compile("\\d+\\.metadata");
    private static final Pattern SCHEMA_COLUMN_EXPR_SUFFIX = Pattern.compile("\\d+\\.expr");
    private static final Pattern SCHEMA_COLUMN_DATATYPE_SUFFIX = Pattern.compile("\\d+\\.data-type");
    private static final Pattern SCHEMA_COLUMN_VIRTUAL_SUFFIX = Pattern.compile("\\d+\\.virtual");
    private static final Set<Pattern> NON_PHYSICAL_KEY_PATTERNS = ImmutableSet.of(SCHEMA_COLUMN_NAME_SUFFIX, SCHEMA_COLUMN_METADATA_SUFFIX, SCHEMA_COLUMN_EXPR_SUFFIX, SCHEMA_COLUMN_DATATYPE_SUFFIX, SCHEMA_COLUMN_VIRTUAL_SUFFIX);

    public static Map<String, String> serializeNonPhysicalColumns(Map<String, Integer> indexMap, List<TableColumn> nonPhysicalColumns) {
        HashMap<String, String> serialized = new HashMap<String, String>();
        for (TableColumn c : nonPhysicalColumns) {
            int index = indexMap.get(c.getName());
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "name"), c.getName());
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "data-type"), c.getType().getLogicalType().asSerializableString());
            if (c instanceof TableColumn.ComputedColumn) {
                TableColumn.ComputedColumn computedColumn = (TableColumn.ComputedColumn)c;
                serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "expr"), computedColumn.getExpression());
                continue;
            }
            TableColumn.MetadataColumn metadataColumn = (TableColumn.MetadataColumn)c;
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "metadata"), metadataColumn.getMetadataAlias().orElse(metadataColumn.getName()));
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "virtual"), Boolean.toString(metadataColumn.isVirtual()));
        }
        return serialized;
    }

    public static Map<String, String> serializeNonPhysicalNewColumns(ResolvedSchema schema) {
        List nonPhysicalColumns = schema.getColumns().stream().filter(k -> !k.isPhysical()).collect(Collectors.toList());
        HashMap<String, String> serialized = new HashMap<String, String>();
        List columnNames = schema.getColumnNames();
        for (Column c : nonPhysicalColumns) {
            int index = columnNames.indexOf(c.getName());
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "name"), c.getName());
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "data-type"), c.getDataType().getLogicalType().asSerializableString());
            if (c instanceof Column.ComputedColumn) {
                Column.ComputedColumn computedColumn = (Column.ComputedColumn)c;
                serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "expr"), computedColumn.getExpression().asSerializableString());
                if (!computedColumn.getComment().isPresent()) continue;
                serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "comment"), (String)computedColumn.getComment().get());
                continue;
            }
            Column.MetadataColumn metadataColumn = (Column.MetadataColumn)c;
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "metadata"), metadataColumn.getMetadataKey().orElse(metadataColumn.getName()));
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "virtual"), Boolean.toString(metadataColumn.isVirtual()));
            if (!metadataColumn.getComment().isPresent()) continue;
            serialized.put(FlinkCatalogPropertiesUtil.compoundKey("schema", index, "comment"), (String)metadataColumn.getComment().get());
        }
        return serialized;
    }

    public static Map<String, String> serializeWatermarkSpec(org.apache.flink.table.api.WatermarkSpec watermarkSpec) {
        HashMap<String, String> serializedWatermarkSpec = new HashMap<String, String>();
        String watermarkPrefix = FlinkCatalogPropertiesUtil.compoundKey("schema", "watermark", 0);
        serializedWatermarkSpec.put(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "rowtime"), watermarkSpec.getRowtimeAttribute());
        serializedWatermarkSpec.put(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "strategy.expr"), watermarkSpec.getWatermarkExpr());
        serializedWatermarkSpec.put(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "strategy.data-type"), watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());
        return serializedWatermarkSpec;
    }

    public static Map<String, String> serializeNewWatermarkSpec(WatermarkSpec watermarkSpec) {
        HashMap<String, String> serializedWatermarkSpec = new HashMap<String, String>();
        String watermarkPrefix = FlinkCatalogPropertiesUtil.compoundKey("schema", "watermark", 0);
        serializedWatermarkSpec.put(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "rowtime"), watermarkSpec.getRowtimeAttribute());
        serializedWatermarkSpec.put(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "strategy.expr"), watermarkSpec.getWatermarkExpression().asSerializableString());
        serializedWatermarkSpec.put(FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefix, "strategy.data-type"), watermarkSpec.getWatermarkExpression().getOutputDataType().getLogicalType().asSerializableString());
        return serializedWatermarkSpec;
    }

    public static int nonPhysicalColumnsCount(Map<String, String> tableOptions, List<String> physicalColumns) {
        int count = 0;
        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
            if (!FlinkCatalogPropertiesUtil.isColumnNameKey(entry.getKey()) || physicalColumns.contains(entry.getValue())) continue;
            ++count;
        }
        return count;
    }

    public static boolean isNonPhysicalColumnKey(String key) {
        if (!key.startsWith("schema")) {
            return false;
        }
        String suffix = key.substring("schema".length() + 1);
        for (Pattern pattern : NON_PHYSICAL_KEY_PATTERNS) {
            if (!pattern.matcher(suffix).matches()) continue;
            return true;
        }
        return false;
    }

    public static Map<String, Integer> nonPhysicalColumns(Map<String, String> tableOptions, List<String> physicalColumns) {
        HashMap<String, Integer> nonPhysicalColumnIndex = new HashMap<String, Integer>();
        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
            if (!FlinkCatalogPropertiesUtil.isColumnNameKey(entry.getKey()) || physicalColumns.contains(entry.getValue())) continue;
            String key = entry.getKey();
            int index = Integer.parseInt(key.substring("schema".length() + 1, key.indexOf(".", "schema".length() + 1)));
            nonPhysicalColumnIndex.put(entry.getValue(), index);
        }
        return nonPhysicalColumnIndex;
    }

    private static boolean isColumnNameKey(String key) {
        return key.startsWith("schema") && SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring("schema".length() + 1)).matches();
    }

    public static TableColumn deserializeNonPhysicalColumn(Map<String, String> options, int index) {
        TableColumn.ComputedColumn column;
        String nameKey = FlinkCatalogPropertiesUtil.compoundKey("schema", index, "name");
        String dataTypeKey = FlinkCatalogPropertiesUtil.compoundKey("schema", index, "data-type");
        String exprKey = FlinkCatalogPropertiesUtil.compoundKey("schema", index, "expr");
        String metadataKey = FlinkCatalogPropertiesUtil.compoundKey("schema", index, "metadata");
        String virtualKey = FlinkCatalogPropertiesUtil.compoundKey("schema", index, "virtual");
        String name = options.get(nameKey);
        DataType dataType = TypeConversions.fromLogicalToDataType((LogicalType)LogicalTypeParser.parse((String)options.get(dataTypeKey)));
        if (options.containsKey(exprKey)) {
            column = TableColumn.computed((String)name, (DataType)dataType, (String)options.get(exprKey));
        } else if (options.containsKey(metadataKey)) {
            String metadataAlias = options.get(metadataKey);
            boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey));
            column = metadataAlias.equals(name) ? TableColumn.metadata((String)name, (DataType)dataType, (boolean)isVirtual) : TableColumn.metadata((String)name, (DataType)dataType, (String)metadataAlias, (boolean)isVirtual);
        } else {
            throw new RuntimeException(String.format("Failed to build non-physical column. Current index is %s, options are %s", index, options));
        }
        return column;
    }

    public static org.apache.flink.table.api.WatermarkSpec deserializeWatermarkSpec(Map<String, String> options) {
        String watermarkPrefixKey = FlinkCatalogPropertiesUtil.compoundKey("schema", "watermark");
        String rowtimeKey = FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefixKey, 0, "rowtime");
        String exprKey = FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefixKey, 0, "strategy.expr");
        String dataTypeKey = FlinkCatalogPropertiesUtil.compoundKey(watermarkPrefixKey, 0, "strategy.data-type");
        String rowtimeAttribute = options.get(rowtimeKey);
        String watermarkExpressionString = options.get(exprKey);
        DataType watermarkExprOutputType = TypeConversions.fromLogicalToDataType((LogicalType)LogicalTypeParser.parse((String)options.get(dataTypeKey)));
        return new org.apache.flink.table.api.WatermarkSpec(rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType);
    }

    public static String compoundKey(Object ... components) {
        return Stream.of(components).map(Object::toString).collect(Collectors.joining("."));
    }
}

