/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class UpdatedDataFieldsProcessFunctionBase<I, O>
extends ProcessFunction<I, O> {
    private static final Logger LOG = LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
    protected final Catalog.Loader catalogLoader;
    protected Catalog catalog;
    private boolean allowUpperCase;
    private static final List<DataTypeRoot> STRING_TYPES = Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
    private static final List<DataTypeRoot> BINARY_TYPES = Arrays.asList(DataTypeRoot.BINARY, DataTypeRoot.VARBINARY);
    private static final List<DataTypeRoot> INTEGER_TYPES = Arrays.asList(DataTypeRoot.TINYINT, DataTypeRoot.SMALLINT, DataTypeRoot.INTEGER, DataTypeRoot.BIGINT);
    private static final List<DataTypeRoot> FLOATING_POINT_TYPES = Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
    private static final List<DataTypeRoot> DECIMAL_TYPES = Arrays.asList(DataTypeRoot.DECIMAL);
    private static final List<DataTypeRoot> TIMESTAMP_TYPES = Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);

    protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader catalogLoader) {
        this.catalogLoader = catalogLoader;
    }

    public void open(Configuration parameters) {
        this.catalog = this.catalogLoader.load();
        this.allowUpperCase = this.catalog.allowUpperCase();
    }

    protected void applySchemaChange(SchemaManager schemaManager, SchemaChange schemaChange, Identifier identifier) throws Exception {
        if (schemaChange instanceof SchemaChange.AddColumn) {
            try {
                this.catalog.alterTable(identifier, schemaChange, false);
            }
            catch (Catalog.ColumnAlreadyExistException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to perform SchemaChange.AddColumn {}, possibly due to duplicated column name", (Object)schemaChange, (Object)e);
                }
            }
        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
            SchemaChange.UpdateColumnType updateColumnType = (SchemaChange.UpdateColumnType)schemaChange;
            TableSchema schema = schemaManager.latest().orElseThrow(() -> new RuntimeException("Table does not exist. This is unexpected."));
            int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
            Preconditions.checkState(idx >= 0, "Field name " + updateColumnType.fieldName() + " does not exist in table. This is unexpected.");
            DataType oldType = schema.fields().get(idx).type();
            DataType newType = updateColumnType.newDataType();
            switch (UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, newType)) {
                case CONVERT: {
                    this.catalog.alterTable(identifier, schemaChange, false);
                    break;
                }
                case EXCEPTION: {
                    throw new UnsupportedOperationException(String.format("Cannot convert field %s from type %s to %s of Paimon table %s.", updateColumnType.fieldName(), oldType, newType, identifier.getFullName()));
                }
            }
        } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
            this.catalog.alterTable(identifier, schemaChange, false);
        } else {
            throw new UnsupportedOperationException("Unsupported schema change class " + schemaChange.getClass().getName() + ", content " + schemaChange);
        }
    }

    public static ConvertAction canConvert(DataType oldType, DataType newType) {
        if (oldType.equalsIgnoreNullable(newType)) {
            return ConvertAction.CONVERT;
        }
        int oldIdx = STRING_TYPES.indexOf((Object)oldType.getTypeRoot());
        int newIdx = STRING_TYPES.indexOf((Object)newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getLength(oldType) <= DataTypeChecks.getLength(newType) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        oldIdx = BINARY_TYPES.indexOf((Object)oldType.getTypeRoot());
        newIdx = BINARY_TYPES.indexOf((Object)newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getLength(oldType) <= DataTypeChecks.getLength(newType) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        oldIdx = INTEGER_TYPES.indexOf((Object)oldType.getTypeRoot());
        newIdx = INTEGER_TYPES.indexOf((Object)newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return oldIdx <= newIdx ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        oldIdx = FLOATING_POINT_TYPES.indexOf((Object)oldType.getTypeRoot());
        newIdx = FLOATING_POINT_TYPES.indexOf((Object)newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return oldIdx <= newIdx ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        oldIdx = DECIMAL_TYPES.indexOf((Object)oldType.getTypeRoot());
        newIdx = DECIMAL_TYPES.indexOf((Object)newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getPrecision(newType) <= DataTypeChecks.getPrecision(oldType) && DataTypeChecks.getScale(newType) <= DataTypeChecks.getScale(oldType) ? ConvertAction.IGNORE : ConvertAction.CONVERT;
        }
        oldIdx = TIMESTAMP_TYPES.indexOf((Object)oldType.getTypeRoot());
        newIdx = TIMESTAMP_TYPES.indexOf((Object)newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getPrecision(oldType) <= DataTypeChecks.getPrecision(newType) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        return ConvertAction.EXCEPTION;
    }

    protected List<SchemaChange> extractSchemaChanges(SchemaManager schemaManager, List<DataField> updatedDataFields) {
        RowType oldRowType = schemaManager.latest().get().logicalRowType();
        HashMap<String, DataField> oldFields = new HashMap<String, DataField>();
        for (DataField oldField : oldRowType.getFields()) {
            oldFields.put(oldField.name(), oldField);
        }
        ArrayList<SchemaChange> result = new ArrayList<SchemaChange>();
        for (DataField newField : updatedDataFields) {
            String newFieldName = StringUtils.caseSensitiveConversion(newField.name(), this.allowUpperCase);
            if (oldFields.containsKey(newFieldName)) {
                DataField oldField = (DataField)oldFields.get(newFieldName);
                if (oldField.type().equalsIgnoreNullable(newField.type())) {
                    if (newField.description() == null || newField.description().equals(oldField.description())) continue;
                    result.add(SchemaChange.updateColumnComment(new String[]{newFieldName}, newField.description()));
                    continue;
                }
                result.add(SchemaChange.updateColumnType(newFieldName, newField.type()));
                if (newField.description() == null) continue;
                result.add(SchemaChange.updateColumnComment(new String[]{newFieldName}, newField.description()));
                continue;
            }
            result.add(SchemaChange.addColumn(newFieldName, newField.type(), newField.description(), null));
        }
        return result;
    }

    public void close() throws Exception {
        if (this.catalog != null) {
            this.catalog.close();
            this.catalog = null;
        }
    }

    public static enum ConvertAction {
        CONVERT,
        IGNORE,
        EXCEPTION;

    }
}

