/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.utils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.types.utils.TypeConversions;

/**
 * Helpers that convert Flink non-physical columns (computed and metadata columns) and watermark
 * specs to and from flat string properties whose keys are dot-separated and start with
 * {@code schema}.
 *
 * <p>Key layout produced and consumed by this class:
 *
 * <ul>
 *   <li>{@code schema.<index>.name}, {@code schema.<index>.data-type}
 *   <li>{@code schema.<index>.expr} for computed columns
 *   <li>{@code schema.<index>.metadata}, {@code schema.<index>.virtual} for metadata columns
 *   <li>{@code schema.watermark.0.rowtime}, {@code schema.watermark.0.strategy.expr},
 *       {@code schema.watermark.0.strategy.data-type}
 * </ul>
 */
public class FlinkCatalogPropertiesUtil {
    private static final String SCHEMA = "schema";
    private static final String SCHEMA_PREFIX = SCHEMA + ".";
    private static final String NAME = "name";
    private static final String DATA_TYPE = "data-type";
    private static final String EXPR = "expr";
    private static final String METADATA = "metadata";
    private static final String VIRTUAL = "virtual";
    private static final String WATERMARK = "watermark";
    private static final String WATERMARK_ROWTIME = "rowtime";
    private static final String WATERMARK_STRATEGY_EXPR = "strategy.expr";
    private static final String WATERMARK_STRATEGY_DATA_TYPE = "strategy.data-type";

    /** Matches the part of a column-name key that follows {@code "schema."}, e.g. {@code 3.name}. */
    private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX = Pattern.compile("\\d+\\.name");

    /**
     * Serializes computed and metadata columns into string properties.
     *
     * @param indexMap maps a column name to the index used in the generated keys
     * @param nonPhysicalColumns the columns to serialize; each must be a computed or a metadata
     *     column
     * @return a new mutable map holding the generated properties
     * @throws IllegalArgumentException if a column has no entry in {@code indexMap} or is neither
     *     a computed nor a metadata column
     */
    public static Map<String, String> serializeNonPhysicalColumns(Map<String, Integer> indexMap, List<TableColumn> nonPhysicalColumns) {
        Map<String, String> serialized = new HashMap<>();
        for (TableColumn c : nonPhysicalColumns) {
            // Look the index up as a boxed value so that a missing entry is reported clearly
            // instead of failing with a message-less NullPointerException on unboxing.
            Integer index = indexMap.get(c.getName());
            if (index == null) {
                throw new IllegalArgumentException(
                        String.format("No index found for non-physical column '%s' in %s", c.getName(), indexMap));
            }
            serialized.put(compoundKey(SCHEMA, index, NAME), c.getName());
            serialized.put(compoundKey(SCHEMA, index, DATA_TYPE), c.getType().getLogicalType().asSerializableString());
            if (c instanceof TableColumn.ComputedColumn) {
                TableColumn.ComputedColumn computedColumn = (TableColumn.ComputedColumn) c;
                serialized.put(compoundKey(SCHEMA, index, EXPR), computedColumn.getExpression());
            } else if (c instanceof TableColumn.MetadataColumn) {
                TableColumn.MetadataColumn metadataColumn = (TableColumn.MetadataColumn) c;
                // When no alias is set the column's own name is stored; deserialization relies on
                // that equality to decide whether an alias has to be restored.
                serialized.put(
                        compoundKey(SCHEMA, index, METADATA),
                        metadataColumn.getMetadataAlias().orElse(metadataColumn.getName()));
                serialized.put(compoundKey(SCHEMA, index, VIRTUAL), Boolean.toString(metadataColumn.isVirtual()));
            } else {
                throw new IllegalArgumentException(
                        String.format(
                                "Column '%s' is neither a computed nor a metadata column: %s",
                                c.getName(), c.getClass().getName()));
            }
        }
        return serialized;
    }

