/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJsonRecordParser
extends AbstractRecordParser {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJsonRecordParser.class);
    protected JsonNode root;

    public AbstractJsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(typeMapping, computedColumns);
    }

    @Override
    protected void setRoot(CdcSourceRecord record) {
        this.root = (JsonNode)record.getValue();
    }

    protected abstract String primaryField();

    protected abstract String dataField();

    protected void fillDefaultTypes(JsonNode record, CdcSchema.Builder schemaBuilder) {
        record.fieldNames().forEachRemaining(name -> schemaBuilder.column((String)name, (DataType)DataTypes.STRING()));
    }

    protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder schemaBuilder) {
        this.fillDefaultTypes(record, schemaBuilder);
        Map recordMap = (Map)JsonSerdeUtil.convertValue((Object)record, (TypeReference)new TypeReference<Map<String, Object>>(){});
        Map<String, String> rowData = recordMap.entrySet().stream().filter(entry -> Objects.nonNull(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            if (Objects.nonNull(entry.getValue()) && !TypeUtils.isBasicType(entry.getValue())) {
                try {
                    return JsonSerdeUtil.writeValueAsString(entry.getValue());
                }
                catch (JsonProcessingException e) {
                    LOG.error("Failed to deserialize record.", (Throwable)e);
                    return Objects.toString(entry.getValue());
                }
            }
            return Objects.toString(entry.getValue());
        }));
        this.evalComputedColumns(rowData, schemaBuilder);
        return rowData;
    }

    @Override
    protected List<String> extractPrimaryKeys() {
        ArrayNode pkNames = (ArrayNode)JsonSerdeUtil.getNodeAs((JsonNode)this.root, (String)this.primaryField(), ArrayNode.class);
        if (pkNames == null) {
            return Collections.emptyList();
        }
        return StreamSupport.stream(pkNames.spliterator(), false).map(JsonNode::asText).collect(Collectors.toList());
    }

    protected void processRecord(JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord> records) {
        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
        Map<String, String> rowData = this.extractRowData(jsonNode, schemaBuilder);
        records.add(this.createRecord(rowKind, rowData, schemaBuilder));
    }

    protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
        JsonNode oldFullRecordNode = data.deepCopy();
        oldNode.fieldNames().forEachRemaining(fieldName -> ((ObjectNode)oldFullRecordNode).set(fieldName, oldNode.get(fieldName)));
        return oldFullRecordNode;
    }

    @Override
    @Nullable
    protected String getTableName() {
        JsonNode node = this.root.get("table");
        return JsonSerdeUtil.isNull((JsonNode)node) ? null : node.asText();
    }

    @Override
    @Nullable
    protected String getDatabaseName() {
        JsonNode node = this.root.get("database");
        return JsonSerdeUtil.isNull((JsonNode)node) ? null : node.asText();
    }

    protected void checkNotNull(JsonNode node, String key) {
        if (JsonSerdeUtil.isNull((JsonNode)node)) {
            throw new RuntimeException(String.format("Invalid %s format: missing '%s' field.", this.format(), key));
        }
    }

    protected void checkNotNull(JsonNode node, String key, String conditionKey, String conditionValue) {
        if (JsonSerdeUtil.isNull((JsonNode)node)) {
            throw new RuntimeException(String.format("Invalid %s format: missing '%s' field when '%s' is '%s'.", this.format(), key, conditionKey, conditionValue));
        }
    }

    protected JsonNode getAndCheck(String key) {
        JsonNode node = this.root.get(key);
        this.checkNotNull(node, key);
        return node;
    }

    protected JsonNode getAndCheck(String key, String conditionKey, String conditionValue) {
        JsonNode node = this.root.get(key);
        this.checkNotNull(node, key, conditionKey, conditionValue);
        return node;
    }
}

