/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.postgres;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresRecordParser
implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresRecordParser.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final ZoneId serverTimeZone;
    private final List<ComputedColumn> computedColumns;
    private final TypeMapping typeMapping;
    private DebeziumEvent root;
    private String currentTable;
    private String databaseName;
    private final CdcMetadataConverter[] metadataConverters;

    public PostgresRecordParser(Configuration postgresConfig, List<ComputedColumn> computedColumns, TypeMapping typeMapping, CdcMetadataConverter[] metadataConverters) {
        this.computedColumns = computedColumns;
        this.typeMapping = typeMapping;
        this.metadataConverters = metadataConverters;
        this.objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        String stringifyServerTimeZone = (String)postgresConfig.get(PostgresSourceOptions.SERVER_TIME_ZONE);
        this.serverTimeZone = stringifyServerTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(stringifyServerTimeZone);
    }

    public void flatMap(CdcSourceRecord rawEvent, Collector<RichCdcMultiplexRecord> out) throws Exception {
        this.root = (DebeziumEvent)this.objectMapper.readValue((String)rawEvent.getValue(), DebeziumEvent.class);
        this.currentTable = this.root.payload().source().get("table").asText();
        this.databaseName = this.root.payload().source().get("db").asText();
        this.extractRecords().forEach(arg_0 -> out.collect(arg_0));
    }

    private List<DataField> extractFields(DebeziumEvent.Field schema) {
        Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
        Preconditions.checkArgument((!afterFields.isEmpty() ? 1 : 0) != 0, (Object)"PostgresRecordParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created");
        RowType.Builder rowType = RowType.builder();
        afterFields.forEach((key, value) -> {
            DataType dataType = this.extractFieldType((DebeziumEvent.Field)value);
            dataType = dataType.copy(this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_NULLABLE) || value.optional() != false);
            rowType.field(key, dataType);
        });
        return rowType.build().getFields();
    }

    private DataType extractFieldType(DebeziumEvent.Field field) {
        switch (field.type()) {
            case "array": {
                return DataTypes.ARRAY((DataType)DataTypes.STRING());
            }
            case "map": 
            case "struct": {
                return DataTypes.MAP((DataType)DataTypes.STRING(), (DataType)DataTypes.STRING());
            }
            case "int8": {
                return DataTypes.TINYINT();
            }
            case "int16": {
                return DataTypes.SMALLINT();
            }
            case "int32": {
                if ("io.debezium.time.Date".equals(field.name())) {
                    return DataTypes.DATE();
                }
                return DataTypes.INT();
            }
            case "int64": {
                if ("io.debezium.time.MicroTimestamp".equals(field.name())) {
                    return DataTypes.TIMESTAMP((int)6);
                }
                if ("io.debezium.time.MicroTime".equals(field.name())) {
                    return DataTypes.TIME((int)6);
                }
                return DataTypes.BIGINT();
            }
            case "float": 
            case "float32": {
                return DataTypes.FLOAT();
            }
            case "float64": 
            case "double": {
                return DataTypes.DOUBLE();
            }
            case "boolean": {
                return DataTypes.BOOLEAN();
            }
            case "string": {
                return DataTypes.STRING();
            }
            case "bytes": {
                if (DebeziumSchemaUtils.decimalLogicalName().equals(field.name())) {
                    int precision = field.parameters().get("connect.decimal.precision").asInt();
                    int scale = field.parameters().get("scale").asInt();
                    return DataTypes.DECIMAL((int)precision, (int)scale);
                }
                if ("io.debezium.data.Bits".equals(field.name())) {
                    String stringifyLength = field.parameters().get("length").asText();
                    if (StringUtils.isNullOrWhitespaceOnly((String)stringifyLength)) {
                        return DataTypes.BOOLEAN();
                    }
                    Integer length = Integer.valueOf(stringifyLength);
                    if (length == 1) {
                        return DataTypes.BOOLEAN();
                    }
                    return DataTypes.BINARY((int)(length == Integer.MAX_VALUE ? length / 8 : (length + 7) / 8));
                }
                return DataTypes.BYTES();
            }
        }
        return DataTypes.STRING();
    }

    private List<RichCdcMultiplexRecord> extractRecords() {
        Map<String, String> after;
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        Map<String, String> before = this.extractRow(this.root.payload().before());
        if (!before.isEmpty()) {
            records.add(this.createRecord(RowKind.DELETE, before));
        }
        if (!(after = this.extractRow(this.root.payload().after())).isEmpty()) {
            List<DataField> fields = this.extractFields(this.root.schema());
            records.add(new RichCdcMultiplexRecord(this.databaseName, this.currentTable, fields, Collections.emptyList(), new CdcRecord(RowKind.INSERT, after)));
        }
        return records;
    }

    private Map<String, String> extractRow(JsonNode recordRow) {
        if (JsonSerdeUtil.isNull((JsonNode)recordRow)) {
            return new HashMap<String, String>();
        }
        DebeziumEvent.Field schema = (DebeziumEvent.Field)Preconditions.checkNotNull((Object)this.root.schema(), (String)"PostgresRecordParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created");
        Map<String, DebeziumEvent.Field> fields = schema.beforeAndAfterFields();
        LinkedHashMap<String, String> resultMap = new LinkedHashMap<String, String>();
        for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) {
            String oldValue;
            String fieldName = field.getKey();
            String string = field.getValue().type();
            JsonNode objectValue = recordRow.get(fieldName);
            if (JsonSerdeUtil.isNull((JsonNode)objectValue)) continue;
            String className = field.getValue().name();
            String newValue = oldValue = objectValue.asText();
            if ("io.debezium.data.Bits".equals(className)) {
                byte[] littleEndian = Base64.getDecoder().decode(oldValue);
                byte[] bigEndian = new byte[littleEndian.length];
                for (int i = 0; i < littleEndian.length; ++i) {
                    bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
                }
                newValue = this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_STRING) ? StringUtils.bytesToBinaryString((byte[])bigEndian) : Base64.getEncoder().encodeToString(bigEndian);
            } else if ("bytes".equals(string) && className == null) {
                newValue = new String(Base64.getDecoder().decode(oldValue));
            } else {
                long nanoAdjustment;
                long seconds;
                long nanosecondsPerMicros;
                if ("bytes".equals(string) && DebeziumSchemaUtils.decimalLogicalName().equals(className)) {
                    try {
                        new BigDecimal(oldValue);
                    }
                    catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid big decimal value " + oldValue + ". Make sure that in the `customConverterConfigs` of the JsonDebeziumDeserializationSchema you created, set '" + "decimal.format" + "' to 'numeric'", e);
                    }
                }
                if ("io.debezium.time.Date".equals(className)) {
                    newValue = DateTimeUtils.toLocalDate((int)Integer.parseInt(oldValue)).toString();
                } else if ("io.debezium.time.Timestamp".equals(className)) {
                    LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime((long)Long.parseLong(oldValue), (ZoneId)ZoneOffset.UTC);
                    newValue = DateTimeUtils.formatLocalDateTime((LocalDateTime)localDateTime, (int)3);
                } else if ("io.debezium.time.MicroTimestamp".equals(className)) {
                    long microseconds = Long.parseLong(oldValue);
                    long microsecondsPerSecond = 1000000L;
                    nanosecondsPerMicros = 1000L;
                    seconds = microseconds / microsecondsPerSecond;
                    nanoAdjustment = microseconds % microsecondsPerSecond * nanosecondsPerMicros;
                    LocalDateTime localDateTime = Instant.ofEpochSecond(seconds, nanoAdjustment).atZone(ZoneOffset.UTC).toLocalDateTime();
                    newValue = DateTimeUtils.formatLocalDateTime((LocalDateTime)localDateTime, (int)6);
                } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
                    LocalDateTime localDateTime = Instant.parse(oldValue).atZone(this.serverTimeZone).toLocalDateTime();
                    newValue = DateTimeUtils.formatLocalDateTime((LocalDateTime)localDateTime, (int)6);
                } else if ("io.debezium.time.MicroTime".equals(className)) {
                    long microseconds = Long.parseLong(oldValue);
                    long microsecondsPerSecond = 1000000L;
                    nanosecondsPerMicros = 1000L;
                    seconds = microseconds / microsecondsPerSecond;
                    nanoAdjustment = microseconds % microsecondsPerSecond * nanosecondsPerMicros;
                    newValue = Instant.ofEpochSecond(seconds, nanoAdjustment).atZone(ZoneOffset.UTC).toLocalTime().toString();
                } else if ("array".equals(string)) {
                    ArrayNode arrayNode = (ArrayNode)objectValue;
                    ArrayList newArrayValues = new ArrayList();
                    arrayNode.elements().forEachRemaining(element -> newArrayValues.add(element.asText()));
                    try {
                        newValue = this.objectMapper.writer().writeValueAsString(newArrayValues);
                    }
                    catch (JsonProcessingException e) {
                        LOG.error("Failed to convert array to JSON.", (Throwable)e);
                    }
                }
            }
            resultMap.put(fieldName, newValue);
        }
        for (ComputedColumn computedColumn : this.computedColumns) {
            resultMap.put(computedColumn.columnName(), computedColumn.eval((String)resultMap.get(computedColumn.fieldReference())));
        }
        for (Iterator<Object> iterator : this.metadataConverters) {
            resultMap.put(iterator.columnName(), iterator.read(this.root.payload().source()));
        }
        return resultMap;
    }

    protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> data) {
        return new RichCdcMultiplexRecord(this.databaseName, this.currentTable, Collections.emptyList(), Collections.emptyList(), new CdcRecord(rowKind, data));
    }
}

