/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mongodb.strategy;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;

/**
 * {@link MongoVersionStrategy} that converts a single change event into
 * {@link RichCdcMultiplexRecord}s by reading the event's {@code operationType},
 * {@code fullDocument} and {@code documentKey} fields.
 *
 * <p>Mapping performed by this class:
 * <ul>
 *   <li>{@code insert}: one {@link RowKind#INSERT} record built from {@code fullDocument};</li>
 *   <li>{@code replace} / {@code update}: a {@link RowKind#DELETE} record built from
 *       {@code documentKey} followed by a {@link RowKind#INSERT} record built from
 *       {@code fullDocument};</li>
 *   <li>{@code delete}: one {@link RowKind#DELETE} record built from {@code documentKey}.</li>
 * </ul>
 */
public class Mongo4VersionStrategy
implements MongoVersionStrategy {
    /** Name of the event field holding the operation type. */
    private static final String FIELD_TYPE = "operationType";
    /** Name of the event field holding the document after the change. */
    private static final String FIELD_DATA = "fullDocument";
    /** Name of the event field holding the key of the changed document. */
    private static final String FIELD_KEY = "documentKey";
    private static final String OP_UPDATE = "update";
    private static final String OP_INSERT = "insert";
    private static final String OP_REPLACE = "replace";
    private static final String OP_DELETE = "delete";
    private final String databaseName;
    private final String collection;
    private final boolean caseSensitive;
    private final Configuration mongodbConfig;
    private final List<ComputedColumn> computedColumns;

    /**
     * Creates a strategy bound to one database/collection pair.
     *
     * @param databaseName database name stamped on every produced record
     * @param collection collection name stamped on every produced record; also used in the
     *     duplicate-column error message
     * @param caseSensitive flag handed to {@code CdcActionCommonUtils.mapKeyCaseConvert} for
     *     both the row keys and the field-type keys
     * @param computedColumns computed columns forwarded to {@code getExtractRow}
     * @param mongodbConfig configuration forwarded to {@code getExtractRow}
     */
    public Mongo4VersionStrategy(String databaseName, String collection, boolean caseSensitive, List<ComputedColumn> computedColumns, Configuration mongodbConfig) {
        this.databaseName = databaseName;
        this.collection = collection;
        this.caseSensitive = caseSensitive;
        this.mongodbConfig = mongodbConfig;
        this.computedColumns = computedColumns;
    }

    /**
     * Extracts the CDC records described by one change event.
     *
     * @param root the change event; must contain an {@code operationType} field
     * @return the records for the event, in the order they must be applied
     * @throws JsonProcessingException if record extraction fails while processing JSON
     * @throws NullPointerException if {@code root} has no {@code operationType} field
     * @throws UnsupportedOperationException if the operation type is not one of
     *     {@code insert}, {@code replace}, {@code update} or {@code delete}
     */
    @Override
    public List<RichCdcMultiplexRecord> extractRecords(JsonNode root) throws JsonProcessingException {
        JsonNode typeNode = root.get(FIELD_TYPE);
        if (typeNode == null) {
            // Same exception type as the former implicit dereference, but with a usable message.
            throw new NullPointerException("Change event has no '" + FIELD_TYPE + "' field");
        }
        JsonNode fullDocument = root.get(FIELD_DATA);
        JsonNode documentKey = root.get(FIELD_KEY);
        return this.handleOperation(typeNode.asText(), fullDocument, documentKey);
    }

    /**
     * Builds the record list for a single operation.
     *
     * @param op operation type read from the event
     * @param fullDocument value of the event's {@code fullDocument} field (may be {@code null}
     *     when the event does not carry it)
     * @param documentKey value of the event's {@code documentKey} field
     * @return the records to emit, in application order
     * @throws JsonProcessingException if record extraction fails while processing JSON
     * @throws UnsupportedOperationException if {@code op} is not a supported operation type
     */
    private List<RichCdcMultiplexRecord> handleOperation(String op, JsonNode fullDocument, JsonNode documentKey) throws JsonProcessingException {
        List<RichCdcMultiplexRecord> records = new ArrayList<>();
        switch (op) {
            case OP_INSERT: {
                records.add(this.processRecord(fullDocument, RowKind.INSERT));
                break;
            }
            case OP_REPLACE:
            case OP_UPDATE: {
                // The change is expressed as "delete by key, then insert the new document".
                // NOTE(review): presumably because events handled here carry no before-image
                // of the document - confirm against the MongoDB version this strategy targets.
                records.add(this.processRecord(documentKey, RowKind.DELETE));
                records.add(this.processRecord(fullDocument, RowKind.INSERT));
                break;
            }
            case OP_DELETE: {
                records.add(this.processRecord(documentKey, RowKind.DELETE));
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown record type: " + op);
            }
        }
        return records;
    }

    /**
     * Converts one JSON document (either a full document or a document key) into a
     * {@link RichCdcMultiplexRecord} of the given kind.
     *
     * @param document the JSON node to extract the row from
     * @param rowKind the kind assigned to the produced {@link CdcRecord}
     * @return the record for {@code document}, tagged with this strategy's database and collection
     * @throws JsonProcessingException if row extraction fails while processing JSON
     */
    private RichCdcMultiplexRecord processRecord(JsonNode document, RowKind rowKind) throws JsonProcessingException {
        // getExtractRow fills extractedTypes as a side effect while producing the row values.
        LinkedHashMap<String, DataType> extractedTypes = new LinkedHashMap<>();
        Map<String, String> extractedRow = this.getExtractRow(document, extractedTypes, this.computedColumns, this.mongodbConfig);

        // Apply the key-case conversion to both maps; the error-message helpers are built from
        // the unconverted row and the collection name respectively.
        Map<String, String> row = CdcActionCommonUtils.mapKeyCaseConvert(extractedRow, this.caseSensitive, CdcActionCommonUtils.recordKeyDuplicateErrMsg(extractedRow));
        LinkedHashMap<String, DataType> fieldTypes = CdcActionCommonUtils.mapKeyCaseConvert(extractedTypes, this.caseSensitive, CdcActionCommonUtils.columnDuplicateErrMsg(this.collection));

        return new RichCdcMultiplexRecord(this.databaseName, this.collection, fieldTypes, this.extractPrimaryKeys(), new CdcRecord(rowKind, row));
    }
}

