/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.debezium;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonRecordParser;
import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.TypeUtils;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebeziumBsonRecordParser
extends DebeziumJsonRecordParser {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumBsonRecordParser.class);
    private static final String FIELD_COLLECTION = "collection";
    private static final String FIELD_OBJECT_ID = "_id";
    private static final String FIELD_KEY_ID = "id";
    private static final List<String> PRIMARY_KEYS = Collections.singletonList("_id");
    private ObjectNode keyRoot;

    public DebeziumBsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(typeMapping, computedColumns);
    }

    @Override
    public List<RichCdcMultiplexRecord> extractRecords() {
        String operation = this.getAndCheck("op").asText();
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        switch (operation) {
            case "c": 
            case "r": {
                this.processRecord(this.getData(), RowKind.INSERT, records);
                break;
            }
            case "u": {
                this.processDeleteRecord(operation, records);
                this.processRecord(this.getData(), RowKind.INSERT, records);
                break;
            }
            case "d": {
                this.processDeleteRecord(operation, records);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown record operation: " + operation);
            }
        }
        return records;
    }

    @Override
    protected void setRoot(CdcSourceRecord record) {
        this.root = (JsonNode)record.getValue();
        if (this.root.has("schema")) {
            this.root = this.root.get("payload");
        }
        this.keyRoot = (ObjectNode)record.getKey();
        if (!JsonSerdeUtil.isNull((JsonNode)this.keyRoot) && this.keyRoot.has("schema")) {
            this.keyRoot = (ObjectNode)this.keyRoot.get("payload");
        }
    }

    @Override
    protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder schemaBuilder) {
        Preconditions.checkArgument((boolean)record.isTextual(), (String)"debezium bson record expected to be STRING type, but actual is %s", (Object[])new Object[]{record.getNodeType()});
        BsonDocument document = BsonDocument.parse((String)record.asText());
        LinkedHashMap<String, String> resultMap = new LinkedHashMap<String, String>();
        for (Map.Entry entry : document.entrySet()) {
            String fieldName = (String)entry.getKey();
            resultMap.put(fieldName, DebeziumBsonRecordParser.toJsonString(BsonValueConvertor.convert((BsonValue)entry.getValue())));
            schemaBuilder.column(fieldName, (DataType)DataTypes.STRING());
        }
        this.evalComputedColumns(resultMap, schemaBuilder);
        return resultMap;
    }

    private static String toJsonString(Object entry) {
        if (entry == null) {
            return null;
        }
        if (!TypeUtils.isBasicType((Object)entry)) {
            try {
                return JsonSerdeUtil.writeValueAsString((Object)entry);
            }
            catch (JsonProcessingException e) {
                LOG.error("Failed to deserialize record.", (Throwable)e);
            }
        }
        return Objects.toString(entry);
    }

    @Override
    protected List<String> extractPrimaryKeys() {
        return PRIMARY_KEYS;
    }

    @Override
    @Nullable
    protected String getTableName() {
        return this.getFromSourceField(FIELD_COLLECTION);
    }

    @Override
    protected String format() {
        return "debezium-bson";
    }

    public boolean checkBeforeExists() {
        return !JsonSerdeUtil.isNull((JsonNode)this.root.get("before"));
    }

    private void processDeleteRecord(String operation, List<RichCdcMultiplexRecord> records) {
        if (this.checkBeforeExists()) {
            this.processRecord(this.getBefore(operation), RowKind.DELETE, records);
        } else {
            JsonNode idNode = null;
            Preconditions.checkArgument((!JsonSerdeUtil.isNull((JsonNode)this.keyRoot) && !JsonSerdeUtil.isNull((JsonNode)(idNode = this.keyRoot.get(FIELD_KEY_ID))) ? 1 : 0) != 0, (String)"Invalid %s format: missing '%s' field in key when '%s' is '%s' for: %s.", (Object[])new Object[]{this.format(), FIELD_KEY_ID, "op", operation, this.keyRoot});
            Map<String, Object> record = Collections.singletonMap(FIELD_OBJECT_ID, JsonSerdeUtil.fromJson((String)idNode.asText(), JsonNode.class));
            try {
                this.processRecord((JsonNode)new TextNode(JsonSerdeUtil.writeValueAsString(record)), RowKind.DELETE, records);
            }
            catch (JsonProcessingException e) {
                throw new RuntimeException("Failed to deserialize key record.", e);
            }
        }
    }
}

