/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlRecordParser
implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordParser.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final ZoneId serverTimeZone;
    private final List<ComputedColumn> computedColumns;
    private final TypeMapping typeMapping;
    private final boolean isDebeziumSchemaCommentsEnabled;
    private DebeziumEvent root;
    private String currentTable;
    private String databaseName;
    private final CdcMetadataConverter[] metadataConverters;
    private final Set<String> nonPkTables = new HashSet<String>();

    public MySqlRecordParser(Configuration mySqlConfig, List<ComputedColumn> computedColumns, TypeMapping typeMapping, CdcMetadataConverter[] metadataConverters) {
        this.computedColumns = computedColumns;
        this.typeMapping = typeMapping;
        this.metadataConverters = metadataConverters;
        this.objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        String stringifyServerTimeZone = (String)mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
        ConfigOption includeSchemaCommentsConfig = ConfigOptions.key((String)("debezium." + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name())).booleanType().defaultValue((Object)false);
        this.isDebeziumSchemaCommentsEnabled = (Boolean)mySqlConfig.get(includeSchemaCommentsConfig);
        this.serverTimeZone = stringifyServerTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(stringifyServerTimeZone);
    }

    public void flatMap(CdcSourceRecord rawEvent, Collector<RichCdcMultiplexRecord> out) throws Exception {
        this.root = this.objectMapper.readValue((String)rawEvent.getValue(), DebeziumEvent.class);
        this.currentTable = this.root.payload().source().get("table").asText();
        this.databaseName = this.root.payload().source().get("db").asText();
        if (this.root.payload().isSchemaChange()) {
            this.extractSchemaChange().forEach(arg_0 -> out.collect(arg_0));
            return;
        }
        this.extractRecords().forEach(arg_0 -> out.collect(arg_0));
    }

    private List<RichCdcMultiplexRecord> extractSchemaChange() {
        DebeziumEvent.Payload payload = this.root.payload();
        if (!payload.hasHistoryRecord()) {
            return Collections.emptyList();
        }
        TableChanges.TableChange tableChange = null;
        try {
            Iterator<TableChanges.TableChange> tableChanges = payload.getTableChanges();
            long count = 0L;
            while (tableChanges.hasNext()) {
                tableChange = tableChanges.next();
                ++count;
            }
            if (count != 1L) {
                LOG.error("Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" + payload.historyRecord());
                return Collections.emptyList();
            }
        }
        catch (Exception e) {
            LOG.error("Failed to parse history record for schema changes", (Throwable)e);
            return Collections.emptyList();
        }
        if (TableChanges.TableChangeType.CREATE == tableChange.getType() && tableChange.getTable().primaryKeyColumnNames().isEmpty()) {
            LOG.error("Didn't find primary keys from MySQL DDL for table '{}'. This table won't be synchronized.", (Object)this.currentTable);
            this.nonPkTables.add(this.currentTable);
            return Collections.emptyList();
        }
        Table table = tableChange.getTable();
        CdcSchema schema = this.extractSchema(table);
        return Collections.singletonList(new RichCdcMultiplexRecord(this.databaseName, this.currentTable, schema, CdcRecord.emptyRecord()));
    }

    private CdcSchema extractSchema(Table table) {
        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
        List columns = table.columns();
        for (Column column : columns) {
            DataType dataType = MySqlTypeUtils.toDataType(column.typeExpression(), column.length(), column.scale().orElse(null), this.typeMapping);
            dataType = dataType.copy(this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_NULLABLE) || column.isOptional());
            if (this.isDebeziumSchemaCommentsEnabled) {
                schemaBuilder.column(column.name(), dataType, column.comment());
                continue;
            }
            schemaBuilder.column(column.name(), dataType);
        }
        schemaBuilder.primaryKey(table.primaryKeyColumnNames());
        if (this.isDebeziumSchemaCommentsEnabled) {
            schemaBuilder.comment(table.comment());
        }
        return schemaBuilder.build();
    }

    private List<RichCdcMultiplexRecord> extractRecords() {
        Map<String, String> after;
        if (this.nonPkTables.contains(this.currentTable)) {
            return Collections.emptyList();
        }
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        Map<String, String> before = this.extractRow(this.root.payload().before());
        if (!before.isEmpty()) {
            records.add(this.createRecord(RowKind.DELETE, before));
        }
        if (!(after = this.extractRow(this.root.payload().after())).isEmpty()) {
            records.add(this.createRecord(RowKind.INSERT, after));
        }
        return records;
    }

    private Map<String, String> extractRow(JsonNode recordRow) {
        if (JsonSerdeUtil.isNull(recordRow)) {
            return new HashMap<String, String>();
        }
        DebeziumEvent.Field schema = Preconditions.checkNotNull(this.root.schema(), "MySqlRecordParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created");
        Map<String, DebeziumEvent.Field> fields = schema.beforeAndAfterFields();
        LinkedHashMap<String, String> resultMap = new LinkedHashMap<String, String>();
        for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) {
            String fieldName = field.getKey();
            String string = field.getValue().type();
            JsonNode objectValue = recordRow.get(fieldName);
            if (JsonSerdeUtil.isNull(objectValue)) continue;
            String className = field.getValue().name();
            String oldValue = objectValue.asText();
            String newValue = DebeziumSchemaUtils.transformRawValue(oldValue, string, className, this.typeMapping, objectValue, this.serverTimeZone);
            resultMap.put(fieldName, newValue);
        }
        for (ComputedColumn computedColumn : this.computedColumns) {
            resultMap.put(computedColumn.columnName(), computedColumn.eval((String)resultMap.get(computedColumn.fieldReference())));
        }
        for (Iterator<Object> iterator2 : this.metadataConverters) {
            resultMap.put(iterator2.columnName(), iterator2.read(this.root.payload().source()));
        }
        return resultMap;
    }

    protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> data) {
        return new RichCdcMultiplexRecord(this.databaseName, this.currentTable, null, new CdcRecord(rowKind, data));
    }
}

