/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka.formats.canal;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalFieldParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Preconditions;

public class CanalRecordParser
extends RecordParser {
    private static final String FIELD_SQL = "sql";
    private static final String FIELD_MYSQL_TYPE = "mysqlType";
    private static final String FIELD_PRIMARY_KEYS = "pkNames";
    private static final String FIELD_TYPE = "type";
    private static final String FIELD_DATA = "data";
    private static final String FIELD_OLD = "old";
    private static final String OP_UPDATE = "UPDATE";
    private static final String OP_INSERT = "INSERT";
    private static final String OP_DELETE = "DELETE";
    private final List<ComputedColumn> computedColumns;

    public CanalRecordParser(boolean caseSensitive, TableNameConverter tableNameConverter, List<ComputedColumn> computedColumns) {
        super(tableNameConverter, caseSensitive);
        this.computedColumns = computedColumns;
    }

    @Override
    public List<RichCdcMultiplexRecord> extractRecords() {
        if (this.isDdl()) {
            return Collections.emptyList();
        }
        List<String> primaryKeys = this.extractPrimaryKeys();
        LinkedHashMap<String, String> mySqlFieldTypes = this.extractFieldTypesFromMySqlType();
        LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<String, DataType>();
        mySqlFieldTypes.forEach((name, type) -> paimonFieldTypes.put((String)name, MySqlTypeUtils.toDataType(type)));
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        String type2 = this.extractString(FIELD_TYPE);
        ArrayNode data = (ArrayNode)this.root.get(FIELD_DATA);
        switch (type2) {
            case "UPDATE": {
                ArrayNode old = this.root.get(FIELD_OLD) instanceof NullNode ? null : (ArrayNode)this.root.get(FIELD_OLD);
                for (int i = 0; i < data.size(); ++i) {
                    Map<String, String> after = this.extractRow(data.get(i), mySqlFieldTypes, paimonFieldTypes);
                    if (old != null) {
                        Map<String, String> before = this.extractRow(old.get(i), mySqlFieldTypes, paimonFieldTypes);
                        for (Map.Entry<String, String> entry : after.entrySet()) {
                            if (before.containsKey(entry.getKey())) continue;
                            before.put(entry.getKey(), entry.getValue());
                        }
                        before = this.caseSensitive ? before : this.keyCaseInsensitive(before);
                        records.add(new RichCdcMultiplexRecord(this.databaseName, this.tableName, paimonFieldTypes, primaryKeys, new CdcRecord(RowKind.DELETE, before)));
                    }
                    after = this.caseSensitive ? after : this.keyCaseInsensitive(after);
                    records.add(new RichCdcMultiplexRecord(this.databaseName, this.tableName, paimonFieldTypes, primaryKeys, new CdcRecord(RowKind.INSERT, after)));
                }
                break;
            }
            case "INSERT": 
            case "DELETE": {
                for (JsonNode datum : data) {
                    Map<String, String> after = this.extractRow(datum, mySqlFieldTypes, paimonFieldTypes);
                    after = this.caseSensitive ? after : this.keyCaseInsensitive(after);
                    RowKind kind = type2.equals(OP_INSERT) ? RowKind.INSERT : RowKind.DELETE;
                    records.add(new RichCdcMultiplexRecord(this.databaseName, this.tableName, paimonFieldTypes, primaryKeys, new CdcRecord(kind, after)));
                }
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown record type: " + type2);
            }
        }
        return records;
    }

    @Override
    public KafkaSchema getKafkaSchema(String record) {
        try {
            this.root = OBJECT_MAPPER.readValue(record, JsonNode.class);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        this.validateFormat();
        if (this.isDdl()) {
            return null;
        }
        LinkedHashMap<String, String> mySqlFieldTypes = this.extractFieldTypesFromMySqlType();
        LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<String, DataType>();
        mySqlFieldTypes.forEach((name, type) -> paimonFieldTypes.put((String)name, MySqlTypeUtils.toDataType(type)));
        return new KafkaSchema(this.extractString("database"), this.extractString("table"), paimonFieldTypes, this.extractPrimaryKeys());
    }

    @Override
    protected void validateFormat() {
        String errorMessageTemplate = "Didn't find '%s' node in json. Only supports canal-json format,please make sure your topic's format is correct.";
        Preconditions.checkNotNull(this.root.get("database"), errorMessageTemplate, "database");
        Preconditions.checkNotNull(this.root.get("table"), errorMessageTemplate, "table");
        Preconditions.checkNotNull(this.root.get(FIELD_TYPE), errorMessageTemplate, FIELD_TYPE);
        Preconditions.checkNotNull(this.root.get(FIELD_DATA), errorMessageTemplate, FIELD_DATA);
        if (this.isDdl()) {
            Preconditions.checkNotNull(this.root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL);
        } else {
            Preconditions.checkNotNull(this.root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
            Preconditions.checkNotNull(this.root.get(FIELD_PRIMARY_KEYS), errorMessageTemplate, FIELD_PRIMARY_KEYS);
        }
    }

    @Override
    protected String extractString(String key) {
        return this.root.get(key).asText();
    }

    private boolean isDdl() {
        return this.root.get("isDdl") != null && this.root.get("isDdl").asBoolean();
    }

    private List<String> extractPrimaryKeys() {
        ArrayList<String> primaryKeys = new ArrayList<String>();
        ArrayNode pkNames = (ArrayNode)this.root.get(FIELD_PRIMARY_KEYS);
        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(this.toFieldName(pk.asText())));
        return primaryKeys;
    }

    private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
        LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<String, String>();
        JsonNode schema = this.root.get(FIELD_MYSQL_TYPE);
        Iterator<String> iterator = schema.fieldNames();
        while (iterator.hasNext()) {
            String fieldName = iterator.next();
            String fieldType = schema.get(fieldName).asText();
            fieldTypes.put(this.toFieldName(fieldName), fieldType);
        }
        return fieldTypes;
    }

    private Map<String, String> extractRow(JsonNode record, Map<String, String> mySqlFieldTypes, LinkedHashMap<String, DataType> paimonFieldTypes) {
        Map<String, Object> jsonMap = OBJECT_MAPPER.convertValue((Object)record, new TypeReference<Map<String, Object>>(){});
        if (jsonMap == null) {
            return new HashMap<String, String>();
        }
        HashMap<String, String> resultMap = new HashMap<String, String>();
        for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
            String oldValue;
            String fieldName = field.getKey();
            String mySqlType = field.getValue();
            Object objectValue = jsonMap.get(fieldName);
            if (objectValue == null) continue;
            String newValue = oldValue = objectValue.toString();
            if (MySqlTypeUtils.isSetType(MySqlTypeUtils.getShortType(mySqlType))) {
                newValue = CanalFieldParser.convertSet(newValue, mySqlType);
            } else if (MySqlTypeUtils.isEnumType(MySqlTypeUtils.getShortType(mySqlType))) {
                newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
            } else if (MySqlTypeUtils.isGeoType(MySqlTypeUtils.getShortType(mySqlType))) {
                try {
                    byte[] wkb = CanalFieldParser.convertGeoType2WkbArray(oldValue.getBytes(StandardCharsets.ISO_8859_1));
                    newValue = MySqlTypeUtils.convertWkbArray(wkb);
                }
                catch (Exception e) {
                    throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", oldValue), e);
                }
            }
            resultMap.put(fieldName, newValue);
        }
        for (ComputedColumn computedColumn : this.computedColumns) {
            resultMap.put(computedColumn.columnName(), computedColumn.eval((String)resultMap.get(computedColumn.fieldReference())));
            paimonFieldTypes.put(computedColumn.columnName(), computedColumn.columnType());
        }
        return resultMap;
    }
}

