/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.CollectionUtil;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

class KafkaActionUtils {
    public static final String PROPERTIES_PREFIX = "properties.";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";

    KafkaActionUtils() {
    }

    static void assertSchemaCompatible(TableSchema tableSchema, Schema kafkaCanalSchema) {
        if (!KafkaActionUtils.schemaCompatible(tableSchema, kafkaCanalSchema)) {
            throw new IllegalArgumentException("Paimon schema and Kafka schema are not compatible.\nPaimon fields are: " + tableSchema.fields() + ".\nKafka fields are: " + kafkaCanalSchema.fields());
        }
    }

    static boolean schemaCompatible(TableSchema paimonSchema, Schema kafkaCanalSchema) {
        for (DataField field : kafkaCanalSchema.fields()) {
            int idx = paimonSchema.fieldNames().indexOf(field.name());
            if (idx < 0) {
                return false;
            }
            DataType type = paimonSchema.fields().get(idx).type();
            if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type) == UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT) continue;
            return false;
        }
        return true;
    }

    static Schema buildPaimonSchema(KafkaSchema kafkaSchema, List<String> specifiedPartitionKeys, List<String> specifiedPrimaryKeys, List<ComputedColumn> computedColumns, Map<String, String> paimonConfig, boolean caseSensitive) {
        List<String> mySqlPrimaryKeys;
        Map<String, DataType> mySqlFields;
        Schema.Builder builder = Schema.newBuilder();
        builder.options(paimonConfig);
        if (caseSensitive) {
            mySqlFields = kafkaSchema.fields();
            mySqlPrimaryKeys = kafkaSchema.primaryKeys();
        } else {
            mySqlFields = new LinkedHashMap<String, DataType>();
            for (Map.Entry<String, DataType> entry : kafkaSchema.fields().entrySet()) {
                String fieldName = entry.getKey();
                Preconditions.checkArgument(!mySqlFields.containsKey(fieldName.toLowerCase()), String.format("Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.", fieldName, kafkaSchema.tableName()));
                mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
            }
            mySqlPrimaryKeys = kafkaSchema.primaryKeys().stream().map(String::toLowerCase).collect(Collectors.toList());
        }
        for (Map.Entry<String, DataType> entry : mySqlFields.entrySet()) {
            builder.column(entry.getKey(), entry.getValue(), null);
        }
        for (ComputedColumn computedColumn : computedColumns) {
            builder.column(computedColumn.columnName(), computedColumn.columnType());
        }
        if (specifiedPrimaryKeys.size() > 0) {
            for (String key : specifiedPrimaryKeys) {
                if (mySqlFields.containsKey(key) || !computedColumns.stream().noneMatch(c -> c.columnName().equals(key))) continue;
                throw new IllegalArgumentException("Specified primary key " + key + " does not exist in kafka topic's table or computed columns.");
            }
            builder.primaryKey(specifiedPrimaryKeys);
        } else if (mySqlPrimaryKeys.size() > 0) {
            builder.primaryKey(mySqlPrimaryKeys);
        } else {
            throw new IllegalArgumentException("Primary keys are not specified. Also, can't infer primary keys from kafka topic's table schemas because Kafka topic's table have no primary keys or have different primary keys.");
        }
        if (specifiedPartitionKeys.size() > 0) {
            builder.partitionKeys(specifiedPartitionKeys);
        }
        return builder.build();
    }

    static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
        KafkaActionUtils.validateKafkaConfig(kafkaConfig);
        KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder();
        kafkaSourceBuilder.setTopics((List)kafkaConfig.get(KafkaConnectorOptions.TOPIC)).setValueOnlyDeserializer((DeserializationSchema)new SimpleStringSchema()).setGroupId(KafkaActionUtils.kafkaPropertiesGroupId(kafkaConfig));
        Properties properties = new Properties();
        for (Map.Entry entry : kafkaConfig.toMap().entrySet()) {
            String key = (String)entry.getKey();
            String value = (String)entry.getValue();
            if (!key.startsWith(PROPERTIES_PREFIX)) continue;
            properties.put(key.substring(PROPERTIES_PREFIX.length()), value);
        }
        StartupMode startupMode = KafkaActionUtils.fromOption((KafkaConnectorOptions.ScanStartupMode)kafkaConfig.get(KafkaConnectorOptions.SCAN_STARTUP_MODE));
        switch (startupMode) {
            case EARLIEST: {
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
                break;
            }
            case LATEST: {
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                break;
            }
            case GROUP_OFFSETS: {
                String offsetResetConfig = properties.getProperty("auto.offset.reset", OffsetResetStrategy.NONE.name());
                OffsetResetStrategy offsetResetStrategy = KafkaActionUtils.getResetStrategy(offsetResetConfig);
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets((OffsetResetStrategy)offsetResetStrategy));
                break;
            }
            case SPECIFIC_OFFSETS: {
                HashMap offsets = new HashMap();
                String topic = (String)((List)kafkaConfig.get(KafkaConnectorOptions.TOPIC)).get(0);
                String specificOffsetsStrOpt = (String)kafkaConfig.get(KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS);
                Map<Integer, Long> offsetMap = KafkaActionUtils.parseSpecificOffsets(specificOffsetsStrOpt, KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS.key());
                offsetMap.forEach((partition, offset) -> {
                    TopicPartition topicPartition = new TopicPartition(topic, partition.intValue());
                    offsets.put(topicPartition, offset);
                });
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(offsets));
                break;
            }
            case TIMESTAMP: {
                long startupTimestampMillis = (Long)kafkaConfig.get(KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS);
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp((long)startupTimestampMillis));
            }
        }
        kafkaSourceBuilder.setProperties(properties);
        return kafkaSourceBuilder.build();
    }

    private static StartupMode fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
        switch (scanStartupMode) {
            case EARLIEST_OFFSET: {
                return StartupMode.EARLIEST;
            }
            case LATEST_OFFSET: {
                return StartupMode.LATEST;
            }
            case GROUP_OFFSETS: {
                return StartupMode.GROUP_OFFSETS;
            }
            case SPECIFIC_OFFSETS: {
                return StartupMode.SPECIFIC_OFFSETS;
            }
            case TIMESTAMP: {
                return StartupMode.TIMESTAMP;
            }
        }
        throw new TableException("Unsupported startup mode. Validator should have checked that.");
    }

    private static OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
        return Arrays.stream(OffsetResetStrategy.values()).filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT))).findAny().orElseThrow(() -> new IllegalArgumentException(String.format("%s can not be set to %s. Valid values: [%s]", "auto.offset.reset", offsetResetConfig, Arrays.stream(OffsetResetStrategy.values()).map(Enum::name).map(String::toLowerCase).collect(Collectors.joining(",")))));
    }

    public static Map<Integer, Long> parseSpecificOffsets(String specificOffsetsStr, String optionKey) {
        HashMap<Integer, Long> offsetMap = new HashMap<Integer, Long>();
        String[] pairs = specificOffsetsStr.split(";");
        String validationExceptionMessage = String.format("Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.", optionKey, specificOffsetsStr);
        if (pairs.length == 0) {
            throw new ValidationException(validationExceptionMessage);
        }
        for (String pair : pairs) {
            if (null == pair || !pair.contains(",")) {
                throw new ValidationException(validationExceptionMessage);
            }
            String[] kv = pair.split(",");
            if (kv.length != 2 || !kv[0].startsWith("partition:") || !kv[1].startsWith("offset:")) {
                throw new ValidationException(validationExceptionMessage);
            }
            String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
            String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
            try {
                Integer partition = Integer.valueOf(partitionValue);
                Long offset = Long.valueOf(offsetValue);
                offsetMap.put(partition, offset);
            }
            catch (NumberFormatException e) {
                throw new ValidationException(validationExceptionMessage, (Throwable)e);
            }
        }
        return offsetMap;
    }

    private static void validateKafkaConfig(Configuration kafkaConfig) {
        Preconditions.checkArgument(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT) != null, String.format("kafka-conf [%s] must be specified.", KafkaConnectorOptions.VALUE_FORMAT.key()));
        Preconditions.checkArgument(!CollectionUtil.isNullOrEmpty((Collection)((Collection)kafkaConfig.get(KafkaConnectorOptions.TOPIC))), String.format("kafka-conf [%s] must be specified.", KafkaConnectorOptions.TOPIC.key()));
        Preconditions.checkArgument(kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS) != null, String.format("kafka-conf [%s] must be specified.", KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS.key()));
    }

    public static String kafkaPropertiesGroupId(Configuration kafkaConfig) {
        String groupId = (String)kafkaConfig.get(KafkaConnectorOptions.PROPS_GROUP_ID);
        if (StringUtils.isEmpty(groupId)) {
            groupId = UUID.randomUUID().toString();
            kafkaConfig.set(KafkaConnectorOptions.PROPS_GROUP_ID, (Object)groupId);
        }
        return groupId;
    }
}

