/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.debezium;

import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

public class DebeziumJsonRecordParser
extends AbstractJsonRecordParser {
    private boolean hasSchema;
    private final Map<String, String> debeziumTypes = new HashMap<String, String>();
    private final Map<String, String> classNames = new HashMap<String, String>();
    private final Map<String, Map<String, String>> parameters = new HashMap<String, Map<String, String>>();

    public DebeziumJsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(typeMapping, computedColumns);
    }

    @Override
    public List<RichCdcMultiplexRecord> extractRecords() {
        String operation = this.getAndCheck("op").asText();
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        switch (operation) {
            case "c": 
            case "r": {
                this.processRecord(this.getData(), RowKind.INSERT, records);
                break;
            }
            case "u": {
                this.processRecord(this.mergeOldRecord(this.getData(), this.getBefore(operation)), RowKind.DELETE, records);
                this.processRecord(this.getData(), RowKind.INSERT, records);
                break;
            }
            case "d": {
                this.processRecord(this.getBefore(operation), RowKind.DELETE, records);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown record operation: " + operation);
            }
        }
        return records;
    }

    protected JsonNode getData() {
        return this.getAndCheck(this.dataField());
    }

    protected JsonNode getBefore(String op) {
        return this.getAndCheck("before", "op", op);
    }

    @Override
    protected void setRoot(CdcSourceRecord record) {
        JsonNode node = (JsonNode)record.getValue();
        this.hasSchema = false;
        if (node.has("schema")) {
            this.root = node.get("payload");
            JsonNode schema = node.get("schema");
            if (!JsonSerdeUtil.isNull(schema)) {
                this.parseSchema(schema);
                this.hasSchema = true;
            }
        } else {
            this.root = node;
        }
    }

    private void parseSchema(JsonNode schema) {
        this.debeziumTypes.clear();
        this.classNames.clear();
        this.parameters.clear();
        ArrayNode schemaFields = JsonSerdeUtil.getNodeAs(schema, "fields", ArrayNode.class);
        Preconditions.checkNotNull(schemaFields);
        JsonNode fields = null;
        for (int i = 0; i < schemaFields.size(); ++i) {
            JsonNode node = schemaFields.get(i);
            if ("after".equals(this.getString(node, "field"))) {
                fields = JsonSerdeUtil.getNodeAs(node, "fields", ArrayNode.class);
                break;
            }
            if (!"before".equals(this.getString(node, "field")) || fields != null) continue;
            fields = JsonSerdeUtil.getNodeAs(node, "fields", ArrayNode.class);
        }
        Preconditions.checkNotNull(fields);
        for (JsonNode node : fields) {
            String field = this.getString(node, "field");
            this.debeziumTypes.put(field, this.getString(node, "type"));
            this.classNames.put(field, this.getString(node, "name"));
            JsonNode parametersNode = node.get("parameters");
            Map parametersMap = JsonSerdeUtil.isNull(parametersNode) ? Collections.emptyMap() : (Map)JsonSerdeUtil.convertValue(parametersNode, new TypeReference<HashMap<String, String>>(){});
            this.parameters.put(field, parametersMap);
        }
    }

    @Nullable
    private String getString(JsonNode node, String fieldName) {
        JsonNode fieldValue = node.get(fieldName);
        return JsonSerdeUtil.isNull(fieldValue) ? null : fieldValue.asText();
    }

    @Override
    protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder schemaBuilder) {
        if (!this.hasSchema) {
            return super.extractRowData(record, schemaBuilder);
        }
        Map<String, Object> recordMap = JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>(){});
        LinkedHashMap<String, String> resultMap = new LinkedHashMap<String, String>();
        for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
            String fieldName = entry.getKey();
            String rawValue = Objects.toString(entry.getValue(), null);
            String debeziumType = this.debeziumTypes.get(fieldName);
            String className = this.classNames.get(fieldName);
            String transformed = DebeziumSchemaUtils.transformRawValue(rawValue, debeziumType, className, this.typeMapping, record.get(fieldName), ZoneOffset.UTC);
            resultMap.put(fieldName, transformed);
            schemaBuilder.column(fieldName, DebeziumSchemaUtils.toDataType(debeziumType, className, this.parameters.get(fieldName)));
        }
        this.evalComputedColumns(resultMap, schemaBuilder);
        return resultMap;
    }

    @Override
    protected String primaryField() {
        return "pkNames";
    }

    @Override
    protected String dataField() {
        return "after";
    }

    @Override
    @Nullable
    protected String getTableName() {
        return this.getFromSourceField("table");
    }

    @Override
    @Nullable
    protected String getDatabaseName() {
        return this.getFromSourceField("db");
    }

    @Override
    protected String format() {
        return "debezium-json";
    }

    @Nullable
    protected String getFromSourceField(String key) {
        JsonNode node = this.root.get("source");
        if (JsonSerdeUtil.isNull(node)) {
            return null;
        }
        return JsonSerdeUtil.isNull(node = node.get(key)) ? null : node.asText();
    }
}

