/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RecordParser
implements FlatMapFunction<String, RichCdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(RecordParser.class);
    protected static final String FIELD_TABLE = "table";
    protected static final String FIELD_DATABASE = "database";
    private final boolean caseSensitive;
    protected final TypeMapping typeMapping;
    protected final List<ComputedColumn> computedColumns;
    protected JsonNode root;

    public RecordParser(boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        this.caseSensitive = caseSensitive;
        this.typeMapping = typeMapping;
        this.computedColumns = computedColumns;
    }

    @Nullable
    public Schema buildSchema(String record) {
        try {
            this.setRoot(record);
            if (this.isDDL()) {
                return null;
            }
            Optional recordOpt = this.extractRecords().stream().findFirst();
            if (!recordOpt.isPresent()) {
                return null;
            }
            Schema.Builder builder = Schema.newBuilder();
            ((RichCdcMultiplexRecord)recordOpt.get()).fieldTypes().forEach(builder::column);
            builder.primaryKey(this.extractPrimaryKeys());
            return builder.build();
        }
        catch (Exception e) {
            this.logInvalidJsonString(record);
            throw e;
        }
    }

    protected abstract List<RichCdcMultiplexRecord> extractRecords();

    protected abstract String primaryField();

    protected abstract String dataField();

    protected boolean isDDL() {
        return false;
    }

    protected LinkedHashMap<String, DataType> fillDefaultTypes(JsonNode record) {
        LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<String, DataType>();
        record.fieldNames().forEachRemaining(name -> {
            DataType cfr_ignored_0 = fieldTypes.put((String)name, DataTypes.STRING());
        });
        return fieldTypes;
    }

    public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) {
        try {
            this.setRoot(value);
            this.extractRecords().forEach(arg_0 -> out.collect(arg_0));
        }
        catch (Exception e) {
            this.logInvalidJsonString(value);
            throw e;
        }
    }

    protected Map<String, String> extractRowData(JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
        paimonFieldTypes.putAll(this.fillDefaultTypes(record));
        Map<String, Object> recordMap = JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>(){});
        Map<String, String> rowData = recordMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            if (Objects.nonNull(entry.getValue()) && !TypeUtils.isBasicType(entry.getValue())) {
                try {
                    return JsonSerdeUtil.writeValueAsString(entry.getValue());
                }
                catch (JsonProcessingException e) {
                    LOG.error("Failed to deserialize record.", (Throwable)e);
                    return Objects.toString(entry.getValue());
                }
            }
            return Objects.toString(entry.getValue(), null);
        }));
        this.evalComputedColumns(rowData, paimonFieldTypes);
        return rowData;
    }

    protected void evalComputedColumns(Map<String, String> rowData, LinkedHashMap<String, DataType> paimonFieldTypes) {
        this.computedColumns.forEach(computedColumn -> {
            rowData.put(computedColumn.columnName(), computedColumn.eval((String)rowData.get(computedColumn.fieldReference())));
            paimonFieldTypes.put(computedColumn.columnName(), computedColumn.columnType());
        });
    }

    private List<String> extractPrimaryKeys() {
        ArrayNode pkNames = JsonSerdeUtil.getNodeAs(this.root, this.primaryField(), ArrayNode.class);
        if (pkNames == null) {
            return Collections.emptyList();
        }
        return StreamSupport.stream(pkNames.spliterator(), false).map(JsonNode::asText).collect(Collectors.toList());
    }

    protected void processRecord(JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord> records) {
        LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<String, DataType>(jsonNode.size());
        Map<String, String> rowData = this.extractRowData(jsonNode, paimonFieldTypes);
        records.add(this.createRecord(rowKind, rowData, paimonFieldTypes));
    }

    private RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> data, LinkedHashMap<String, DataType> paimonFieldTypes) {
        String databaseName = this.getDatabaseName();
        String tableName = this.getTableName();
        paimonFieldTypes = CdcActionCommonUtils.mapKeyCaseConvert(paimonFieldTypes, this.caseSensitive, CdcActionCommonUtils.columnDuplicateErrMsg(tableName == null ? "UNKNOWN" : tableName));
        data = CdcActionCommonUtils.mapKeyCaseConvert(data, this.caseSensitive, CdcActionCommonUtils.recordKeyDuplicateErrMsg(data));
        List<String> primaryKeys = CdcActionCommonUtils.listCaseConvert(this.extractPrimaryKeys(), this.caseSensitive);
        return new RichCdcMultiplexRecord(databaseName, tableName, paimonFieldTypes, primaryKeys, new CdcRecord(rowKind, data));
    }

    protected void setRoot(String record) {
        this.root = JsonSerdeUtil.fromJson(record, JsonNode.class);
    }

    protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
        Object oldFullRecordNode = data.deepCopy();
        oldNode.fieldNames().forEachRemaining(fieldName -> ((ObjectNode)oldFullRecordNode).set((String)fieldName, oldNode.get((String)fieldName)));
        return oldFullRecordNode;
    }

    @Nullable
    protected String getTableName() {
        JsonNode node = this.root.get(FIELD_TABLE);
        return JsonSerdeUtil.isNull(node) ? null : node.asText();
    }

    @Nullable
    protected String getDatabaseName() {
        JsonNode node = this.root.get(FIELD_DATABASE);
        return JsonSerdeUtil.isNull(node) ? null : node.asText();
    }

    private void logInvalidJsonString(String json) {
        LOG.info("Invalid Json:\n{}", (Object)json);
    }

    protected void checkNotNull(JsonNode node, String key) {
        if (JsonSerdeUtil.isNull(node)) {
            throw new RuntimeException(String.format("Invalid %s format: missing '%s' field.", this.format(), key));
        }
    }

    protected void checkNotNull(JsonNode node, String key, String conditionKey, String conditionValue) {
        if (JsonSerdeUtil.isNull(node)) {
            throw new RuntimeException(String.format("Invalid %s format: missing '%s' field when '%s' is '%s'.", this.format(), key, conditionKey, conditionValue));
        }
    }

    protected JsonNode getAndCheck(String key) {
        JsonNode node = this.root.get(key);
        this.checkNotNull(node, key);
        return node;
    }

    protected JsonNode getAndCheck(String key, String conditionKey, String conditionValue) {
        JsonNode node = this.root.get(key);
        this.checkNotNull(node, key, conditionKey, conditionValue);
        return node;
    }

    protected abstract String format();
}

