/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.aliyun;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
import org.apache.paimon.flink.action.cdc.format.aliyun.AliyunFieldParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AliyunRecordParser
extends AbstractJsonRecordParser {
    private static final Logger LOG = LoggerFactory.getLogger(AliyunRecordParser.class);
    private static final String FIELD_IS_DDL = "isDdl";
    private static final String FIELD_TYPE = "op";
    private static final String OP_UPDATE_BEFORE = "UPDATE_BEFORE";
    private static final String OP_UPDATE_AFTER = "UPDATE_AFTER";
    private static final String OP_INSERT = "INSERT";
    private static final String OP_DELETE = "DELETE";
    private static final String FIELD_PAYLOAD = "payload";
    private static final String FIELD_BEFORE = "before";
    private static final String FIELD_AFTER = "after";
    private static final String FIELD_COLUMN = "dataColumn";
    private static final String FIELD_SCHEMA = "schema";
    private static final String FIELD_PK = "primaryKey";

    @Override
    protected boolean isDDL() {
        JsonNode node = this.root.get(FIELD_IS_DDL);
        return !JsonSerdeUtil.isNull(node) && node.asBoolean();
    }

    public AliyunRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(typeMapping, computedColumns);
    }

    @Override
    protected String primaryField() {
        return "schema.primaryKey";
    }

    @Override
    protected String dataField() {
        return "payload.dataColumn";
    }

    @Override
    protected List<String> extractPrimaryKeys() {
        JsonNode schemaNode = this.root.get(FIELD_SCHEMA);
        this.checkNotNull(schemaNode, FIELD_SCHEMA);
        ArrayNode pkNode = JsonSerdeUtil.getNodeAs(schemaNode, FIELD_PK, ArrayNode.class);
        ArrayList<String> pkFields = new ArrayList<String>();
        pkNode.forEach(pk -> {
            if (JsonSerdeUtil.isNull(pk)) {
                throw new IllegalArgumentException(String.format("Primary key cannot be null: %s", pk));
            }
            pkFields.add(pk.asText());
        });
        return pkFields;
    }

    @Override
    public List<RichCdcMultiplexRecord> extractRecords() {
        if (this.isDDL()) {
            return Collections.emptyList();
        }
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        JsonNode payload = this.root.get(FIELD_PAYLOAD);
        this.checkNotNull(payload, FIELD_PAYLOAD);
        String type = payload.get(FIELD_TYPE).asText();
        RowKind rowKind = null;
        String field = null;
        switch (type) {
            case "UPDATE_BEFORE": {
                rowKind = RowKind.UPDATE_BEFORE;
                field = FIELD_BEFORE;
                break;
            }
            case "UPDATE_AFTER": {
                rowKind = RowKind.UPDATE_AFTER;
                field = FIELD_AFTER;
                break;
            }
            case "INSERT": {
                rowKind = RowKind.INSERT;
                field = FIELD_AFTER;
                break;
            }
            case "DELETE": {
                rowKind = RowKind.DELETE;
                field = FIELD_BEFORE;
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown record operation: " + type);
            }
        }
        JsonNode container = payload.get(field);
        this.checkNotNull(container, String.format("%s.%s", FIELD_PAYLOAD, field));
        JsonNode data = JsonSerdeUtil.getNodeAs(container, FIELD_COLUMN, JsonNode.class);
        this.checkNotNull(data, String.format("%s.%s.%s", FIELD_PAYLOAD, field, FIELD_COLUMN));
        this.processRecord(data, rowKind, records);
        return records;
    }

    @Override
    protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
        Map<String, Object> recordMap = JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>(){});
        HashMap<String, String> rowData = new HashMap<String, String>();
        this.fillDefaultTypes(record, rowTypeBuilder);
        for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
            rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null));
        }
        this.evalComputedColumns(rowData, rowTypeBuilder);
        return rowData;
    }

    @Override
    protected String format() {
        return "aliyun-json";
    }

    @Override
    @Nullable
    protected String getTableName() {
        JsonNode schemaNode = this.root.get(FIELD_SCHEMA);
        if (JsonSerdeUtil.isNull(schemaNode)) {
            return null;
        }
        JsonNode sourceNode = schemaNode.get("source");
        if (JsonSerdeUtil.isNull(sourceNode)) {
            return null;
        }
        JsonNode tableNode = sourceNode.get("tableName");
        if (JsonSerdeUtil.isNull(tableNode)) {
            return null;
        }
        return tableNode.asText();
    }

    @Override
    @Nullable
    protected String getDatabaseName() {
        JsonNode schemaNode = this.root.get(FIELD_SCHEMA);
        if (JsonSerdeUtil.isNull(schemaNode)) {
            return null;
        }
        JsonNode sourceNode = schemaNode.get("source");
        if (JsonSerdeUtil.isNull(sourceNode)) {
            return null;
        }
        JsonNode databaseNode = sourceNode.get("dbName");
        if (JsonSerdeUtil.isNull(databaseNode)) {
            return null;
        }
        return databaseNode.asText();
    }

    private Map<JsonNode, JsonNode> matchOldRecords(ArrayNode newData, ArrayNode oldData) {
        return IntStream.range(0, newData.size()).boxed().collect(Collectors.toMap(newData::get, oldData::get));
    }

    private String transformValue(@Nullable String oldValue, String shortType, String mySqlType) {
        if (oldValue == null) {
            return null;
        }
        if (MySqlTypeUtils.isSetType(shortType)) {
            return AliyunFieldParser.convertSet(oldValue, mySqlType);
        }
        if (MySqlTypeUtils.isEnumType(shortType)) {
            return AliyunFieldParser.convertEnum(oldValue, mySqlType);
        }
        if (MySqlTypeUtils.isGeoType(shortType)) {
            try {
                byte[] wkb = AliyunFieldParser.convertGeoType2WkbArray(oldValue.getBytes(StandardCharsets.ISO_8859_1));
                return MySqlTypeUtils.convertWkbArray(wkb);
            }
            catch (Exception e) {
                throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", oldValue), e);
            }
        }
        return oldValue;
    }
}

