/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import io.debezium.relational.history.TableChanges;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlDebeziumJsonEventParser
implements EventParser<String> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumJsonEventParser.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final ZoneId serverTimeZone;
    private final boolean caseSensitive;
    private final TableNameConverter tableNameConverter;
    private final List<ComputedColumn> computedColumns;
    private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
    @Nullable
    private final Pattern includingPattern;
    @Nullable
    private final Pattern excludingPattern;
    private final Set<String> includedTables = new HashSet<String>();
    private final Set<String> excludedTables = new HashSet<String>();
    private final TypeMapping typeMapping;
    private JsonNode root;
    private JsonNode payload;
    private String currentTable;
    private boolean shouldSynchronizeCurrentTable;

    public MySqlDebeziumJsonEventParser(ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn> computedColumns, TypeMapping typeMapping) {
        this(serverTimeZone, caseSensitive, computedColumns, new TableNameConverter(caseSensitive), ddl -> Optional.empty(), null, null, typeMapping);
    }

    public MySqlDebeziumJsonEventParser(ZoneId serverTimeZone, boolean caseSensitive, TableNameConverter tableNameConverter, NewTableSchemaBuilder<JsonNode> schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, TypeMapping typeMapping) {
        this(serverTimeZone, caseSensitive, Collections.emptyList(), tableNameConverter, schemaBuilder, includingPattern, excludingPattern, typeMapping);
    }

    public MySqlDebeziumJsonEventParser(ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn> computedColumns, TableNameConverter tableNameConverter, NewTableSchemaBuilder<JsonNode> schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, TypeMapping typeMapping) {
        this.serverTimeZone = serverTimeZone;
        this.caseSensitive = caseSensitive;
        this.computedColumns = computedColumns;
        this.tableNameConverter = tableNameConverter;
        this.schemaBuilder = schemaBuilder;
        this.includingPattern = includingPattern;
        this.excludingPattern = excludingPattern;
        this.typeMapping = typeMapping;
    }

    @Override
    public void setRawEvent(String rawEvent) {
        try {
            this.root = this.objectMapper.readValue(rawEvent, JsonNode.class);
            this.payload = this.root.get("payload");
            this.currentTable = this.payload.get("source").get("table").asText();
            this.shouldSynchronizeCurrentTable = this.shouldSynchronizeCurrentTable();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String parseTableName() {
        return this.tableNameConverter.convert(Identifier.create(this.getDatabaseName(), this.currentTable));
    }

    private boolean isSchemaChange() {
        return this.payload.get("op") == null;
    }

    @Override
    public List<DataField> parseSchemaChange() {
        JsonNode columns;
        if (!this.shouldSynchronizeCurrentTable || !this.isSchemaChange()) {
            return Collections.emptyList();
        }
        JsonNode historyRecord = this.payload.get("historyRecord");
        if (historyRecord == null) {
            return Collections.emptyList();
        }
        try {
            String historyRecordString = historyRecord.asText();
            JsonNode tableChanges = this.objectMapper.readTree(historyRecordString).get("tableChanges");
            if (tableChanges.size() != 1) {
                LOG.error("Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" + historyRecord.asText());
                return Collections.emptyList();
            }
            columns = tableChanges.get(0).get("table").get("columns");
        }
        catch (Exception e) {
            LOG.info("Failed to parse history record for schema changes", (Throwable)e);
            return Collections.emptyList();
        }
        if (columns == null) {
            return Collections.emptyList();
        }
        ArrayList<DataField> result = new ArrayList<DataField>();
        for (int i = 0; i < columns.size(); ++i) {
            JsonNode column = columns.get(i);
            JsonNode length = column.get("length");
            JsonNode scale = column.get("scale");
            DataType type = MySqlTypeUtils.toDataType(column.get("typeName").asText(), length == null ? null : Integer.valueOf(length.asInt()), scale == null ? null : Integer.valueOf(scale.asInt()), this.typeMapping);
            if (!this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_NULLABLE)) {
                type = type.copy(column.get("optional").asBoolean());
            }
            String fieldName = column.get("name").asText();
            result.add(new DataField(i, this.caseSensitive ? fieldName : fieldName.toLowerCase(), type));
        }
        return result;
    }

    @Override
    public Optional<Schema> parseNewTable() {
        if (!this.shouldSynchronizeCurrentTable) {
            return Optional.empty();
        }
        JsonNode historyRecord = this.payload.get("historyRecord");
        if (historyRecord == null) {
            return Optional.empty();
        }
        try {
            String historyRecordString = historyRecord.asText();
            JsonNode tableChanges = this.objectMapper.readTree(historyRecordString).get("tableChanges");
            if (tableChanges.size() != 1) {
                LOG.error("Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" + historyRecord.asText());
                return Optional.empty();
            }
            JsonNode tableChange = tableChanges.get(0);
            if (!tableChange.get("type").asText().equals(TableChanges.TableChangeType.CREATE.name())) {
                return Optional.empty();
            }
            JsonNode primaryKeyColumnNames = tableChange.get("table").get("primaryKeyColumnNames");
            if (primaryKeyColumnNames.size() == 0) {
                LOG.debug("Didn't find primary keys from MySQL DDL for table '{}'. This table won't be synchronized.", (Object)this.currentTable);
                this.excludedTables.add(this.currentTable);
                this.shouldSynchronizeCurrentTable = false;
                return Optional.empty();
            }
            return this.schemaBuilder.build(tableChange);
        }
        catch (Exception e) {
            LOG.info("Failed to parse history record for schema changes", (Throwable)e);
            return Optional.empty();
        }
    }

    @Override
    public List<CdcRecord> parseRecords() {
        Map<String, String> after;
        if (!this.shouldSynchronizeCurrentTable || this.isSchemaChange()) {
            return Collections.emptyList();
        }
        ArrayList<CdcRecord> records = new ArrayList<CdcRecord>();
        Map<String, String> before = this.extractRow(this.payload.get("before"));
        if (before.size() > 0) {
            before = this.caseSensitive ? before : this.keyCaseInsensitive(before);
            records.add(new CdcRecord(RowKind.DELETE, before));
        }
        if ((after = this.extractRow(this.payload.get("after"))).size() > 0) {
            after = this.caseSensitive ? after : this.keyCaseInsensitive(after);
            records.add(new CdcRecord(RowKind.INSERT, after));
        }
        return records;
    }

    private String getDatabaseName() {
        return this.payload.get("source").get("db").asText();
    }

    private Map<String, String> extractRow(JsonNode recordRow) {
        JsonNode schema = Preconditions.checkNotNull(this.root.get("schema"), "MySqlDebeziumJsonEventParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created");
        HashMap<String, String> mySqlFieldTypes = new HashMap<String, String>();
        HashMap<String, String> fieldClassNames = new HashMap<String, String>();
        JsonNode arrayNode = schema.get("fields");
        for (int i = 0; i < arrayNode.size(); ++i) {
            JsonNode elementNode = arrayNode.get(i);
            String field = elementNode.get("field").asText();
            if (!"before".equals(field) && !"after".equals(field)) continue;
            JsonNode innerArrayNode = elementNode.get("fields");
            for (int j = 0; j < innerArrayNode.size(); ++j) {
                JsonNode innerElementNode = innerArrayNode.get(j);
                String fieldName = innerElementNode.get("field").asText();
                String fieldType = innerElementNode.get("type").asText();
                mySqlFieldTypes.put(fieldName, fieldType);
                if (innerElementNode.get("name") == null) continue;
                String className = innerElementNode.get("name").asText();
                fieldClassNames.put(fieldName, className);
            }
        }
        Map<String, Object> jsonMap = this.objectMapper.convertValue((Object)recordRow, new TypeReference<Map<String, Object>>(){});
        if (jsonMap == null) {
            return new HashMap<String, String>();
        }
        HashMap<String, String> resultMap = new HashMap<String, String>();
        for (Map.Entry field : mySqlFieldTypes.entrySet()) {
            String oldValue;
            String fieldName = (String)field.getKey();
            String mySqlType = (String)field.getValue();
            Object objectValue = jsonMap.get(fieldName);
            if (objectValue == null) continue;
            String className = (String)fieldClassNames.get(fieldName);
            String newValue = oldValue = objectValue.toString();
            if ("io.debezium.data.Bits".equals(className)) {
                byte[] littleEndian = Base64.getDecoder().decode(oldValue);
                byte[] bigEndian = new byte[littleEndian.length];
                for (int i = 0; i < littleEndian.length; ++i) {
                    bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
                }
                newValue = this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_STRING) ? StringUtils.bytesToBinaryString(bigEndian) : Base64.getEncoder().encodeToString(bigEndian);
            } else if ("bytes".equals(mySqlType) && className == null) {
                newValue = new String(Base64.getDecoder().decode(oldValue));
            } else {
                long nanoAdjustment;
                long seconds;
                long nanosecondsPerMicros;
                if ("bytes".equals(mySqlType) && "org.apache.flink.kafka.shaded.org.apache.kafka.connect.data.Decimal".equals(className)) {
                    try {
                        new BigDecimal(oldValue);
                    }
                    catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid big decimal value " + oldValue + ". Make sure that in the `customConverterConfigs` of the JsonDebeziumDeserializationSchema you created, set '" + "decimal.format" + "' to 'numeric'", e);
                    }
                }
                if ("io.debezium.time.Date".equals(className)) {
                    newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
                } else if ("io.debezium.time.Timestamp".equals(className)) {
                    LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
                } else if ("io.debezium.time.MicroTimestamp".equals(className)) {
                    long microseconds = Long.parseLong(oldValue);
                    long microsecondsPerSecond = 1000000L;
                    nanosecondsPerMicros = 1000L;
                    seconds = microseconds / microsecondsPerSecond;
                    nanoAdjustment = microseconds % microsecondsPerSecond * nanosecondsPerMicros;
                    LocalDateTime localDateTime = Instant.ofEpochSecond(seconds, nanoAdjustment).atZone(ZoneOffset.UTC).toLocalDateTime();
                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
                } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
                    LocalDateTime localDateTime = Instant.parse(oldValue).atZone(this.serverTimeZone).toLocalDateTime();
                    newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
                } else if ("io.debezium.time.MicroTime".equals(className)) {
                    long microseconds = Long.parseLong(oldValue);
                    long microsecondsPerSecond = 1000000L;
                    nanosecondsPerMicros = 1000L;
                    seconds = microseconds / microsecondsPerSecond;
                    nanoAdjustment = microseconds % microsecondsPerSecond * nanosecondsPerMicros;
                    newValue = Instant.ofEpochSecond(seconds, nanoAdjustment).atZone(ZoneOffset.UTC).toLocalTime().toString();
                } else if ("io.debezium.data.geometry.Point".equals(className) || "io.debezium.data.geometry.Geometry".equals(className)) {
                    JsonNode jsonNode = recordRow.get(fieldName);
                    try {
                        byte[] wkb = jsonNode.get("wkb").binaryValue();
                        newValue = MySqlTypeUtils.convertWkbArray(wkb);
                    }
                    catch (Exception e) {
                        throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", jsonNode), e);
                    }
                }
            }
            resultMap.put(fieldName, newValue);
        }
        for (ComputedColumn computedColumn : this.computedColumns) {
            resultMap.put(computedColumn.columnName(), computedColumn.eval((String)resultMap.get(computedColumn.fieldReference())));
        }
        return resultMap;
    }

    private Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
        HashMap<String, String> keyCaseInsensitive = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : origin.entrySet()) {
            String fieldName = entry.getKey().toLowerCase();
            Preconditions.checkArgument(!keyCaseInsensitive.containsKey(fieldName), "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s", origin);
            keyCaseInsensitive.put(fieldName, entry.getValue());
        }
        return keyCaseInsensitive;
    }

    private boolean shouldSynchronizeCurrentTable() {
        if (this.excludedTables.contains(this.currentTable)) {
            return false;
        }
        if (this.includedTables.contains(this.currentTable)) {
            return true;
        }
        boolean shouldSynchronize = true;
        if (this.includingPattern != null) {
            shouldSynchronize = this.includingPattern.matcher(this.currentTable).matches();
        }
        if (this.excludingPattern != null) {
            boolean bl = shouldSynchronize = shouldSynchronize && !this.excludingPattern.matcher(this.currentTable).matches();
        }
        if (!shouldSynchronize) {
            LOG.debug("Source table {} won't be synchronized because it was excluded. ", (Object)this.currentTable);
            this.excludedTables.add(this.currentTable);
            return false;
        }
        this.includedTables.add(this.currentTable);
        return true;
    }
}

