/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.debezium;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TypeUtils;

public class DebeziumSchemaUtils {
    // Message field names. None of the FIELD_* constants is referenced inside this class;
    // presumably they are the keys callers use to navigate a Debezium change event — confirm
    // against the record parsers that import them.
    public static final String FIELD_SCHEMA = "schema";
    public static final String FIELD_PAYLOAD = "payload";
    public static final String FIELD_SOURCE = "source";
    public static final String FIELD_PRIMARY = "pkNames";
    public static final String FIELD_DB = "db";
    public static final String FIELD_BEFORE = "before";
    public static final String FIELD_AFTER = "after";
    public static final String FIELD_TYPE = "op";
    // Operation codes, presumably the possible values of the FIELD_TYPE ("op") field — confirm.
    // NOTE(review): OP_READE looks like a misspelling of OP_READ; it is public, so renaming it
    // would break callers.
    public static final String OP_READE = "r";
    public static final String OP_INSERT = "c";
    public static final String OP_UPDATE = "u";
    public static final String OP_DELETE = "d";
    public static final String OP_TRUNCATE = "t";
    public static final String OP_MESSAGE = "m";
    // Avro schema property that getAvroConnectParameters reads to obtain the parameter map.
    protected static final String CONNECT_PARAMETERS_PROP = "connect.parameters";
    // Not referenced inside this class.
    protected static final String CONNECT_NAME_PROP = "connect.name";
    // Keys looked up in the connect-parameter map by avroToPaimonDataType.
    static final String SCHEMA_PARAMETER_COLUMN_TYPE = "__debezium.source.column.type";
    private static final String SCHEMA_PARAMETER_COLUMN_SIZE = "__debezium.source.column.length";
    // NOTE(review): despite the "PRECISION" name, this key is "...column.scale" and its value is
    // passed to MySqlTypeUtils.toDataType as the scale argument.
    private static final String SCHEMA_PARAMETER_COLUMN_PRECISION = "__debezium.source.column.scale";
    // Not referenced inside this class.
    static final String SCHEMA_PARAMETER_COLUMN_NAME = "__debezium.source.column.name";

    /**
     * Transforms a raw value taken from a JSON-encoded message.
     *
     * <p>Delegates to the {@link Supplier}-based overload, supplying a geometry getter that reads
     * the binary content of the {@code "wkb"} child of {@code origin}. The getter is only invoked
     * by the delegate when it actually needs the geometry bytes.
     *
     * @param rawValue textual form of the value, may be {@code null}
     * @param debeziumType type name declared for the field
     * @param className logical type name of the field, may be {@code null}
     * @param typeMapping type-mapping options forwarded to the delegate
     * @param origin JSON node the value was read from
     * @param serverTimeZone time zone forwarded to the delegate
     * @return the transformed value as produced by the delegate
     * @throws IllegalArgumentException (from the getter) if the {@code "wkb"} node cannot be read
     *     as binary
     */
    public static String transformRawValue(@Nullable String rawValue, String debeziumType, @Nullable String className, TypeMapping typeMapping, JsonNode origin, ZoneId serverTimeZone) {
        Supplier<ByteBuffer> wkbFromJson = () -> {
            byte[] wkbBytes;
            try {
                wkbBytes = origin.get("wkb").binaryValue();
            }
            catch (IOException ioFailure) {
                throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", rawValue), ioFailure);
            }
            return ByteBuffer.wrap(wkbBytes);
        };
        return DebeziumSchemaUtils.transformRawValue(rawValue, debeziumType, className, typeMapping, wkbFromJson, origin, serverTimeZone);
    }

