/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.debezium;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;

/**
 * {@link RecordParser} for the {@code "debezium-json"} format (see {@link #format()}).
 *
 * <p>Each parsed message is turned into zero or more {@link RichCdcMultiplexRecord}s depending on
 * the value of its {@code "op"} field:
 *
 * <ul>
 *   <li>{@code "c"} / {@code "r"}: one {@link RowKind#INSERT} built from the {@code "after"} node;
 *   <li>{@code "u"}: one {@link RowKind#DELETE} built from the {@code "before"} node merged with
 *       the {@code "after"} node, followed by one {@link RowKind#INSERT} built from {@code "after"};
 *   <li>{@code "d"}: one {@link RowKind#DELETE} built from the {@code "before"} node.
 * </ul>
 *
 * <p>Any other operation value is rejected with an {@link UnsupportedOperationException}.
 */
public class DebeziumRecordParser extends RecordParser {
    // Names of the JSON fields read from a message.
    private static final String FIELD_SCHEMA = "schema";
    private static final String FIELD_PAYLOAD = "payload";
    private static final String FIELD_BEFORE = "before";
    private static final String FIELD_AFTER = "after";
    private static final String FIELD_SOURCE = "source";
    private static final String FIELD_PRIMARY = "pkNames";
    private static final String FIELD_DB = "db";
    private static final String FIELD_TABLE = "table";
    private static final String FIELD_TYPE = "op";

    // Recognised values of the "op" field.
    private static final String OP_INSERT = "c";
    private static final String OP_UPDATE = "u";
    private static final String OP_DELETE = "d";
    private static final String OP_READ = "r";

    /**
     * Creates a parser; all arguments are forwarded unchanged to the {@link RecordParser}
     * constructor.
     *
     * @param caseSensitive passed through to the superclass
     * @param typeMapping passed through to the superclass
     * @param computedColumns passed through to the superclass
     */
    public DebeziumRecordParser(boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(caseSensitive, typeMapping, computedColumns);
    }

    /**
     * Converts the current root node into CDC records according to its {@code "op"} field.
     *
     * @return the records produced for the current message, in emission order
     * @throws UnsupportedOperationException if the operation is not one of {@code c}, {@code r},
     *     {@code u} or {@code d}
     */
    @Override
    public List<RichCdcMultiplexRecord> extractRecords() {
        String operation = this.getAndCheck(FIELD_TYPE).asText();
        List<RichCdcMultiplexRecord> records = new ArrayList<>();
        switch (operation) {
            case OP_INSERT:
            case OP_READ:
                this.processRecord(this.getData(), RowKind.INSERT, records);
                break;
            case OP_UPDATE: {
                // "after" is fetched before "before" so that a message missing both fields
                // fails on the same lookup as it always has.
                JsonNode after = this.getData();
                JsonNode before = this.getBefore(operation);
                this.processRecord(this.mergeOldRecord(after, before), RowKind.DELETE, records);
                // Re-read the data node instead of reusing `after`: whether the inherited
                // helpers mutate their arguments is not visible from this class.
                this.processRecord(this.getData(), RowKind.INSERT, records);
                break;
            }
            case OP_DELETE:
                this.processRecord(this.getBefore(operation), RowKind.DELETE, records);
                break;
            default:
                throw new UnsupportedOperationException("Unknown record operation: " + operation);
        }
        return records;
    }

    /** Returns the node stored under {@link #dataField()}, validated by the inherited {@code getAndCheck}. */
    private JsonNode getData() {
        return this.getAndCheck(this.dataField());
    }

    /**
     * Returns the {@code "before"} node via the inherited three-argument {@code getAndCheck}.
     *
     * @param op the current operation value; passed along with the {@code "op"} field name,
     *     presumably for the validation/error message (confirm against {@link RecordParser})
     */
    private JsonNode getBefore(String op) {
        return this.getAndCheck(FIELD_BEFORE, FIELD_TYPE, op);
    }

    /**
     * Parses {@code record} as JSON and selects the node that the other methods read from: the
     * {@code "payload"} child when the message has a {@code "schema"} field, otherwise the
     * message itself.
     *
     * @param record the raw JSON message
     */
    @Override
    protected void setRoot(String record) {
        JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class);
        if (node.has(FIELD_SCHEMA)) {
            // NOTE(review): a message with "schema" but without "payload" leaves root == null
            // here (JsonNode.get returns null for an absent field) - confirm this is acceptable.
            this.root = node.get(FIELD_PAYLOAD);
        } else {
            this.root = node;
        }
    }

    /** Returns the field name {@code "pkNames"}. */
    @Override
    protected String primaryField() {
        return FIELD_PRIMARY;
    }

    /** Returns the field name {@code "after"}, the node used as the record data. */
    @Override
    protected String dataField() {
        return FIELD_AFTER;
    }

    /** Returns the {@code "table"} entry of the {@code "source"} node, or {@code null} if unavailable. */
    @Override
    @Nullable
    protected String getTableName() {
        return this.getFromSourceField(FIELD_TABLE);
    }

    /** Returns the {@code "db"} entry of the {@code "source"} node, or {@code null} if unavailable. */
    @Override
    @Nullable
    protected String getDatabaseName() {
        return this.getFromSourceField(FIELD_DB);
    }

    /** Returns the format identifier {@code "debezium-json"}. */
    @Override
    protected String format() {
        return "debezium-json";
    }

    /**
     * Looks up {@code key} inside the root's {@code "source"} node.
     *
     * @param key name of the entry to read from the {@code "source"} node
     * @return the entry's text value, or {@code null} when either the {@code "source"} node or the
     *     entry is considered null by {@link JsonSerdeUtil#isNull}
     */
    @Nullable
    private String getFromSourceField(String key) {
        JsonNode source = this.root.get(FIELD_SOURCE);
        if (JsonSerdeUtil.isNull(source)) {
            return null;
        }
        JsonNode value = source.get(key);
        return JsonSerdeUtil.isNull(value) ? null : value.asText();
    }
}

