/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.kafka.KafkaLogDeserializationSchema;
import org.apache.paimon.flink.log.LogSourceProvider;

/**
 * {@link LogSourceProvider} that builds a Flink {@link KafkaSource} emitting {@link RowData}
 * records read from a single Kafka topic.
 *
 * <p>The Kafka {@code isolation.level} is derived from the configured
 * {@link CoreOptions.LogConsistency}, and the starting offsets are derived from the configured
 * {@link CoreOptions.StartupMode}.
 */
public class KafkaLogSourceProvider
implements LogSourceProvider {
    private static final long serialVersionUID = 1L;

    /** Kafka topic the created source subscribes to. */
    private final String topic;
    /** Base Kafka client properties as supplied by the caller; never modified by this class. */
    private final Properties properties;
    private final DataType physicalType;
    private final int[] primaryKey;
    @Nullable
    private final DeserializationSchema<RowData> primaryKeyDeserializer;
    private final DeserializationSchema<RowData> valueDeserializer;
    @Nullable
    private final int[][] projectFields;
    /** Selects the Kafka {@code isolation.level} applied in {@link #createSource(Map)}. */
    private final CoreOptions.LogConsistency consistency;
    /** Selects how the starting offsets are initialized. */
    private final CoreOptions.StartupMode scanMode;
    /** Start timestamp; required only when {@code scanMode} is {@code FROM_TIMESTAMP}. */
    @Nullable
    private final Long timestampMills;

    /**
     * Creates a provider. All arguments are stored as given (no defensive copies are taken).
     *
     * @param topic Kafka topic to read
     * @param properties base Kafka client properties
     * @param physicalType passed through to {@link KafkaLogDeserializationSchema}
     * @param primaryKey passed through to {@link KafkaLogDeserializationSchema}
     * @param primaryKeyDeserializer passed through to {@link KafkaLogDeserializationSchema}; may be null
     * @param valueDeserializer passed through to {@link KafkaLogDeserializationSchema}
     * @param projectFields passed through to {@link KafkaLogDeserializationSchema}; may be null
     * @param consistency consistency mode used to choose the isolation level
     * @param scanMode startup mode used to choose the starting offsets
     * @param timestampMills start timestamp for {@code FROM_TIMESTAMP} mode; may be null otherwise
     */
    public KafkaLogSourceProvider(String topic, Properties properties, DataType physicalType, int[] primaryKey, @Nullable DeserializationSchema<RowData> primaryKeyDeserializer, DeserializationSchema<RowData> valueDeserializer, @Nullable int[][] projectFields, CoreOptions.LogConsistency consistency, CoreOptions.StartupMode scanMode, @Nullable Long timestampMills) {
        this.topic = topic;
        this.properties = properties;
        this.physicalType = physicalType;
        this.primaryKey = primaryKey;
        this.primaryKeyDeserializer = primaryKeyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.projectFields = projectFields;
        this.consistency = consistency;
        this.scanMode = scanMode;
        this.timestampMills = timestampMills;
    }

    /** Intentionally a no-op: this provider does nothing before the source is created. */
    public void preCreateSource() {
    }

    /**
     * Builds a {@link KafkaSource} for the configured topic with a freshly generated random
     * group id.
     *
     * <p>The {@code isolation.level} override is applied to a per-call copy of the base
     * properties, so the {@link Properties} instance handed to the constructor is left untouched.
     *
     * @param bucketOffsets bucket-to-offset map; consulted only in {@code LATEST_FULL} mode,
     *     where each bucket number is used as the Kafka partition number. May be null.
     * @return the configured Kafka source
     * @throws NullPointerException if the startup mode is {@code FROM_TIMESTAMP} and no
     *     timestamp was supplied
     * @throws UnsupportedOperationException if the startup mode is not handled
     */
    public KafkaSource<RowData> createSource(@Nullable Map<Integer, Long> bucketOffsets) {
        // Work on a copy so that building a source never mutates caller-owned state.
        // putAll copies exactly the entries that the builder's setProperties would see.
        Properties sourceProperties = new Properties();
        sourceProperties.putAll(this.properties);
        switch (this.consistency) {
            case TRANSACTIONAL: {
                sourceProperties.setProperty("isolation.level", "read_committed");
                break;
            }
            case EVENTUAL: {
                sourceProperties.setProperty("isolation.level", "read_uncommitted");
                break;
            }
        }
        // The explicit <RowData> witness is required: without it the builder is inferred as
        // KafkaSourceBuilder<Object> and neither setDeserializer nor the return type line up.
        return KafkaSource.<RowData>builder()
                .setTopics(this.topic)
                .setStartingOffsets(this.toOffsetsInitializer(bucketOffsets))
                .setDeserializer(this.createDeserializationSchema())
                .setGroupId(UUID.randomUUID().toString())
                .setProperties(sourceProperties)
                .build();
    }

    /**
     * Wraps a new {@link KafkaLogDeserializationSchema}, configured from this provider's fields,
     * into a {@link KafkaRecordDeserializationSchema}.
     */
    @VisibleForTesting
    KafkaRecordDeserializationSchema<RowData> createDeserializationSchema() {
        // NOTE(review): raw cast kept as-is; presumably KafkaLogDeserializationSchema implements
        // KafkaDeserializationSchema<RowData>, in which case the cast is redundant - confirm.
        return KafkaRecordDeserializationSchema.of((KafkaDeserializationSchema)new KafkaLogDeserializationSchema(this.physicalType, this.primaryKey, this.primaryKeyDeserializer, this.valueDeserializer, this.projectFields));
    }

    /**
     * Maps the configured startup mode to a Kafka {@link OffsetsInitializer}.
     *
     * <ul>
     *   <li>{@code LATEST_FULL}: the given bucket offsets, or earliest when none are given</li>
     *   <li>{@code LATEST}: latest offsets</li>
     *   <li>{@code FROM_TIMESTAMP}: offsets resolved from {@code timestampMills}</li>
     * </ul>
     */
    private OffsetsInitializer toOffsetsInitializer(@Nullable Map<Integer, Long> bucketOffsets) {
        switch (this.scanMode) {
            case LATEST_FULL: {
                if (bucketOffsets == null) {
                    return OffsetsInitializer.earliest();
                }
                return OffsetsInitializer.offsets(this.toKafkaOffsets(bucketOffsets));
            }
            case LATEST: {
                return OffsetsInitializer.latest();
            }
            case FROM_TIMESTAMP: {
                if (this.timestampMills == null) {
                    throw new NullPointerException("Must specify a timestamp if you choose timestamp startup mode.");
                }
                return OffsetsInitializer.timestamp(this.timestampMills);
            }
        }
        throw new UnsupportedOperationException("Unsupported mode: " + this.scanMode);
    }

    /** Re-keys bucket offsets by {@link TopicPartition}, using the bucket as the partition. */
    private Map<TopicPartition, Long> toKafkaOffsets(Map<Integer, Long> bucketOffsets) {
        Map<TopicPartition, Long> partitionOffsets = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : bucketOffsets.entrySet()) {
            partitionOffsets.put(new TopicPartition(this.topic, entry.getKey()), entry.getValue());
        }
        return partitionOffsets;
    }
}

