/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.connector.flink.row;

import com.starrocks.connector.flink.row.BinaryStringDataSerializer;
import com.starrocks.connector.flink.row.DecimalDataSerializer;
import com.starrocks.connector.flink.row.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.StarRocksSinkOP;
import com.starrocks.connector.flink.row.TimestampDataSerializer;
import com.starrocks.shade.com.alibaba.fastjson.JSON;
import com.starrocks.shade.com.alibaba.fastjson.serializer.SerializeConfig;
import java.lang.reflect.Type;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryArrayData;
import org.apache.flink.table.data.binary.BinaryMapData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

/**
 * Row transformer that turns a Flink {@link RowData} record into an {@code Object[]} of plain
 * Java values, one slot per column of the configured table schema.
 *
 * <p>Call {@link #setTableSchema(TableSchema)} and {@link #setRuntimeContext(RuntimeContext)}
 * before {@link #transform(RowData, boolean)}; the latter reads the state they initialise.
 */
public class StarRocksTableRowTransformer
implements StarRocksIRowTransformer<RowData> {
    private static final long serialVersionUID = 1L;
    private final TypeInformation<RowData> rowDataTypeInfo;
    private Function<RowData, RowData> valueTransform;
    private DataType[] dataTypes;

    /**
     * @param rowDataTypeInfo type information used to create the serializer that copies
     *                        incoming rows when object reuse is enabled
     */
    public StarRocksTableRowTransformer(TypeInformation<RowData> rowDataTypeInfo) {
        this.rowDataTypeInfo = rowDataTypeInfo;
    }

    /**
     * Captures the column data types of the given schema; they drive the per-column conversion
     * performed by {@link #transform(RowData, boolean)}.
     */
    @Override
    public void setTableSchema(TableSchema ts) {
        this.dataTypes = ts.getFieldDataTypes();
    }

    /**
     * Prepares the row copy function and registers the JSON serializers for Flink's internal
     * string, decimal and timestamp data classes on fastjson's global {@link SerializeConfig}.
     */
    @Override
    public void setRuntimeContext(RuntimeContext runtimeCtx) {
        TypeSerializer<RowData> typeSerializer = this.rowDataTypeInfo.createSerializer(runtimeCtx.getExecutionConfig());
        // With object reuse enabled the incoming row is copied before it is read.
        this.valueTransform = runtimeCtx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : Function.identity();
        SerializeConfig serializeConfig = SerializeConfig.getGlobalInstance();
        serializeConfig.put(BinaryStringData.class, new BinaryStringDataSerializer());
        serializeConfig.put(DecimalData.class, new DecimalDataSerializer());
        serializeConfig.put(TimestampData.class, new TimestampDataSerializer());
    }

    /**
     * Converts one record into an array of plain values.
     *
     * @param record              the row to convert
     * @param supportUpsertDelete when {@code true}, one extra trailing slot is appended that holds
     *                            the ordinal of the {@link StarRocksSinkOP} parsed from the row kind
     * @return the converted column values, in schema order
     * @throws UnsupportedOperationException if a column has a type this transformer cannot convert
     */
    @Override
    public Object[] transform(RowData record, boolean supportUpsertDelete) {
        RowData transformRecord = this.valueTransform.apply(record);
        int fieldCount = this.dataTypes.length;
        Object[] values = new Object[supportUpsertDelete ? fieldCount + 1 : fieldCount];
        for (int idx = 0; idx < fieldCount; ++idx) {
            values[idx] = this.typeConvertion(this.dataTypes[idx].getLogicalType(), transformRecord, idx);
        }
        if (supportUpsertDelete) {
            values[fieldCount] = StarRocksSinkOP.parse(record.getRowKind()).ordinal();
        }
        return values;
    }

    /**
     * Reads the field at {@code pos} from {@code record} and converts it according to
     * {@code type}. Returns {@code null} when the field is null.
     */
    private Object typeConvertion(LogicalType type, RowData record, int pos) {
        if (record.isNullAt(pos)) {
            return null;
        }
        switch (type.getTypeRoot()) {
            case BOOLEAN: {
                // Booleans are emitted as the numbers 1 / 0.
                return record.getBoolean(pos) ? 1L : 0L;
            }
            case TINYINT: {
                return record.getByte(pos);
            }
            case SMALLINT: {
                return record.getShort(pos);
            }
            case INTEGER: {
                return record.getInt(pos);
            }
            case BIGINT: {
                return record.getLong(pos);
            }
            case FLOAT: {
                return record.getFloat(pos);
            }
            case DOUBLE: {
                return record.getDouble(pos);
            }
            case CHAR:
            case VARCHAR: {
                return record.getString(pos).toString();
            }
            case DATE: {
                return StarRocksTableRowTransformer.formatDate(record.getInt(pos));
            }
            case TIMESTAMP_WITHOUT_TIME_ZONE: {
                int timestampPrecision = ((TimestampType) type).getPrecision();
                return record.getTimestamp(pos, timestampPrecision).toLocalDateTime().toString();
            }
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
                // This type root is backed by LocalZonedTimestampType, which is not a
                // TimestampType; casting to TimestampType would throw ClassCastException.
                int zonedPrecision = ((LocalZonedTimestampType) type).getPrecision();
                return record.getTimestamp(pos, zonedPrecision).toLocalDateTime().toString();
            }
            case DECIMAL: {
                DecimalType decimalType = (DecimalType) type;
                return record.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();
            }
            case BINARY: {
                // Folds the bytes into a long, treating the array as big-endian.
                // NOTE(review): for arrays longer than 8 bytes the shift distance wraps
                // (Java masks long shifts to 6 bits), so the result is not meaningful — confirm
                // callers never pass wider BINARY columns.
                byte[] bts = record.getBinary(pos);
                long value = 0L;
                for (int i = 0; i < bts.length; ++i) {
                    value += ((long) bts[bts.length - i - 1] & 0xFFL) << 8 * i;
                }
                return value;
            }
            case ARRAY: {
                return this.convertNestedArray(record.getArray(pos), type);
            }
            case MAP: {
                return this.convertNestedMap(record.getMap(pos), type);
            }
            case ROW: {
                RowType rType = (RowType) type;
                return this.convertRow(record.getRow(pos, rType.getFieldCount()), rType);
            }
            default: {
                throw new UnsupportedOperationException("Unsupported type:" + type);
            }
        }
    }

    /** Formats a day count since 1970-01-01 as an ISO {@code yyyy-MM-dd} string. */
    private static String formatDate(int epochDay) {
        return LocalDate.ofEpochDay(epochDay).toString();
    }

    /**
     * Converts a nested row into a map from field name to converted field value.
     * Fields are processed sequentially because the result is a plain {@link HashMap},
     * which must not be written to from several threads.
     */
    private Map<String, Object> convertRow(RowData row, RowType rowType) {
        List<RowType.RowField> fields = rowType.getFields();
        Map<String, Object> converted = new HashMap<>();
        for (int i = 0; i < fields.size(); ++i) {
            RowType.RowField field = fields.get(i);
            converted.put(field.getName(), this.typeConvertion(field.getType(), row, i));
        }
        return converted;
    }

    /**
     * Converts array data into a list.
     *
     * <p>Generic arrays are copied as-is. For binary arrays, elements of type ROW (as a JSON
     * string), MAP, DATE and ARRAY are converted; all other elements are passed through unchanged.
     *
     * @throws UnsupportedOperationException for any other {@link ArrayData} implementation
     */
    private List<Object> convertNestedArray(ArrayData arrData, LogicalType type) {
        if (arrData instanceof GenericArrayData) {
            return Lists.newArrayList(((GenericArrayData) arrData).toObjectArray());
        }
        if (arrData instanceof BinaryArrayData) {
            LogicalType elementType = ((ArrayType) type).getElementType();
            Object[] elements = ((BinaryArrayData) arrData).toObjectArray(elementType);
            List<Object> converted = new ArrayList<>(elements.length);
            for (Object element : elements) {
                converted.add(this.convertArrayElement(element, elementType));
            }
            return converted;
        }
        throw new UnsupportedOperationException(String.format("Unsupported array data: %s", arrData.getClass()));
    }

    /** Converts a single element of a binary array; null elements stay null. */
    private Object convertArrayElement(Object element, LogicalType elementType) {
        if (element == null) {
            return null;
        }
        switch (elementType.getTypeRoot()) {
            case ROW: {
                return JSON.toJSONString(this.convertRow((RowData) element, (RowType) elementType));
            }
            case MAP: {
                return this.convertNestedMap((MapData) element, elementType);
            }
            case DATE: {
                return StarRocksTableRowTransformer.formatDate((Integer) element);
            }
            case ARRAY: {
                return this.convertNestedArray((ArrayData) element, elementType);
            }
            default: {
                return element;
            }
        }
    }

    /**
     * Converts map data into a {@link Map}.
     *
     * <p>Generic maps are copied key by key. For binary maps the keys are stringified and values
     * of type MAP, DATE and ARRAY are converted; all other values are passed through unchanged.
     *
     * @throws UnsupportedOperationException for any other {@link MapData} implementation
     */
    private Map<Object, Object> convertNestedMap(MapData mapData, LogicalType type) {
        if (mapData instanceof GenericMapData) {
            GenericMapData genericMap = (GenericMapData) mapData;
            Map<Object, Object> copy = new HashMap<>();
            for (Object key : ((GenericArrayData) genericMap.keyArray()).toObjectArray()) {
                copy.put(key, genericMap.get(key));
            }
            return copy;
        }
        if (mapData instanceof BinaryMapData) {
            MapType mapType = (MapType) type;
            LogicalType valType = mapType.getValueType();
            Map<?, ?> javaMap = ((BinaryMapData) mapData).toJavaMap(mapType.getKeyType(), valType);
            Map<Object, Object> result = new HashMap<>();
            for (Map.Entry<?, ?> en : javaMap.entrySet()) {
                result.put(en.getKey().toString(), this.convertMapValue(en.getValue(), valType));
            }
            return result;
        }
        throw new UnsupportedOperationException(String.format("Unsupported map data: %s", mapData.getClass()));
    }

    /** Converts a single value of a binary map; null values stay null. */
    private Object convertMapValue(Object value, LogicalType valType) {
        if (value == null) {
            return null;
        }
        switch (valType.getTypeRoot()) {
            case MAP: {
                return this.convertNestedMap((MapData) value, valType);
            }
            case DATE: {
                return StarRocksTableRowTransformer.formatDate((Integer) value);
            }
            case ARRAY: {
                return this.convertNestedArray((ArrayData) value, valType);
            }
            default: {
                return value;
            }
        }
    }
}

