/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.schema.NestedSchemaUtils;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.FieldIdentifier;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class UpdatedDataFieldsProcessFunctionBase<I, O>
extends ProcessFunction<I, O> {
    private static final Logger LOG = LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
    protected final CatalogLoader catalogLoader;
    private final TypeMapping typeMapping;
    protected Catalog catalog;
    private boolean caseSensitive;
    private static final List<DataTypeRoot> STRING_TYPES = Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
    private static final List<DataTypeRoot> BINARY_TYPES = Arrays.asList(DataTypeRoot.BINARY, DataTypeRoot.VARBINARY);
    private static final List<DataTypeRoot> INTEGER_TYPES = Arrays.asList(DataTypeRoot.TINYINT, DataTypeRoot.SMALLINT, DataTypeRoot.INTEGER, DataTypeRoot.BIGINT);
    private static final List<DataTypeRoot> FLOATING_POINT_TYPES = Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
    private static final List<DataTypeRoot> DECIMAL_TYPES = Arrays.asList(DataTypeRoot.DECIMAL);
    private static final List<DataTypeRoot> TIMESTAMP_TYPES = Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);

    protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader catalogLoader, TypeMapping typeMapping) {
        this.catalogLoader = catalogLoader;
        this.typeMapping = typeMapping;
    }

    public void open(OpenContext openContext) throws Exception {
        this.open(new Configuration());
    }

    public void open(Configuration parameters) {
        this.catalog = this.catalogLoader.load();
        this.caseSensitive = this.catalog.caseSensitive();
    }

    protected void applySchemaChange(SchemaManager schemaManager, SchemaChange schemaChange, Identifier identifier, CdcSchema newSchema) throws Exception {
        if (schemaChange instanceof SchemaChange.AddColumn) {
            try {
                this.catalog.alterTable(identifier, schemaChange, false);
            }
            catch (Catalog.ColumnAlreadyExistException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to perform SchemaChange.AddColumn {}, possibly due to duplicated column name", (Object)schemaChange, (Object)e);
                }
            }
        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
            SchemaChange.UpdateColumnType updateColumnType = (SchemaChange.UpdateColumnType)schemaChange;
            String topLevelFieldName = updateColumnType.fieldNames()[0];
            TableSchema oldSchema = schemaManager.latestOrThrow("Table does not exist. This is unexpected.");
            DataType oldTopLevelFieldType = this.findTopLevelType(oldSchema.fields(), topLevelFieldName);
            DataType newTopLevelFieldType = this.findTopLevelType(newSchema.fields(), topLevelFieldName);
            switch (UpdatedDataFieldsProcessFunctionBase.canConvert(oldTopLevelFieldType, newTopLevelFieldType, this.typeMapping)) {
                case CONVERT: {
                    this.catalog.alterTable(identifier, schemaChange, false);
                    break;
                }
                case EXCEPTION: {
                    throw new UnsupportedOperationException(String.format("Cannot convert field %s from type %s to %s of Paimon table %s.", topLevelFieldName, oldTopLevelFieldType, newTopLevelFieldType, identifier.getFullName()));
                }
            }
        } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
            this.catalog.alterTable(identifier, schemaChange, false);
        } else if (schemaChange instanceof SchemaChange.UpdateComment) {
            this.catalog.alterTable(identifier, schemaChange, false);
        } else {
            throw new UnsupportedOperationException("Unsupported schema change class " + schemaChange.getClass().getName() + ", content " + schemaChange);
        }
    }

    private DataType findTopLevelType(List<DataField> fields, String name) {
        for (DataField field : fields) {
            if (!(this.caseSensitive ? name.equals(field.name()) : name.equalsIgnoreCase(field.name()))) continue;
            return field.type();
        }
        throw new RuntimeException("Cannot find top level type " + name);
    }

    public static ConvertAction canConvert(DataType oldType, DataType newType, TypeMapping typeMapping) {
        if (oldType.equalsIgnoreNullable(newType)) {
            if (oldType.isNullable() && !newType.isNullable()) {
                return ConvertAction.EXCEPTION;
            }
            return ConvertAction.CONVERT;
        }
        if (oldType.getTypeRoot() == DataTypeRoot.ARRAY && newType.getTypeRoot() == DataTypeRoot.ARRAY) {
            ArrayType oldArrayType = (ArrayType)oldType;
            ArrayType newArrayType = (ArrayType)newType;
            return UpdatedDataFieldsProcessFunctionBase.canConvertArray(oldArrayType, newArrayType, typeMapping);
        }
        if (oldType.getTypeRoot() == DataTypeRoot.MAP && newType.getTypeRoot() == DataTypeRoot.MAP) {
            MapType oldMapType = (MapType)oldType;
            MapType newMapType = (MapType)newType;
            return UpdatedDataFieldsProcessFunctionBase.canConvertMap(oldMapType, newMapType, typeMapping);
        }
        if (oldType.getTypeRoot() == DataTypeRoot.MULTISET && newType.getTypeRoot() == DataTypeRoot.MULTISET) {
            MultisetType oldMultisetType = (MultisetType)oldType;
            MultisetType newMultisetType = (MultisetType)newType;
            return UpdatedDataFieldsProcessFunctionBase.canConvertMultisetType(oldMultisetType, newMultisetType, typeMapping);
        }
        if (oldType.getTypeRoot() == DataTypeRoot.ROW && newType.getTypeRoot() == DataTypeRoot.ROW) {
            return UpdatedDataFieldsProcessFunctionBase.canConvertRowType((RowType)oldType, (RowType)newType, typeMapping);
        }
        int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
        int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getLength((DataType)oldType) <= DataTypeChecks.getLength((DataType)newType) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        if (oldIdx < 0 && newIdx >= 0 && typeMapping.containsMode(TypeMapping.TypeMappingMode.ALLOW_NON_STRING_TO_STRING)) {
            return ConvertAction.CONVERT;
        }
        oldIdx = BINARY_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = BINARY_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getLength((DataType)oldType) <= DataTypeChecks.getLength((DataType)newType) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = INTEGER_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return oldIdx <= newIdx ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        oldIdx = FLOATING_POINT_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = FLOATING_POINT_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return oldIdx <= newIdx ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        oldIdx = DECIMAL_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = DECIMAL_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getPrecision((DataType)newType) <= DataTypeChecks.getPrecision((DataType)oldType) && DataTypeChecks.getScale((DataType)newType) <= DataTypeChecks.getScale((DataType)oldType) ? ConvertAction.IGNORE : ConvertAction.CONVERT;
        }
        oldIdx = TIMESTAMP_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = TIMESTAMP_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getPrecision((DataType)oldType) <= DataTypeChecks.getPrecision((DataType)newType) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        return ConvertAction.EXCEPTION;
    }

    private static ConvertAction canConvertArray(ArrayType oldArrayType, ArrayType newArrayType, TypeMapping typeMapping) {
        if (oldArrayType.isNullable() && !newArrayType.isNullable()) {
            return ConvertAction.EXCEPTION;
        }
        return UpdatedDataFieldsProcessFunctionBase.canConvert(oldArrayType.getElementType(), newArrayType.getElementType(), typeMapping);
    }

    private static ConvertAction canConvertMap(MapType oldMapType, MapType newMapType, TypeMapping typeMapping) {
        if (oldMapType.isNullable() && !newMapType.isNullable()) {
            return ConvertAction.EXCEPTION;
        }
        if (!oldMapType.getKeyType().equals((Object)newMapType.getKeyType())) {
            return ConvertAction.EXCEPTION;
        }
        return UpdatedDataFieldsProcessFunctionBase.canConvert(oldMapType.getValueType(), newMapType.getValueType(), typeMapping);
    }

    private static ConvertAction canConvertRowType(RowType oldRowType, RowType newRowType, TypeMapping typeMapping) {
        HashMap<String, DataField> oldFieldMap = new HashMap<String, DataField>();
        for (Object field : oldRowType.getFields()) {
            oldFieldMap.put(field.name(), (DataField)field);
        }
        HashMap<String, DataField> newFieldMap = new HashMap<String, DataField>();
        for (DataField field : newRowType.getFields()) {
            newFieldMap.put(field.name(), field);
        }
        for (DataField oldField : oldRowType.getFields()) {
            if (oldField.type().isNullable() || newFieldMap.containsKey(oldField.name())) continue;
            return ConvertAction.EXCEPTION;
        }
        boolean needsConversion = false;
        for (DataField newField : newRowType.getFields()) {
            DataField oldField = (DataField)oldFieldMap.get(newField.name());
            if (oldField != null) {
                ConvertAction fieldAction = UpdatedDataFieldsProcessFunctionBase.canConvert(oldField.type(), newField.type(), typeMapping);
                if (fieldAction == ConvertAction.EXCEPTION) {
                    return ConvertAction.EXCEPTION;
                }
                if (fieldAction != ConvertAction.CONVERT) continue;
                needsConversion = true;
                continue;
            }
            if (!newField.type().isNullable()) {
                return ConvertAction.EXCEPTION;
            }
            needsConversion = true;
        }
        return needsConversion ? ConvertAction.CONVERT : ConvertAction.IGNORE;
    }

    private static ConvertAction canConvertMultisetType(MultisetType oldMultisetType, MultisetType newMultisetType, TypeMapping typeMapping) {
        if (oldMultisetType.isNullable() && !newMultisetType.isNullable()) {
            return ConvertAction.EXCEPTION;
        }
        return UpdatedDataFieldsProcessFunctionBase.canConvert(oldMultisetType.getElementType(), newMultisetType.getElementType(), typeMapping);
    }

    protected List<SchemaChange> extractSchemaChanges(SchemaManager schemaManager, CdcSchema updatedSchema) {
        TableSchema oldTableSchema = (TableSchema)schemaManager.latest().get();
        RowType oldRowType = oldTableSchema.logicalRowType();
        HashMap<String, DataField> oldFields = new HashMap<String, DataField>();
        for (DataField oldField : oldRowType.getFields()) {
            oldFields.put(oldField.name(), oldField);
        }
        boolean allowDecimalTypeChange = this.typeMapping == null || !this.typeMapping.containsMode(TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE);
        ArrayList<SchemaChange> result = new ArrayList<SchemaChange>();
        for (DataField newField : updatedSchema.fields()) {
            String newFieldName = StringUtils.toLowerCaseIfNeed((String)newField.name(), (boolean)this.caseSensitive);
            if (oldFields.containsKey(newFieldName)) {
                DataField oldField = (DataField)oldFields.get(newFieldName);
                if (oldField.type().copy(true).equalsIgnoreFieldId(newField.type().copy(true))) {
                    if (newField.description() == null || newField.description().equals(oldField.description())) continue;
                    result.add(SchemaChange.updateColumnComment((String[])new String[]{newFieldName}, (String)newField.description()));
                    continue;
                }
                if (oldField.type().is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) continue;
                NestedSchemaUtils.generateNestedColumnUpdates(Collections.singletonList(newFieldName), (DataType)oldField.type(), (DataType)newField.type(), result);
                if (newField.description() == null) continue;
                result.add(SchemaChange.updateColumnComment((String[])new String[]{newFieldName}, (String)newField.description()));
                continue;
            }
            result.add(SchemaChange.addColumn((String)newFieldName, (DataType)newField.type(), (String)newField.description(), null));
        }
        if (updatedSchema.comment() != null && !updatedSchema.comment().equals(oldTableSchema.comment())) {
            result.add(SchemaChange.updateComment((String)updatedSchema.comment()));
        }
        return result;
    }

    protected List<DataField> actualUpdatedDataFields(List<DataField> newFields, Set<FieldIdentifier> latestFields) {
        return newFields.stream().filter(dataField -> !latestFields.contains(new FieldIdentifier(dataField))).collect(Collectors.toList());
    }

    protected Set<FieldIdentifier> updateLatestFields(SchemaManager schemaManager) {
        RowType oldRowType = ((TableSchema)schemaManager.latest().get()).logicalRowType();
        return oldRowType.getFields().stream().map(FieldIdentifier::new).collect(Collectors.toSet());
    }

    public void close() throws Exception {
        if (this.catalog != null) {
            this.catalog.close();
            this.catalog = null;
        }
    }

    public static enum ConvertAction {
        CONVERT,
        IGNORE,
        EXCEPTION;

    }
}

