/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.connect.data;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.data.SchemaUpdate;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SchemaUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class);
    private static final Pattern TRANSFORM_REGEX = Pattern.compile("(\\w+)\\((.+)\\)");

    static Type.PrimitiveType needsDataTypeUpdate(Type currentIcebergType, org.apache.kafka.connect.data.Schema valueSchema) {
        if (currentIcebergType.typeId() == Type.TypeID.FLOAT && valueSchema.type() == Schema.Type.FLOAT64) {
            return Types.DoubleType.get();
        }
        if (currentIcebergType.typeId() == Type.TypeID.INTEGER && valueSchema.type() == Schema.Type.INT64) {
            return Types.LongType.get();
        }
        return null;
    }

    static void applySchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
        if (updates == null || updates.empty()) {
            return;
        }
        Tasks.range((int)1).retry(2).run(notUsed -> SchemaUtils.commitSchemaUpdates(table, updates));
    }

    private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
        table.refresh();
        List<SchemaUpdate.AddColumn> addColumns = updates.addColumns().stream().filter(addCol -> !SchemaUtils.columnExists(table.schema(), addCol)).collect(Collectors.toList());
        List<SchemaUpdate.UpdateType> updateTypes = updates.updateTypes().stream().filter(updateType -> !SchemaUtils.typeMatches(table.schema(), updateType)).collect(Collectors.toList());
        List<SchemaUpdate.MakeOptional> makeOptionals = updates.makeOptionals().stream().filter(makeOptional -> !SchemaUtils.isOptional(table.schema(), makeOptional)).collect(Collectors.toList());
        if (addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty()) {
            LOG.info("Schema for table {} already up-to-date", (Object)table.name());
            return;
        }
        UpdateSchema updateSchema = table.updateSchema();
        addColumns.forEach(update -> updateSchema.addColumn(update.parentName(), update.name(), update.type()));
        updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type()));
        makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name()));
        updateSchema.commit();
        LOG.info("Schema for table {} updated with new columns", (Object)table.name());
    }

    private static boolean columnExists(Schema schema, SchemaUpdate.AddColumn update) {
        return schema.findType(update.key()) != null;
    }

    private static boolean typeMatches(Schema schema, SchemaUpdate.UpdateType update) {
        Type type = schema.findType(update.name());
        if (type == null) {
            throw new IllegalArgumentException("Invalid column: " + update.name());
        }
        return type.typeId() == update.type().typeId();
    }

    private static boolean isOptional(Schema schema, SchemaUpdate.MakeOptional update) {
        Types.NestedField field = schema.findField(update.name());
        if (field == null) {
            throw new IllegalArgumentException("Invalid column: " + update.name());
        }
        return field.isOptional();
    }

    static PartitionSpec createPartitionSpec(Schema schema, List<String> partitionBy) {
        if (partitionBy.isEmpty()) {
            return PartitionSpec.unpartitioned();
        }
        PartitionSpec.Builder specBuilder = PartitionSpec.builderFor((Schema)schema);
        partitionBy.forEach(partitionField -> {
            Matcher matcher = TRANSFORM_REGEX.matcher((CharSequence)partitionField);
            if (matcher.matches()) {
                String transform;
                switch (transform = matcher.group(1)) {
                    case "year": 
                    case "years": {
                        specBuilder.year(matcher.group(2));
                        break;
                    }
                    case "month": 
                    case "months": {
                        specBuilder.month(matcher.group(2));
                        break;
                    }
                    case "day": 
                    case "days": {
                        specBuilder.day(matcher.group(2));
                        break;
                    }
                    case "hour": 
                    case "hours": {
                        specBuilder.hour(matcher.group(2));
                        break;
                    }
                    case "bucket": {
                        Pair<String, Integer> args = SchemaUtils.transformArgPair(matcher.group(2));
                        specBuilder.bucket((String)args.first(), ((Integer)args.second()).intValue());
                        break;
                    }
                    case "truncate": {
                        Pair<String, Integer> args = SchemaUtils.transformArgPair(matcher.group(2));
                        specBuilder.truncate((String)args.first(), ((Integer)args.second()).intValue());
                        break;
                    }
                    default: {
                        throw new UnsupportedOperationException("Unsupported transform: " + transform);
                    }
                }
            } else {
                specBuilder.identity(partitionField);
            }
        });
        return specBuilder.build();
    }

    private static Pair<String, Integer> transformArgPair(String argsStr) {
        List parts = Splitter.on((char)',').splitToList((CharSequence)argsStr);
        if (parts.size() != 2) {
            throw new IllegalArgumentException("Invalid argument " + argsStr + ", should have 2 parts");
        }
        return Pair.of((Object)((String)parts.get(0)).trim(), (Object)Integer.parseInt(((String)parts.get(1)).trim()));
    }

    static Type toIcebergType(org.apache.kafka.connect.data.Schema valueSchema, IcebergSinkConfig config) {
        return new SchemaGenerator(config).toIcebergType(valueSchema);
    }

    static Type inferIcebergType(Object value, IcebergSinkConfig config) {
        return new SchemaGenerator(config).inferIcebergType(value);
    }

    private SchemaUtils() {
    }

    static class SchemaGenerator {
        private int fieldId = 1;
        private final IcebergSinkConfig config;

        SchemaGenerator(IcebergSinkConfig config) {
            this.config = config;
        }

        Type toIcebergType(org.apache.kafka.connect.data.Schema valueSchema) {
            switch (valueSchema.type()) {
                case BOOLEAN: {
                    return Types.BooleanType.get();
                }
                case BYTES: {
                    if ("org.apache.kafka.connect.data.Decimal".equals(valueSchema.name())) {
                        int scale = Integer.parseInt((String)valueSchema.parameters().get("scale"));
                        return Types.DecimalType.of((int)38, (int)scale);
                    }
                    return Types.BinaryType.get();
                }
                case INT8: 
                case INT16: {
                    return Types.IntegerType.get();
                }
                case INT32: {
                    if ("org.apache.kafka.connect.data.Date".equals(valueSchema.name())) {
                        return Types.DateType.get();
                    }
                    if ("org.apache.kafka.connect.data.Time".equals(valueSchema.name())) {
                        return Types.TimeType.get();
                    }
                    return Types.IntegerType.get();
                }
                case INT64: {
                    if ("org.apache.kafka.connect.data.Timestamp".equals(valueSchema.name())) {
                        return Types.TimestampType.withZone();
                    }
                    return Types.LongType.get();
                }
                case FLOAT32: {
                    return Types.FloatType.get();
                }
                case FLOAT64: {
                    return Types.DoubleType.get();
                }
                case ARRAY: {
                    Type elementType = this.toIcebergType(valueSchema.valueSchema());
                    if (this.config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) {
                        return Types.ListType.ofOptional((int)this.nextId(), (Type)elementType);
                    }
                    return Types.ListType.ofRequired((int)this.nextId(), (Type)elementType);
                }
                case MAP: {
                    Type keyType = this.toIcebergType(valueSchema.keySchema());
                    Type valueType = this.toIcebergType(valueSchema.valueSchema());
                    if (this.config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) {
                        return Types.MapType.ofOptional((int)this.nextId(), (int)this.nextId(), (Type)keyType, (Type)valueType);
                    }
                    return Types.MapType.ofRequired((int)this.nextId(), (int)this.nextId(), (Type)keyType, (Type)valueType);
                }
                case STRUCT: {
                    List structFields = valueSchema.fields().stream().map(field -> Types.NestedField.builder().isOptional(this.config.schemaForceOptional() || field.schema().isOptional()).withId(this.nextId()).ofType(this.toIcebergType(field.schema())).withName(field.name()).build()).collect(Collectors.toList());
                    return Types.StructType.of(structFields);
                }
            }
            return Types.StringType.get();
        }

        Type inferIcebergType(Object value) {
            if (value == null) {
                return null;
            }
            if (value instanceof String) {
                return Types.StringType.get();
            }
            if (value instanceof Boolean) {
                return Types.BooleanType.get();
            }
            if (value instanceof BigDecimal) {
                BigDecimal bigDecimal = (BigDecimal)value;
                return Types.DecimalType.of((int)bigDecimal.precision(), (int)bigDecimal.scale());
            }
            if (value instanceof Integer || value instanceof Long) {
                return Types.LongType.get();
            }
            if (value instanceof Float || value instanceof Double) {
                return Types.DoubleType.get();
            }
            if (value instanceof LocalDate) {
                return Types.DateType.get();
            }
            if (value instanceof LocalTime) {
                return Types.TimeType.get();
            }
            if (value instanceof Date || value instanceof OffsetDateTime) {
                return Types.TimestampType.withZone();
            }
            if (value instanceof LocalDateTime) {
                return Types.TimestampType.withoutZone();
            }
            if (value instanceof List) {
                List list = (List)value;
                if (list.isEmpty()) {
                    return null;
                }
                Type elementType = this.inferIcebergType(list.get(0));
                return elementType == null ? null : Types.ListType.ofOptional((int)this.nextId(), (Type)elementType);
            }
            if (value instanceof Map) {
                Map map = (Map)value;
                List structFields = map.entrySet().stream().filter(entry -> entry.getKey() != null && entry.getValue() != null).map(entry -> {
                    Type valueType = this.inferIcebergType(entry.getValue());
                    return valueType == null ? null : Types.NestedField.optional((int)this.nextId(), (String)entry.getKey().toString(), (Type)valueType);
                }).filter(Objects::nonNull).collect(Collectors.toList());
                if (structFields.isEmpty()) {
                    return null;
                }
                return Types.StructType.of(structFields);
            }
            return null;
        }

        private int nextId() {
            return this.fieldId++;
        }
    }
}

