/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlRecordParser
implements FlatMapFunction<String, RichCdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordParser.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final ZoneId serverTimeZone;
    private final boolean caseSensitive;
    private final List<ComputedColumn> computedColumns;
    private final TypeMapping typeMapping;
    private DebeziumEvent root;
    private String currentTable;
    private String databaseName;
    private final CdcMetadataConverter[] metadataConverters;
    private final Set<String> nonPkTables = new HashSet<String>();

    public MySqlRecordParser(Configuration mySqlConfig, boolean caseSensitive, List<ComputedColumn> computedColumns, TypeMapping typeMapping, CdcMetadataConverter[] metadataConverters) {
        this.caseSensitive = caseSensitive;
        this.computedColumns = computedColumns;
        this.typeMapping = typeMapping;
        this.metadataConverters = metadataConverters;
        this.objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        String stringifyServerTimeZone = (String)mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
        this.serverTimeZone = stringifyServerTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(stringifyServerTimeZone);
    }

    public void flatMap(String rawEvent, Collector<RichCdcMultiplexRecord> out) throws Exception {
        this.root = this.objectMapper.readValue(rawEvent, DebeziumEvent.class);
        this.currentTable = this.root.payload().source().get("table").asText();
        this.databaseName = this.root.payload().source().get("db").asText();
        if (this.root.payload().isSchemaChange()) {
            this.extractSchemaChange().forEach(arg_0 -> out.collect(arg_0));
            return;
        }
        this.extractRecords().forEach(arg_0 -> out.collect(arg_0));
    }

    private List<RichCdcMultiplexRecord> extractSchemaChange() {
        DebeziumEvent.Payload payload = this.root.payload();
        if (!payload.hasHistoryRecord()) {
            return Collections.emptyList();
        }
        TableChanges.TableChange tableChange = null;
        try {
            Iterator<TableChanges.TableChange> tableChanges = payload.getTableChanges();
            long count = 0L;
            while (tableChanges.hasNext()) {
                tableChange = tableChanges.next();
                ++count;
            }
            if (count != 1L) {
                LOG.error("Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" + payload.historyRecord());
                return Collections.emptyList();
            }
        }
        catch (Exception e) {
            LOG.error("Failed to parse history record for schema changes", (Throwable)e);
            return Collections.emptyList();
        }
        if (TableChanges.TableChangeType.CREATE == tableChange.getType() && tableChange.getTable().primaryKeyColumnNames().isEmpty()) {
            LOG.error("Didn't find primary keys from MySQL DDL for table '{}'. This table won't be synchronized.", (Object)this.currentTable);
            this.nonPkTables.add(this.currentTable);
            return Collections.emptyList();
        }
        Table table = tableChange.getTable();
        LinkedHashMap<String, DataType> fieldTypes = this.extractFieldTypes(table);
        List<String> primaryKeys = CdcActionCommonUtils.listCaseConvert(table.primaryKeyColumnNames(), this.caseSensitive);
        return Collections.singletonList(new RichCdcMultiplexRecord(this.databaseName, this.currentTable, fieldTypes, primaryKeys, CdcRecord.emptyRecord()));
    }

    private LinkedHashMap<String, DataType> extractFieldTypes(Table table) {
        List columns = table.columns();
        LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<String, DataType>(columns.size());
        HashSet<String> existedFields = new HashSet<String>();
        Function<String, String> columnDuplicateErrMsg = CdcActionCommonUtils.columnDuplicateErrMsg(table.id().toString());
        for (Column column : columns) {
            String columnName = CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck(column.name(), existedFields, this.caseSensitive, columnDuplicateErrMsg);
            DataType dataType = MySqlTypeUtils.toDataType(column.typeExpression(), column.length(), column.scale().orElse(null), this.typeMapping);
            dataType = dataType.copy(this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_NULLABLE) || column.isOptional());
            fieldTypes.put(columnName, dataType);
        }
        return fieldTypes;
    }

    private List<RichCdcMultiplexRecord> extractRecords() {
        Map<String, String> after;
        if (this.nonPkTables.contains(this.currentTable)) {
            return Collections.emptyList();
        }
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        Map<String, String> before = this.extractRow(this.root.payload().before());
        if (!before.isEmpty()) {
            before = CdcActionCommonUtils.mapKeyCaseConvert(before, this.caseSensitive, CdcActionCommonUtils.recordKeyDuplicateErrMsg(before));
            records.add(this.createRecord(RowKind.DELETE, before));
        }
        if (!(after = this.extractRow(this.root.payload().after())).isEmpty()) {
            after = CdcActionCommonUtils.mapKeyCaseConvert(after, this.caseSensitive, CdcActionCommonUtils.recordKeyDuplicateErrMsg(after));
            records.add(this.createRecord(RowKind.INSERT, after));
        }
        return records;
    }

    private Map<String, String> extractRow(JsonNode recordRow) {
        if (JsonSerdeUtil.isNull(recordRow)) {
            return new HashMap<String, String>();
        }
        DebeziumEvent.Field schema = Preconditions.checkNotNull(this.root.schema(), "MySqlRecordParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created");
        Map<String, DebeziumEvent.Field> fields = schema.beforeAndAfterFields();
        LinkedHashMap<String, String> resultMap = new LinkedHashMap<String, String>();
        for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) {
            String oldValue;
            String fieldName = field.getKey();
            String string = field.getValue().type();
            JsonNode objectValue = recordRow.get(fieldName);
            if (JsonSerdeUtil.isNull(objectValue)) continue;
            String className = field.getValue().name();
            String newValue = oldValue = objectValue.asText();
            if ("io.debezium.data.Bits".equals(className)) {
                byte[] littleEndian = Base64.getDecoder().decode(oldValue);
                byte[] bigEndian = new byte[littleEndian.length];
                for (int i = 0; i < littleEndian.length; ++i) {
                    bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
                }
                newValue = this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_STRING) ? StringUtils.bytesToBinaryString(bigEndian) : Base64.getEncoder().encodeToString(bigEndian);
            } else if ("bytes".equals(string) && className == null) {
                newValue = new String(Base64.getDecoder().decode(oldValue));
            } else {
                long nanoAdjustment;
                long seconds;
                long nanosecondsPerMicros;
                if ("bytes".equals(string) && "org.apache.flink.kafka.shaded.org.apache.kafka.connect.data.Decimal".equals(className)) {
                    try {
                        new BigDecimal(oldValue);
                    }
                    catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid big decimal value " + oldValue + ". Make sure that in the `customConverterConfigs` of the JsonDebeziumDeserializationSchema you created, set '" + "decimal.format" + "' to 'numeric'", e);
                    }
                }
                if ("io.debezium.time.Date".equals(className)) {
                    newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
                } else if ("io.debezium.time.Timestamp".equals(className)) {
                    LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
                } else if ("io.debezium.time.MicroTimestamp".equals(className)) {
                    long microseconds = Long.parseLong(oldValue);
                    long microsecondsPerSecond = 1000000L;
                    nanosecondsPerMicros = 1000L;
                    seconds = microseconds / microsecondsPerSecond;
                    nanoAdjustment = microseconds % microsecondsPerSecond * nanosecondsPerMicros;
                    LocalDateTime localDateTime = Instant.ofEpochSecond(seconds, nanoAdjustment).atZone(ZoneOffset.UTC).toLocalDateTime();
                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
                } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
                    LocalDateTime localDateTime = Instant.parse(oldValue).atZone(this.serverTimeZone).toLocalDateTime();
                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
                } else if ("io.debezium.time.MicroTime".equals(className)) {
                    long microseconds = Long.parseLong(oldValue);
                    long microsecondsPerSecond = 1000000L;
                    nanosecondsPerMicros = 1000L;
                    seconds = microseconds / microsecondsPerSecond;
                    nanoAdjustment = microseconds % microsecondsPerSecond * nanosecondsPerMicros;
                    newValue = Instant.ofEpochSecond(seconds, nanoAdjustment).atZone(ZoneOffset.UTC).toLocalTime().toString();
                } else if ("io.debezium.data.geometry.Point".equals(className) || "io.debezium.data.geometry.Geometry".equals(className)) {
                    try {
                        byte[] wkb = objectValue.get("wkb").binaryValue();
                        newValue = MySqlTypeUtils.convertWkbArray(wkb);
                    }
                    catch (Exception e) {
                        throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", objectValue), e);
                    }
                }
            }
            resultMap.put(fieldName, newValue);
        }
        for (ComputedColumn computedColumn : this.computedColumns) {
            resultMap.put(computedColumn.columnName(), computedColumn.eval((String)resultMap.get(computedColumn.fieldReference())));
        }
        for (Iterator<Object> iterator : this.metadataConverters) {
            resultMap.put(iterator.columnName(), iterator.read(this.root.payload().source()));
        }
        return resultMap;
    }

    protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> data) {
        return new RichCdcMultiplexRecord(this.databaseName, this.currentTable, new LinkedHashMap<String, DataType>(0), Collections.emptyList(), new CdcRecord(rowKind, data));
    }
}

