/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.utils.StringUtils;

public class KafkaActionUtils {
    public static final String PROPERTIES_PREFIX = "properties.";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";

    public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration kafkaConfig, KafkaDeserializationSchema<CdcSourceRecord> deserializationSchema) {
        KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder();
        if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
            List topics = ((List)kafkaConfig.get(KafkaConnectorOptions.TOPIC)).stream().flatMap(topic -> Arrays.stream(topic.split(","))).collect(Collectors.toList());
            kafkaSourceBuilder.setTopics(topics);
        } else {
            kafkaSourceBuilder.setTopicPattern(Pattern.compile((String)kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
        }
        kafkaSourceBuilder.setDeserializer(KafkaRecordDeserializationSchema.of(deserializationSchema)).setGroupId(KafkaActionUtils.kafkaPropertiesGroupId(kafkaConfig));
        Properties properties = KafkaActionUtils.createKafkaProperties(kafkaConfig);
        StartupMode startupMode = KafkaActionUtils.fromOption((KafkaConnectorOptions.ScanStartupMode)kafkaConfig.get(KafkaConnectorOptions.SCAN_STARTUP_MODE));
        switch (startupMode) {
            case EARLIEST: {
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
                break;
            }
            case LATEST: {
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                break;
            }
            case GROUP_OFFSETS: {
                String offsetResetConfig = properties.getProperty("auto.offset.reset", OffsetResetStrategy.NONE.name());
                OffsetResetStrategy offsetResetStrategy = KafkaActionUtils.getResetStrategy(offsetResetConfig);
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets((OffsetResetStrategy)offsetResetStrategy));
                break;
            }
            case SPECIFIC_OFFSETS: {
                HashMap offsets = new HashMap();
                String topic2 = (String)((List)kafkaConfig.get(KafkaConnectorOptions.TOPIC)).get(0);
                String specificOffsetsStrOpt = (String)kafkaConfig.get(KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS);
                Map<Integer, Long> offsetMap = KafkaActionUtils.parseSpecificOffsets(specificOffsetsStrOpt, KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS.key());
                offsetMap.forEach((partition, offset) -> {
                    TopicPartition topicPartition = new TopicPartition(topic2, partition.intValue());
                    offsets.put(topicPartition, offset);
                });
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(offsets));
                break;
            }
            case TIMESTAMP: {
                long startupTimestampMillis = (Long)kafkaConfig.get(KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS);
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp((long)startupTimestampMillis));
            }
        }
        kafkaSourceBuilder.setProperties(properties);
        return kafkaSourceBuilder.build();
    }

    private static StartupMode fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
        switch (scanStartupMode) {
            case EARLIEST_OFFSET: {
                return StartupMode.EARLIEST;
            }
            case LATEST_OFFSET: {
                return StartupMode.LATEST;
            }
            case GROUP_OFFSETS: {
                return StartupMode.GROUP_OFFSETS;
            }
            case SPECIFIC_OFFSETS: {
                return StartupMode.SPECIFIC_OFFSETS;
            }
            case TIMESTAMP: {
                return StartupMode.TIMESTAMP;
            }
        }
        throw new TableException("Unsupported startup mode. Validator should have checked that.");
    }

    private static OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
        return Arrays.stream(OffsetResetStrategy.values()).filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT))).findAny().orElseThrow(() -> new IllegalArgumentException(String.format("%s can not be set to %s. Valid values: [%s]", "auto.offset.reset", offsetResetConfig, Arrays.stream(OffsetResetStrategy.values()).map(Enum::name).map(String::toLowerCase).collect(Collectors.joining(",")))));
    }

    private static Map<Integer, Long> parseSpecificOffsets(String specificOffsetsStr, String optionKey) {
        HashMap<Integer, Long> offsetMap = new HashMap<Integer, Long>();
        String[] pairs = specificOffsetsStr.split(";");
        String validationExceptionMessage = String.format("Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.", optionKey, specificOffsetsStr);
        if (pairs.length == 0) {
            throw new ValidationException(validationExceptionMessage);
        }
        for (String pair : pairs) {
            if (null == pair || !pair.contains(",")) {
                throw new ValidationException(validationExceptionMessage);
            }
            String[] kv = pair.split(",");
            if (kv.length != 2 || !kv[0].startsWith("partition:") || !kv[1].startsWith("offset:")) {
                throw new ValidationException(validationExceptionMessage);
            }
            String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
            String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
            try {
                Integer partition = Integer.valueOf(partitionValue);
                Long offset = Long.valueOf(offsetValue);
                offsetMap.put(partition, offset);
            }
            catch (NumberFormatException e) {
                throw new ValidationException(validationExceptionMessage, (Throwable)e);
            }
        }
        return offsetMap;
    }

    private static String kafkaPropertiesGroupId(Configuration kafkaConfig) {
        String groupId = (String)kafkaConfig.get(KafkaConnectorOptions.PROPS_GROUP_ID);
        if (StringUtils.isEmpty((CharSequence)groupId)) {
            groupId = UUID.randomUUID().toString();
            kafkaConfig.set(KafkaConnectorOptions.PROPS_GROUP_ID, (Object)groupId);
        }
        return groupId;
    }

    public static DataFormat getDataFormat(Configuration kafkaConfig) {
        return DataFormatFactory.createDataFormat((String)kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT));
    }

    public static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(Configuration kafkaConfig, KafkaDeserializationSchema<CdcSourceRecord> deserializationSchema) {
        Properties props = KafkaActionUtils.createKafkaProperties(kafkaConfig);
        props.put("bootstrap.servers", kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));
        props.put("group.id", KafkaActionUtils.kafkaPropertiesGroupId(kafkaConfig));
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        KafkaConsumer consumer = new KafkaConsumer(props);
        String topic = KafkaActionUtils.findOneTopic(kafkaConfig, props);
        List partitionInfos = consumer.partitionsFor(topic);
        if (partitionInfos == null || partitionInfos.isEmpty()) {
            throw new IllegalArgumentException(String.format("Failed to find partition information for topic '%s'. Please check your 'topic' and 'bootstrap.servers' config.", topic));
        }
        int firstPartition = partitionInfos.stream().map(PartitionInfo::partition).sorted().findFirst().get();
        List<TopicPartition> topicPartitions = Collections.singletonList(new TopicPartition(topic, firstPartition));
        consumer.assign(topicPartitions);
        consumer.seekToBeginning(topicPartitions);
        return new KafkaConsumerWrapper((KafkaConsumer<byte[], byte[]>)consumer, topic, deserializationSchema);
    }

    private static Properties createKafkaProperties(Configuration kafkaConfig) {
        Properties props = new Properties();
        props.putAll((Map<?, ?>)OptionsUtils.convertToPropertiesPrefixKey((Map)kafkaConfig.toMap(), (String)PROPERTIES_PREFIX));
        return props;
    }

    public static String findOneTopic(Configuration kafkaConfig) {
        return KafkaActionUtils.findOneTopic(kafkaConfig, KafkaActionUtils.createKafkaProperties(kafkaConfig));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static String findOneTopic(Configuration kafkaConfig, Properties properties) {
        if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
            return (String)((List)kafkaConfig.get(KafkaConnectorOptions.TOPIC)).get(0);
        }
        String pattern = (String)kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN);
        Pattern topicPattern = Pattern.compile(pattern);
        try (AdminClient adminClient = AdminClient.create((Properties)properties);){
            String topicName;
            Set allTopicNames = (Set)adminClient.listTopics().names().get();
            Iterator iterator = allTopicNames.iterator();
            do {
                if (!iterator.hasNext()) throw new RuntimeException("Cannot find topics match the topic-pattern " + pattern);
            } while (!topicPattern.matcher(topicName = (String)iterator.next()).matches());
            String string = topicName;
            return string;
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private static class KafkaConsumerWrapper
    implements MessageQueueSchemaUtils.ConsumerWrapper {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final String topic;
        private final KafkaDeserializationSchema<CdcSourceRecord> deserializationSchema;

        KafkaConsumerWrapper(KafkaConsumer<byte[], byte[]> kafkaConsumer, String topic, KafkaDeserializationSchema<CdcSourceRecord> deserializationSchema) {
            this.consumer = kafkaConsumer;
            this.topic = topic;
            this.deserializationSchema = deserializationSchema;
        }

        @Override
        public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
            ConsumerRecords consumerRecords = this.consumer.poll(Duration.ofMillis(pollTimeOutMills));
            return StreamSupport.stream(consumerRecords.records(this.topic).spliterator(), false).map(consumerRecord -> {
                try {
                    return (CdcSourceRecord)this.deserializationSchema.deserialize(consumerRecord);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).collect(Collectors.toList());
        }

        @Override
        public String topic() {
            return this.topic;
        }

        @Override
        public void close() {
            this.consumer.close();
        }
    }
}