    /**
     * Transforms a raw value taken from an Avro-encoded message.
     *
     * <p>A plain {@code bytes} field (no logical type name) is decoded directly from the
     * {@link ByteBuffer} in {@code origin}. Everything else is delegated to the
     * {@link Supplier}-based {@code transformRawValue} overload, with a geometry getter that reads
     * the {@code "wkb"} field of {@code origin} viewed as a {@link GenericRecord}.
     *
     * @param rawValue textual form of the value, may be {@code null}
     * @param debeziumType type name declared for the field
     * @param className logical type name of the field, may be {@code null}
     * @param typeMapping type-mapping options forwarded to the delegate
     * @param origin the Avro object the value was read from; must be a {@link ByteBuffer} for
     *     plain {@code bytes} fields
     * @param serverTimeZone time zone forwarded to the delegate
     * @return the transformed value, or {@code null} when {@code rawValue} is {@code null}
     */
    public static String transformAvroRawValue(@Nullable String rawValue, String debeziumType, @Nullable String className, TypeMapping typeMapping, Object origin, ZoneId serverTimeZone) {
        if (rawValue != null && "bytes".equals(debeziumType) && className == null) {
            // Read through a duplicate so the caller's buffer position is left untouched, and copy
            // only the readable window: array() would expose the whole backing array (including
            // stale bytes of a reused buffer) and fails for read-only or direct buffers.
            ByteBuffer view = ((ByteBuffer)origin).duplicate();
            byte[] content = new byte[view.remaining()];
            view.get(content);
            // Explicit charset: the no-charset String constructor depends on the JVM default.
            return new String(content, StandardCharsets.UTF_8);
        }
        return DebeziumSchemaUtils.transformRawValue(rawValue, debeziumType, className, typeMapping, () -> (ByteBuffer)((GenericRecord)origin).get("wkb"), origin, serverTimeZone);
    }