    /**
     * Serializes a watermark spec into the three {@code schema.watermark.0.*} properties.
     *
     * @param watermarkSpec the spec to serialize
     * @return a new mutable map with the rowtime attribute, the watermark expression and the
     *     serializable string of the expression's output type
     */
    public static Map<String, String> serializeWatermarkSpec(WatermarkSpec watermarkSpec) {
        Map<String, String> serializedWatermarkSpec = new HashMap<>();
        String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
        serializedWatermarkSpec.put(compoundKey(watermarkPrefix, WATERMARK_ROWTIME), watermarkSpec.getRowtimeAttribute());
        serializedWatermarkSpec.put(compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR), watermarkSpec.getWatermarkExpr());
        serializedWatermarkSpec.put(
                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE),
                watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());
        return serializedWatermarkSpec;
    }

    /**
     * Counts the entries of {@code tableOptions} whose key has the form
     * {@code schema.<digits>.name} and whose value is not contained in {@code physicalColumns}.
     *
     * @param tableOptions the options to scan
     * @param physicalColumns names of the physical columns, which are excluded from the count
     * @return the number of matching entries
     */
    public static int nonPhysicalColumnsCount(Map<String, String> tableOptions, List<String> physicalColumns) {
        int count = 0;
        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
            if (isColumnNameKey(entry.getKey()) && !physicalColumns.contains(entry.getValue())) {
                ++count;
            }
        }
        return count;
    }

    /**
     * Returns whether {@code key} has the form {@code schema.<digits>.name}.
     *
     * <p>The prefix check includes the separating dot: a key that is exactly {@code "schema"}
     * must not reach {@code substring} with an out-of-range index, and a key such as
     * {@code "schemaX0.name"} must not be accepted.
     */
    private static boolean isColumnNameKey(String key) {
        return key.startsWith(SCHEMA_PREFIX)
                && SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(SCHEMA_PREFIX.length())).matches();
    }

    /**
     * Rebuilds the computed or metadata column stored under {@code schema.<index>.*}.
     *
     * <p>A column is treated as computed when the {@code expr} key is present, otherwise as a
     * metadata column when the {@code metadata} key is present. For a metadata column the alias
     * is only restored when the stored value differs from the column name.
     *
     * @param options the properties to read from
     * @param index the column index used in the keys
     * @return the deserialized column
     * @throws RuntimeException if neither an {@code expr} nor a {@code metadata} key exists for
     *     {@code index}
     */
    public static TableColumn deserializeNonPhysicalColumn(Map<String, String> options, int index) {
        String nameKey = compoundKey(SCHEMA, index, NAME);
        String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE);
        String exprKey = compoundKey(SCHEMA, index, EXPR);
        String metadataKey = compoundKey(SCHEMA, index, METADATA);
        String virtualKey = compoundKey(SCHEMA, index, VIRTUAL);

        String name = options.get(nameKey);
        DataType dataType = TypeConversions.fromLogicalToDataType(LogicalTypeParser.parse(options.get(dataTypeKey)));

        if (options.containsKey(exprKey)) {
            return TableColumn.computed(name, dataType, options.get(exprKey));
        }
        if (options.containsKey(metadataKey)) {
            String metadataAlias = options.get(metadataKey);
            boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey));
            return metadataAlias.equals(name)
                    ? TableColumn.metadata(name, dataType, isVirtual)
                    : TableColumn.metadata(name, dataType, metadataAlias, isVirtual);
        }
        throw new RuntimeException(String.format("Failed to build non-physical column. Current index is %s, options are %s", index, options));
    }

    /**
     * Rebuilds a watermark spec from the {@code schema.watermark.0.*} properties.
     *
     * @param options the properties to read from
     * @return a watermark spec built from the stored rowtime attribute, expression and output type
     */
    public static WatermarkSpec deserializeWatermarkSpec(Map<String, String> options) {
        String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK);
        String rowtimeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_ROWTIME);
        String exprKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_EXPR);
        String dataTypeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_DATA_TYPE);

        String rowtimeAttribute = options.get(rowtimeKey);
        String watermarkExpressionString = options.get(exprKey);
        DataType watermarkExprOutputType =
                TypeConversions.fromLogicalToDataType(LogicalTypeParser.parse(options.get(dataTypeKey)));
        return new WatermarkSpec(rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType);
    }

    /**
     * Joins the string form of every component with {@code "."}.
     *
     * @param components the key parts, in order; a {@code null} element causes a
     *     {@link NullPointerException} because {@code toString()} is invoked on each element
     * @return the dot-separated key, e.g. {@code compoundKey("schema", 0, "name")} yields
     *     {@code "schema.0.name"}
     */
    public static String compoundKey(Object ... components) {
        return Stream.of(components).map(Object::toString).collect(Collectors.joining("."));
    }
}

