/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender;

import com.mongodb.client.model.changestream.OperationType;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.SerializableFunction;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonType;
import org.bson.BsonValue;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.bson.types.Decimal128;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDBConnectorDeserializationSchema
implements DebeziumDeserializationSchema<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(MongoDBConnectorDeserializationSchema.class);
    private final SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
    private final Map<String, DeserializationRuntimeConverter> tableRowConverters;

    public MongoDBConnectorDeserializationSchema(SeaTunnelDataType<SeaTunnelRow> physicalDataType, SeaTunnelDataType<SeaTunnelRow> resultTypeInfo) {
        this.tableRowConverters = this.createConverter(physicalDataType);
        this.resultTypeInfo = resultTypeInfo;
    }

    @Override
    public void deserialize(@Nonnull SourceRecord record, Collector<SeaTunnelRow> out) {
        Struct value = (Struct)record.value();
        Schema valueSchema = record.valueSchema();
        OperationType op = this.operationTypeFor(record);
        BsonDocument documentKey = (BsonDocument)Preconditions.checkNotNull((Object)Objects.requireNonNull(MongodbRecordUtils.extractBsonDocument(value, valueSchema, "documentKey")));
        BsonDocument fullDocument = MongodbRecordUtils.extractBsonDocument(value, valueSchema, "fullDocument");
        String tableId = this.extractTableId(record);
        DeserializationRuntimeConverter tableRowConverter = tableId == null && this.tableRowConverters.size() == 1 ? this.tableRowConverters.values().iterator().next() : this.tableRowConverters.get(tableId);
        if (tableRowConverter == null) {
            log.debug("Ignore newly added table {}", (Object)tableId);
            return;
        }
        switch (op) {
            case INSERT: {
                SeaTunnelRow insert = this.extractRowData(tableRowConverter, fullDocument);
                insert.setRowKind(RowKind.INSERT);
                insert.setTableId(tableId);
                this.emit(record, insert, out);
                break;
            }
            case DELETE: {
                SeaTunnelRow delete = this.extractRowData(tableRowConverter, documentKey);
                delete.setRowKind(RowKind.DELETE);
                delete.setTableId(tableId);
                this.emit(record, delete, out);
                break;
            }
            case UPDATE: {
                if (fullDocument == null) break;
                SeaTunnelRow updateAfter = this.extractRowData(tableRowConverter, fullDocument);
                updateAfter.setRowKind(RowKind.UPDATE_AFTER);
                updateAfter.setTableId(tableId);
                this.emit(record, updateAfter, out);
                break;
            }
            case REPLACE: {
                SeaTunnelRow replaceAfter = this.extractRowData(tableRowConverter, fullDocument);
                replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
                replaceAfter.setTableId(tableId);
                this.emit(record, replaceAfter, out);
                break;
            }
        }
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.resultTypeInfo;
    }

    @Nonnull
    private OperationType operationTypeFor(@Nonnull SourceRecord record) {
        Struct value = (Struct)record.value();
        return OperationType.fromString(value.getString("operationType"));
    }

    private void emit(SourceRecord inRecord, SeaTunnelRow physicalRow, @Nonnull Collector<SeaTunnelRow> collector) {
        collector.collect((Object)physicalRow);
    }

    private SeaTunnelRow extractRowData(DeserializationRuntimeConverter tableRowConverter, BsonDocument document) {
        Preconditions.checkNotNull((Object)document);
        return (SeaTunnelRow)tableRowConverter.convert(document);
    }

    private String extractTableId(SourceRecord record) {
        return null;
    }

    public Map<String, DeserializationRuntimeConverter> createConverter(SeaTunnelDataType<?> inputDataType) {
        HashMap<String, DeserializationRuntimeConverter> tableRowConverters = new HashMap<String, DeserializationRuntimeConverter>();
        for (Map.Entry item : (MultipleRowType)inputDataType) {
            final SerializableFunction<BsonValue, Object> internalRowConverter = MongoDBConnectorDeserializationSchema.createNullSafeInternalConverter((SeaTunnelDataType)item.getValue());
            DeserializationRuntimeConverter itemRowConverter = new DeserializationRuntimeConverter(){
                private static final long serialVersionUID = 1L;

                @Override
                public Object convert(BsonValue bsonValue) {
                    return internalRowConverter.apply(bsonValue);
                }
            };
            tableRowConverters.put((String)item.getKey(), itemRowConverter);
        }
        return tableRowConverters;
    }

    private static SerializableFunction<BsonValue, Object> createNullSafeInternalConverter(SeaTunnelDataType<?> type) {
        return MongoDBConnectorDeserializationSchema.wrapIntoNullSafeInternalConverter(MongoDBConnectorDeserializationSchema.createInternalConverter(type), type);
    }

    private static SerializableFunction<BsonValue, Object> wrapIntoNullSafeInternalConverter(final SerializableFunction<BsonValue, Object> internalConverter, SeaTunnelDataType<?> type) {
        return new SerializableFunction<BsonValue, Object>(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object apply(BsonValue bsonValue) {
                if (MongoDBConnectorDeserializationSchema.isBsonValueNull(bsonValue) || MongoDBConnectorDeserializationSchema.isBsonDecimalNaN(bsonValue)) {
                    return null;
                }
                return internalConverter.apply(bsonValue);
            }
        };
    }

    private static boolean isBsonValueNull(BsonValue bsonValue) {
        return bsonValue == null || bsonValue.isNull() || bsonValue.getBsonType() == BsonType.UNDEFINED;
    }

    private static boolean isBsonDecimalNaN(@Nonnull BsonValue bsonValue) {
        return bsonValue.isDecimal128() && bsonValue.asDecimal128().getValue().isNaN();
    }

    private static SerializableFunction<BsonValue, Object> createInternalConverter(final @Nonnull SeaTunnelDataType<?> type) {
        switch (type.getSqlType()) {
            case NULL: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return null;
                    }
                };
            }
            case BOOLEAN: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return MongoDBConnectorDeserializationSchema.convertToBoolean(bsonValue);
                    }
                };
            }
            case DOUBLE: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return MongoDBConnectorDeserializationSchema.convertToDouble(bsonValue);
                    }
                };
            }
            case INT: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return MongoDBConnectorDeserializationSchema.convertToInt(bsonValue);
                    }
                };
            }
            case BIGINT: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return MongoDBConnectorDeserializationSchema.convertToLong(bsonValue);
                    }
                };
            }
            case BYTES: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return MongoDBConnectorDeserializationSchema.convertToBinary(bsonValue);
                    }
                };
            }
            case STRING: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return MongoDBConnectorDeserializationSchema.convertToString(bsonValue);
                    }
                };
            }
            case DATE: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return MongoDBConnectorDeserializationSchema.convertToLocalDateTime(bsonValue).toLocalDate();
                    }
                };
            }
            case TIMESTAMP: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        return MongoDBConnectorDeserializationSchema.convertToLocalDateTime(bsonValue);
                    }
                };
            }
            case DECIMAL: {
                return new SerializableFunction<BsonValue, Object>(){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object apply(BsonValue bsonValue) {
                        DecimalType decimalType = (DecimalType)type;
                        BigDecimal decimalValue = MongoDBConnectorDeserializationSchema.convertToBigDecimal(bsonValue);
                        return MongoDBConnectorDeserializationSchema.fromBigDecimal(decimalValue, decimalType.getPrecision(), decimalType.getScale());
                    }
                };
            }
            case ARRAY: {
                return MongoDBConnectorDeserializationSchema.createArrayConverter((ArrayType)type);
            }
            case MAP: {
                MapType mapType = (MapType)type;
                return MongoDBConnectorDeserializationSchema.createMapConverter(mapType.toString(), mapType.getKeyType(), mapType.getValueType());
            }
            case ROW: {
                return MongoDBConnectorDeserializationSchema.createRowConverter((SeaTunnelRowType)type);
            }
        }
        throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Not support to parse type: " + type);
    }

    private static LocalDateTime convertToLocalDateTime(BsonValue bsonValue) {
        Instant instant;
        if (bsonValue.isTimestamp()) {
            instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getTime());
        } else if (bsonValue.isDateTime()) {
            instant = Instant.ofEpochMilli(bsonValue.asDateTime().getValue());
        } else {
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Unable to convert to LocalDateTime from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
        }
        return Timestamp.from(instant).toLocalDateTime();
    }

    private static SerializableFunction<BsonValue, Object> createRowConverter(SeaTunnelRowType type) {
        SeaTunnelDataType[] fieldTypes = type.getFieldTypes();
        final SerializableFunction[] fieldConverters = (SerializableFunction[])Arrays.stream(fieldTypes).map(MongoDBConnectorDeserializationSchema::createNullSafeInternalConverter).toArray(SerializableFunction[]::new);
        final int fieldCount = type.getTotalFields();
        final String[] fieldNames = type.getFieldNames();
        return new SerializableFunction<BsonValue, Object>(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object apply(BsonValue bsonValue) {
                if (!bsonValue.isDocument()) {
                    throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Unable to convert to rowType from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
                }
                BsonDocument document = bsonValue.asDocument();
                SeaTunnelRow row = new SeaTunnelRow(fieldCount);
                for (int i = 0; i < fieldCount; ++i) {
                    String fieldName = fieldNames[i];
                    BsonValue fieldValue = document.get(fieldName);
                    Object convertedField = fieldConverters[i].apply(fieldValue);
                    row.setField(i, convertedField);
                }
                return row;
            }
        };
    }

    @Nonnull
    private static SerializableFunction<BsonValue, Object> createArrayConverter(final @Nonnull ArrayType<?, ?> type) {
        final SerializableFunction<BsonValue, Object> elementConverter = MongoDBConnectorDeserializationSchema.createNullSafeInternalConverter(type.getElementType());
        return new SerializableFunction<BsonValue, Object>(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object apply(BsonValue bsonValue) {
                if (!bsonValue.isArray()) {
                    throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Unable to convert to arrayType from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
                }
                BsonArray in = bsonValue.asArray();
                Object arr = Array.newInstance(type.getElementType().getTypeClass(), in.size());
                for (int i = 0; i < in.size(); ++i) {
                    Array.set(arr, i, elementConverter.apply(in.get(i)));
                }
                return arr;
            }
        };
    }

    @Nonnull
    private static SerializableFunction<BsonValue, Object> createMapConverter(String typeSummary, @Nonnull SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) {
        if (!keyType.getSqlType().equals((Object)SqlType.STRING)) {
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, "Bson format doesn't support non-string as key type of map. The type is: " + typeSummary);
        }
        final SerializableFunction<BsonValue, Object> valueConverter = MongoDBConnectorDeserializationSchema.createNullSafeInternalConverter(valueType);
        return new SerializableFunction<BsonValue, Object>(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object apply(BsonValue bsonValue) {
                if (!bsonValue.isDocument()) {
                    throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Unable to convert to rowType from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
                }
                BsonDocument document = bsonValue.asDocument();
                HashMap map = new HashMap();
                for (String key : document.keySet()) {
                    map.put(key, valueConverter.apply(document.get(key)));
                }
                return map;
            }
        };
    }

    public static BigDecimal fromBigDecimal(BigDecimal bd, int precision, int scale) {
        if ((bd = bd.setScale(scale, RoundingMode.HALF_UP)).precision() > precision) {
            return null;
        }
        return bd;
    }

    private static boolean convertToBoolean(@Nonnull BsonValue bsonValue) {
        if (bsonValue.isBoolean()) {
            return bsonValue.asBoolean().getValue();
        }
        throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unable to convert to boolean from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
    }

    private static double convertToDouble(@Nonnull BsonValue bsonValue) {
        if (bsonValue.isDouble()) {
            return bsonValue.asNumber().doubleValue();
        }
        throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unable to convert to double from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
    }

    private static int convertToInt(@Nonnull BsonValue bsonValue) {
        if (bsonValue.isInt32()) {
            return bsonValue.asNumber().intValue();
        }
        throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unable to convert to integer from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
    }

    private static String convertToString(@Nonnull BsonValue bsonValue) {
        if (bsonValue.isString()) {
            return bsonValue.asString().getValue();
        }
        if (bsonValue.isObjectId()) {
            return bsonValue.asObjectId().getValue().toHexString();
        }
        if (bsonValue.isDocument()) {
            return bsonValue.asDocument().toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
        }
        return new BsonDocument("_value", bsonValue).toJson(MongodbSourceOptions.DEFAULT_JSON_WRITER_SETTINGS);
    }

    private static byte[] convertToBinary(@Nonnull BsonValue bsonValue) {
        if (bsonValue.isBinary()) {
            return bsonValue.asBinary().getData();
        }
        throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported BYTES value type: " + bsonValue.getClass().getSimpleName());
    }

    private static long convertToLong(@Nonnull BsonValue bsonValue) {
        if (bsonValue.isInt64()) {
            return bsonValue.asNumber().longValue();
        }
        throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unable to convert to long from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
    }

    private static BigDecimal convertToBigDecimal(@Nonnull BsonValue bsonValue) {
        if (bsonValue.isDecimal128()) {
            Decimal128 decimal128Value = bsonValue.asDecimal128().decimal128Value();
            if (decimal128Value.isFinite()) {
                return bsonValue.asDecimal128().decimal128Value().bigDecimalValue();
            }
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Unable to convert infinite bson decimal to Decimal type.");
        }
        throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Unable to convert to decimal from unexpected value '" + bsonValue + "' of type " + (Object)((Object)bsonValue.getBsonType()));
    }

    @FunctionalInterface
    public static interface DeserializationRuntimeConverter
    extends Serializable {
        public Object convert(BsonValue var1);
    }
}