    /**
     * Converts the textual form of a Debezium field value into the string representation used
     * downstream, based on the field's declared type and logical type name.
     *
     * <ul>
     *   <li>{@code io.debezium.data.Bits}: the Base64 payload is decoded and its byte order
     *       reversed; the result is rendered as a binary string when the type mapping contains
     *       {@code TO_STRING}, otherwise re-encoded as Base64.
     *   <li>{@code bytes} without a logical name: the Base64 payload is decoded as UTF-8 text.
     *   <li>{@code bytes} with the decimal logical name: the value is only validated as a
     *       {@link BigDecimal}.
     *   <li>{@code io.debezium.time.Date}, {@code Timestamp}, {@code MicroTimestamp},
     *       {@code ZonedTimestamp}, {@code MicroTime}: the numeric/ISO value is rendered as a
     *       date, date-time or time string.
     *   <li>Geometry types: the bytes from {@code geometryGetter} are converted by
     *       {@code MySqlTypeUtils.convertWkbArray}.
     *   <li>Avro records/arrays and maps/lists: serialized to a JSON string.
     *   <li>Anything else: returned unchanged.
     * </ul>
     *
     * @param rawValue textual form of the value; {@code null} yields {@code null}
     * @param debeziumType type name declared for the field
     * @param className logical type name of the field, may be {@code null}
     * @param typeMapping type-mapping options (only {@code TO_STRING} is consulted here)
     * @param geometryGetter supplies the geometry bytes; invoked only for geometry types
     * @param origin the object the value was read from
     * @param serverTimeZone zone used to render {@code ZonedTimestamp} values
     * @return the transformed value, or {@code null} when {@code rawValue} is {@code null}
     * @throws IllegalArgumentException if a decimal value is not numeric or a geometry value
     *     cannot be converted
     * @throws RuntimeException if an Avro/collection value cannot be serialized to JSON
     */
    public static String transformRawValue(@Nullable String rawValue, String debeziumType, @Nullable String className, TypeMapping typeMapping, Supplier<ByteBuffer> geometryGetter, Object origin, ZoneId serverTimeZone) {
        if (rawValue == null) {
            return null;
        }
        if ("io.debezium.data.Bits".equals(className)) {
            byte[] littleEndian = Base64.getDecoder().decode(rawValue);
            byte[] bigEndian = new byte[littleEndian.length];
            for (int i = 0; i < littleEndian.length; ++i) {
                bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
            }
            return typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_STRING) ? StringUtils.bytesToBinaryString(bigEndian) : Base64.getEncoder().encodeToString(bigEndian);
        }
        boolean bytesType = "bytes".equals(debeziumType);
        if (bytesType && className == null) {
            // Explicit charset: the no-charset String constructor depends on the JVM default.
            return new String(Base64.getDecoder().decode(rawValue), StandardCharsets.UTF_8);
        }
        if (bytesType && DebeziumSchemaUtils.decimalLogicalName().equals(className)) {
            // Validation only: the value itself is passed through unchanged below.
            try {
                new BigDecimal(rawValue);
            }
            catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid big decimal value " + rawValue + ". Make sure that in the `customConverterConfigs` of the JsonDebeziumDeserializationSchema you created, set '" + "decimal.format" + "' to 'numeric'", e);
            }
        }
        if ("io.debezium.time.Date".equals(className)) {
            return DateTimeUtils.toLocalDate(Integer.parseInt(rawValue)).toString();
        }
        if ("io.debezium.time.Timestamp".equals(className)) {
            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(Long.parseLong(rawValue), ZoneOffset.UTC);
            return DateTimeUtils.formatLocalDateTime(localDateTime, 3);
        }
        if ("io.debezium.time.MicroTimestamp".equals(className)) {
            LocalDateTime localDateTime = DebeziumSchemaUtils.instantOfEpochMicros(Long.parseLong(rawValue)).atZone(ZoneOffset.UTC).toLocalDateTime();
            return DateTimeUtils.formatLocalDateTime(localDateTime, 6);
        }
        if ("io.debezium.time.ZonedTimestamp".equals(className)) {
            LocalDateTime localDateTime = Instant.parse(rawValue).atZone(serverTimeZone).toLocalDateTime();
            return DateTimeUtils.formatLocalDateTime(localDateTime, 6);
        }
        if ("io.debezium.time.MicroTime".equals(className)) {
            return DebeziumSchemaUtils.instantOfEpochMicros(Long.parseLong(rawValue)).atZone(ZoneOffset.UTC).toLocalTime().toString();
        }
        if ("io.debezium.data.geometry.Point".equals(className) || "io.debezium.data.geometry.Geometry".equals(className)) {
            // Return immediately: on the Avro path the geometry origin is itself a record, so
            // falling through to the container check below would replace the converted geometry
            // with a JSON dump of the raw record.
            try {
                return MySqlTypeUtils.convertWkbArray(geometryGetter.get());
            }
            catch (Exception e) {
                throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", rawValue), e);
            }
        }
        if (origin instanceof GenericData.Record || origin instanceof GenericData.Array || origin instanceof Map || origin instanceof List) {
            Object convertedObject = DebeziumSchemaUtils.convertAvroObjectToJsonCompatible(origin);
            try {
                return TypeUtils.OBJECT_MAPPER.writer().writeValueAsString(convertedObject);
            }
            catch (JsonProcessingException e) {
                throw new RuntimeException(String.format("Failed to convert %s to JSON.", origin), e);
            }
        }
        return rawValue;
    }

    /** Builds an {@link Instant} from a count of microseconds relative to the epoch. */
    private static Instant instantOfEpochMicros(long microseconds) {
        long microsecondsPerSecond = 1000000L;
        long nanosecondsPerMicros = 1000L;
        long seconds = microseconds / microsecondsPerSecond;
        // May be negative for pre-epoch values; Instant.ofEpochSecond normalizes the adjustment.
        long nanoAdjustment = microseconds % microsecondsPerSecond * nanosecondsPerMicros;
        return Instant.ofEpochSecond(seconds, nanoAdjustment);
    }

    /**
     * Recursively rewrites an Avro value into plain Java maps, lists and strings.
     *
     * <p>Records become maps keyed by field name, Avro arrays and lists become lists, maps are
     * rebuilt with converted keys and values, and {@link Utf8} becomes a {@link String}. Any other
     * object (including {@code null}) is returned as is.
     *
     * @param avroObject the value to convert, may be {@code null}
     * @return the converted value
     */
    @SuppressWarnings("unchecked") // element types are erased; the helpers treat everything as Object
    public static Object convertAvroObjectToJsonCompatible(Object avroObject) {
        final Object jsonCompatible;
        if (avroObject instanceof GenericData.Record) {
            jsonCompatible = DebeziumSchemaUtils.convertRecord((GenericData.Record)avroObject);
        } else if (avroObject instanceof GenericData.Array) {
            // NOTE(review): GenericData.Array is presumably also a List, so this check has to stay
            // ahead of the List branch — confirm before reordering.
            jsonCompatible = DebeziumSchemaUtils.convertArray((GenericData.Array<?>)avroObject);
        } else if (avroObject instanceof Utf8) {
            jsonCompatible = avroObject.toString();
        } else if (avroObject instanceof Map) {
            jsonCompatible = DebeziumSchemaUtils.convertMap((Map<Object, Object>)avroObject);
        } else if (avroObject instanceof List) {
            jsonCompatible = DebeziumSchemaUtils.convertList((List<Object>)avroObject);
        } else {
            jsonCompatible = avroObject;
        }
        return jsonCompatible;
    }

    /**
     * Copies {@code map} into a new {@link HashMap}, converting every key and every value with
     * {@link #convertAvroObjectToJsonCompatible(Object)}.
     */
    private static Map<Object, Object> convertMap(Map<Object, Object> map) {
        Map<Object, Object> converted = new HashMap<>();
        map.forEach((rawKey, rawValue) -> converted.put(
                DebeziumSchemaUtils.convertAvroObjectToJsonCompatible(rawKey),
                DebeziumSchemaUtils.convertAvroObjectToJsonCompatible(rawValue)));
        return converted;
    }

    /**
     * Copies {@code list} into a new {@link ArrayList}, converting each element with
     * {@link #convertAvroObjectToJsonCompatible(Object)} and keeping the iteration order.
     */
    private static List<Object> convertList(List<Object> list) {
        List<Object> converted = new ArrayList<>();
        list.forEach(item -> converted.add(DebeziumSchemaUtils.convertAvroObjectToJsonCompatible(item)));
        return converted;
    }

    /**
     * Flattens an Avro record into a {@link HashMap} from field name to the converted field
     * value. Fields are read by position, following the record's own schema.
     */
    private static Map<String, Object> convertRecord(GenericData.Record record) {
        Map<String, Object> byFieldName = new HashMap<>();
        record.getSchema().getFields().forEach(schemaField -> {
            Object rawFieldValue = record.get(schemaField.pos());
            byFieldName.put(schemaField.name(), DebeziumSchemaUtils.convertAvroObjectToJsonCompatible(rawFieldValue));
        });
        return byFieldName;
    }

    /**
     * Copies an Avro array into a new {@link ArrayList}, converting each element with
     * {@link #convertAvroObjectToJsonCompatible(Object)} and keeping the iteration order.
     */
    private static List<Object> convertArray(GenericData.Array<?> array) {
        List<Object> converted = new ArrayList<>();
        array.forEach(item -> converted.add(DebeziumSchemaUtils.convertAvroObjectToJsonCompatible(item)));
        return converted;
    }

    /**
     * Resolves the Paimon type for a Debezium field.
     *
     * <p>Without a logical type name, or with one that is not handled here, the result comes from
     * the plain {@code debeziumType} mapping. Handled logical types:
     *
     * <ul>
     *   <li>{@code io.debezium.data.Bits}: {@code BINARY} sized to hold the {@code "length"}
     *       parameter rounded up to whole bytes.
     *   <li>decimal logical name: {@code DECIMAL(20, 0)} when {@code "connect.decimal.precision"}
     *       is absent, {@code STRING} when the precision exceeds 38, otherwise
     *       {@code DECIMAL(precision, scale)} with the {@code "scale"} parameter.
     *   <li>{@code io.debezium.time.Date}: {@code DATE}; {@code Timestamp}: {@code TIMESTAMP(3)};
     *       {@code MicroTimestamp} and {@code ZonedTimestamp}: {@code TIMESTAMP(6)};
     *       {@code MicroTime}: {@code TIME}.
     * </ul>
     *
     * @param debeziumType type name declared for the field
     * @param className logical type name of the field, may be {@code null}
     * @param parameters schema parameters; read only for the Bits and decimal logical types
     * @return the resolved Paimon type
     * @throws NumberFormatException if a required numeric parameter is missing or not an integer
     */
    public static DataType toDataType(String debeziumType, @Nullable String className, Map<String, String> parameters) {
        if (className == null) {
            return DebeziumSchemaUtils.fromDebeziumType(debeziumType);
        }
        // The decimal name is computed at runtime, so it cannot be a case label below.
        if (DebeziumSchemaUtils.decimalLogicalName().equals(className)) {
            String precisionText = parameters.get("connect.decimal.precision");
            if (precisionText == null) {
                return DataTypes.DECIMAL(20, 0);
            }
            int precision = Integer.parseInt(precisionText);
            if (precision > 38) {
                return DataTypes.STRING();
            }
            return DataTypes.DECIMAL(precision, Integer.parseInt(parameters.get("scale")));
        }
        switch (className) {
            case "io.debezium.data.Bits": {
                int bitCount = Integer.parseInt(parameters.get("length"));
                return DataTypes.BINARY((bitCount + 7) / 8);
            }
            case "io.debezium.time.Date": {
                return DataTypes.DATE();
            }
            case "io.debezium.time.Timestamp": {
                return DataTypes.TIMESTAMP(3);
            }
            case "io.debezium.time.MicroTimestamp":
            case "io.debezium.time.ZonedTimestamp": {
                return DataTypes.TIMESTAMP(6);
            }
            case "io.debezium.time.MicroTime": {
                return DataTypes.TIME();
            }
            default: {
                return DebeziumSchemaUtils.fromDebeziumType(debeziumType);
            }
        }
    }

    /**
     * Maps a primitive Debezium/Kafka Connect type name to a Paimon type.
     *
     * <p>32-bit floating point names ({@code float}, {@code float32}) map to {@code FLOAT};
     * 64-bit ones ({@code double}, {@code float64}) map to {@code DOUBLE}. Unrecognized names
     * fall back to {@code STRING}.
     *
     * @param dbzType the type name; must not be {@code null}
     * @return the corresponding Paimon type
     */
    private static DataType fromDebeziumType(String dbzType) {
        switch (dbzType) {
            case "int8": {
                return DataTypes.TINYINT();
            }
            case "int16": {
                return DataTypes.SMALLINT();
            }
            case "int32": {
                return DataTypes.INT();
            }
            case "int64": {
                return DataTypes.BIGINT();
            }
            case "float":
            case "float32": {
                return DataTypes.FLOAT();
            }
            // float64 is a 64-bit value: mapping it to FLOAT would silently drop precision.
            case "double":
            case "float64": {
                return DataTypes.DOUBLE();
            }
            case "boolean": {
                return DataTypes.BOOLEAN();
            }
            case "bytes": {
                return DataTypes.BYTES();
            }
            default: {
                return DataTypes.STRING();
            }
        }
    }

    /**
     * Returns the logical type name used to recognize decimal fields.
     *
     * <p>The name is assembled at runtime from a template instead of being written as a single
     * literal. NOTE(review): presumably this keeps the literal from being rewritten by package
     * relocation during shading — confirm before simplifying.
     *
     * @return {@code org.apache.kafka.connect.data.Decimal}
     */
    public static String decimalLogicalName() {
        final String template = "org.apache.#.connect.data.Decimal";
        return template.replace("#", "kafka");
    }

    /**
     * Returns the map stored under the {@code connect.parameters} property of an Avro schema.
     *
     * @param schema the Avro schema to inspect
     * @return the stored map itself (not a copy), or a new empty {@link HashMap} when the property
     *     is not set
     * @throws ClassCastException if the property is set but is not a {@link Map}
     */
    @SuppressWarnings("unchecked") // the property is untyped; entries are assumed to be strings
    public static Map<String, String> getAvroConnectParameters(Schema schema) {
        Object declared = schema.getObjectProp(CONNECT_PARAMETERS_PROP);
        if (declared == null) {
            return new HashMap<>();
        }
        return (Map<String, String>)declared;
    }

    /**
     * Derives the Paimon type for an Avro schema.
     *
     * <p>When the schema carries connect parameters, the source column type (falling back to the
     * Avro type name), length and scale found there are handed to
     * {@code MySqlTypeUtils.toDataType} with the default type mapping. Otherwise the type is
     * derived from the Avro schema structure itself.
     *
     * @param schema the Avro schema of the field
     * @return the derived Paimon type
     * @throws NumberFormatException if a length or scale parameter is present but not an integer
     */
    public static DataType avroToPaimonDataType(Schema schema) {
        Map<String, String> params = DebeziumSchemaUtils.getAvroConnectParameters(schema);
        if (params.isEmpty()) {
            return DebeziumSchemaUtils.fromDebeziumAvroType(schema);
        }
        String columnType = params.getOrDefault(SCHEMA_PARAMETER_COLUMN_TYPE, schema.getType().name());
        String lengthText = params.get(SCHEMA_PARAMETER_COLUMN_SIZE);
        Integer length = lengthText == null ? null : Integer.valueOf(lengthText);
        // The constant is named "PRECISION" but holds the "...column.scale" key.
        String scaleText = params.get(SCHEMA_PARAMETER_COLUMN_PRECISION);
        Integer scale = scaleText == null ? null : Integer.valueOf(scaleText);
        return MySqlTypeUtils.toDataType(columnType, length, scale, TypeMapping.defaultMapping());
    }

    /**
     * Derives a Paimon type from the structure of an Avro schema.
     *
     * <p>A logical type, when present, takes precedence over the underlying Avro type. Records,
     * arrays and maps are converted recursively; map keys are always {@code STRING}. The only
     * union supported is a two-branch union in which one branch is {@code null}.
     *
     * @param schema the Avro schema to convert
     * @return the corresponding Paimon type
     * @throws UnsupportedOperationException for an unhandled logical type, an unhandled Avro type,
     *     or a union that is not of the two-branch-with-null form
     */
    private static DataType fromDebeziumAvroType(Schema schema) {
        LogicalType logicalType = schema.getLogicalType();
        if (logicalType != null) {
            if (logicalType instanceof LogicalTypes.Date) {
                return DataTypes.DATE();
            }
            if (logicalType instanceof LogicalTypes.TimestampMillis) {
                return DataTypes.TIMESTAMP_MILLIS();
            }
            if (logicalType instanceof LogicalTypes.TimestampMicros) {
                return DataTypes.TIMESTAMP();
            }
            if (logicalType instanceof LogicalTypes.Decimal) {
                LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)logicalType;
                return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale());
            }
            if (logicalType instanceof LogicalTypes.TimeMillis) {
                return DataTypes.TIME(3);
            }
            if (logicalType instanceof LogicalTypes.TimeMicros) {
                return DataTypes.TIME(6);
            }
            if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
                return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
            }
            if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
                return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3);
            }
            throw new UnsupportedOperationException(String.format("Don't support logical avro type '%s' yet.", logicalType));
        }
        Schema.Type avroType = schema.getType();
        switch (avroType) {
            case BOOLEAN: {
                return DataTypes.BOOLEAN();
            }
            case BYTES:
            case FIXED: {
                return DataTypes.BYTES();
            }
            case DOUBLE: {
                return DataTypes.DOUBLE();
            }
            case FLOAT: {
                return DataTypes.FLOAT();
            }
            case INT: {
                return DataTypes.INT();
            }
            case LONG: {
                return DataTypes.BIGINT();
            }
            case ENUM:
            case STRING: {
                return DataTypes.STRING();
            }
            case RECORD: {
                List<DataField> fields = new ArrayList<>();
                for (Schema.Field field : schema.getFields()) {
                    DataType fieldType = DebeziumSchemaUtils.fromDebeziumAvroType(field.schema());
                    fields.add(DataTypes.FIELD(field.pos(), field.name(), fieldType, field.doc()));
                }
                return DataTypes.ROW(fields.toArray(new DataField[0]));
            }
            case ARRAY: {
                Schema elementSchema = schema.getElementType();
                DataType elementType = DebeziumSchemaUtils.fromDebeziumAvroType(elementSchema);
                return DataTypes.ARRAY(elementType);
            }
            case MAP: {
                DataType valueType = DebeziumSchemaUtils.fromDebeziumAvroType(schema.getValueType());
                return DataTypes.MAP(DataTypes.STRING(), valueType);
            }
            case UNION: {
                // Typed list: with a raw List the lambda parameter below would be an Object and
                // the call to getType() would not compile.
                List<Schema> unionTypes = schema.getTypes();
                if (unionTypes.size() == 2 && unionTypes.contains(Schema.create(Schema.Type.NULL))) {
                    Schema actualSchema = unionTypes.stream().filter(s -> s.getType() != Schema.Type.NULL).findFirst().orElseThrow(() -> new IllegalStateException("Union type does not contain a non-null type"));
                    // NOTE(review): copy(true) presumably yields the nullable variant of the type — confirm.
                    return DebeziumSchemaUtils.fromDebeziumAvroType(actualSchema).copy(true);
                }
                throw new UnsupportedOperationException("Generic unions are not supported");
            }
        }
        // Reached for Avro types without a case above (e.g. a bare NULL schema).
        throw new UnsupportedOperationException(String.format("Don't support avro type '%s' yet.", avroType));
    }
}

