/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.header.Header;
import org.apache.flink.shaded.guava31.com.google.common.primitives.Ints;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaRecordHeaderFilter;
import org.apache.flink.streaming.connectors.kafka.table.KafkaTableSpec;
import org.apache.flink.streaming.connectors.kafka.table.KafkaTableSpecCreator;
import org.apache.flink.streaming.connectors.kafka.table.MergedEvolvingKafkaDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.EvolvingDataStreamScanProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsSchemaEvolutionReading;
import org.apache.flink.table.connector.source.abilities.SupportsTableSourceMerge;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.SourceRecord;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;

@Internal
public class KafkaDynamicSource
implements ScanTableSource,
SupportsReadingMetadata,
SupportsWatermarkPushDown,
SupportsProjectionPushDown,
SupportsSchemaEvolutionReading,
SupportsTableSourceMerge {
    private static final String KAFKA_TRANSFORMATION = "kafka";
    protected DataType producedDataType;
    protected List<String> metadataKeys;
    @Nullable
    protected WatermarkStrategy<RowData> watermarkStrategy;
    protected boolean enableSchemaEvolution;
    private static final String VALUE_METADATA_PREFIX = "value.";
    private final ObjectPath tablePath;
    protected final DataType physicalDataType;
    @Nullable
    protected final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
    protected final int[] keyProjection;
    protected final int[] valueProjection;
    @Nullable
    protected int[][] physicalProjectedFields;
    @Nullable
    protected final String keyPrefix;
    @Nullable
    protected final String valuePrefix;
    protected final List<String> topics;
    protected final Pattern topicPattern;
    protected final Properties properties;
    protected final StartupMode startupMode;
    protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
    protected final long startupTimestampMillis;
    protected final BoundedMode boundedMode;
    protected final Map<KafkaTopicPartition, Long> specificBoundedOffsets;
    protected final long boundedTimestampMillis;
    protected final boolean upsertMode;
    protected final String tableIdentifier;
    protected final Map<ObjectPath, KafkaTableSpecCreator> capturedTables;
    @Nullable
    protected final String headerFilterExpression;

    public KafkaDynamicSource(ObjectPath tablePath, DataType physicalDataType, @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat, DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, int[] keyProjection, int[] valueProjection, @Nullable String keyPrefix, @Nullable String valuePrefix, @Nullable List<String> topics, @Nullable Pattern topicPattern, Properties properties, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis, BoundedMode boundedMode, Map<KafkaTopicPartition, Long> specificBoundedOffsets, long boundedTimestampMillis, boolean upsertMode, String tableIdentifier, @Nullable String headerFilterExpression) {
        this.tablePath = tablePath;
        this.physicalDataType = (DataType)Preconditions.checkNotNull((Object)physicalDataType, (String)"Physical data type must not be null.");
        this.keyDecodingFormat = keyDecodingFormat;
        this.valueDecodingFormat = (DecodingFormat)Preconditions.checkNotNull(valueDecodingFormat, (String)"Value decoding format must not be null.");
        this.keyProjection = (int[])Preconditions.checkNotNull((Object)keyProjection, (String)"Key projection must not be null.");
        this.valueProjection = (int[])Preconditions.checkNotNull((Object)valueProjection, (String)"Value projection must not be null.");
        this.keyPrefix = keyPrefix;
        this.valuePrefix = valuePrefix;
        this.producedDataType = physicalDataType;
        this.metadataKeys = Collections.emptyList();
        this.watermarkStrategy = null;
        Preconditions.checkArgument((topics != null && topicPattern == null || topics == null && topicPattern != null ? 1 : 0) != 0, (Object)"Either Topic or Topic Pattern must be set for source.");
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.properties = (Properties)Preconditions.checkNotNull((Object)properties, (String)"Properties must not be null.");
        this.startupMode = (StartupMode)((Object)Preconditions.checkNotNull((Object)((Object)startupMode), (String)"Startup mode must not be null."));
        this.specificStartupOffsets = (Map)Preconditions.checkNotNull(specificStartupOffsets, (String)"Specific offsets must not be null.");
        this.startupTimestampMillis = startupTimestampMillis;
        this.boundedMode = (BoundedMode)((Object)Preconditions.checkNotNull((Object)((Object)boundedMode), (String)"Bounded mode must not be null."));
        this.specificBoundedOffsets = (Map)Preconditions.checkNotNull(specificBoundedOffsets, (String)"Specific bounded offsets must not be null.");
        this.boundedTimestampMillis = boundedTimestampMillis;
        this.upsertMode = upsertMode;
        this.tableIdentifier = tableIdentifier;
        this.enableSchemaEvolution = false;
        this.capturedTables = new HashMap<ObjectPath, KafkaTableSpecCreator>();
        this.capturedTables.put(tablePath, new KafkaTableSpecCreator(tablePath, topicPattern, topics, Collections.emptyList(), physicalDataType, this.producedDataType, keyDecodingFormat, valueDecodingFormat, keyProjection, valueProjection, keyPrefix, valuePrefix));
        this.headerFilterExpression = headerFilterExpression;
    }

    public ChangelogMode getChangelogMode() {
        return this.valueDecodingFormat.getChangelogMode();
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext context) {
        int[] projectedKeyProjection = this.keyProjection;
        int[] projectedValueProjection = this.valueProjection;
        if (this.physicalProjectedFields != null) {
            ArrayList<int[]> keyProjectedFields = new ArrayList<int[]>();
            ArrayList<int[]> valueProjectedFields = new ArrayList<int[]>();
            ArrayList<Integer> newKeyProjection = new ArrayList<Integer>();
            ArrayList<Integer> newValueProjection = new ArrayList<Integer>();
            for (int i = 0; i < this.physicalProjectedFields.length; ++i) {
                int[] field = this.physicalProjectedFields[i];
                int[] copiedField = Arrays.copyOf(field, field.length);
                int index = copiedField[0];
                int keySearch = Ints.indexOf((int[])this.keyProjection, (int)index);
                int valueSearch = Ints.indexOf((int[])this.valueProjection, (int)index);
                if (keySearch != -1) {
                    copiedField[0] = keySearch;
                    keyProjectedFields.add(copiedField);
                    newKeyProjection.add(i);
                    continue;
                }
                if (valueSearch != -1) {
                    copiedField[0] = valueSearch;
                    valueProjectedFields.add(copiedField);
                    newValueProjection.add(i);
                    continue;
                }
                throw new TableException("Unknown projection: " + index);
            }
            if (this.keyDecodingFormat != null) {
                if (this.keyDecodingFormat instanceof SupportsProjectionPushDown) {
                    ((SupportsProjectionPushDown)this.keyDecodingFormat).applyProjection((int[][])keyProjectedFields.toArray((T[])new int[0][]));
                    projectedKeyProjection = newKeyProjection.stream().mapToInt(Integer::intValue).toArray();
                } else {
                    projectedKeyProjection = KafkaDynamicSource.mergeProjection(this.keyProjection.length, keyProjectedFields, newKeyProjection);
                }
            }
            if (this.valueDecodingFormat instanceof SupportsProjectionPushDown) {
                ((SupportsProjectionPushDown)this.valueDecodingFormat).applyProjection((int[][])valueProjectedFields.toArray((T[])new int[0][]));
                projectedValueProjection = newValueProjection.stream().mapToInt(Integer::intValue).toArray();
            } else {
                projectedValueProjection = KafkaDynamicSource.mergeProjection(this.valueProjection.length, valueProjectedFields, newValueProjection);
            }
        }
        if (this.enableSchemaEvolution) {
            TypeInformation producedTypeInfo = context.getEvolvingSourceTypeInfo();
            KafkaDeserializationSchema<SourceRecord> kafkaDeserializationSchema = this.buildKafkaEvolvingDeserializationSchema(context, (TypeInformation<SourceRecord>)producedTypeInfo);
            final KafkaSource<SourceRecord> kafkaSource = this.createKafkaSource(kafkaDeserializationSchema);
            return new EvolvingDataStreamScanProvider(){

                public DataStream<SourceRecord> produceDataStream(StreamExecutionEnvironment execEnv) {
                    Preconditions.checkArgument((KafkaDynamicSource.this.watermarkStrategy == null ? 1 : 0) != 0, (Object)"Can't define watermark strategy for the evolving source.");
                    return execEnv.fromSource((Source)kafkaSource, WatermarkStrategy.noWatermarks(), "EvovlingKafkaSource-" + KafkaDynamicSource.this.tableIdentifier);
                }

                public boolean isBounded() {
                    return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
                }
            };
        }
        DeserializationSchema<RowData> keyDeserialization = this.createDeserialization((DynamicTableSource.Context)context, this.physicalDataType, this.keyDecodingFormat, this.keyProjection, this.keyPrefix);
        DeserializationSchema<RowData> valueDeserialization = this.createDeserialization((DynamicTableSource.Context)context, this.physicalDataType, this.valueDecodingFormat, this.valueProjection, this.valuePrefix);
        TypeInformation producedTypeInfo = context.createTypeInformation(this.producedDataType);
        KafkaDeserializationSchema<RowData> kafkaDeserializer = this.buildKafkaDeserializationSchema(keyDeserialization, valueDeserialization, (TypeInformation<RowData>)producedTypeInfo, projectedKeyProjection, projectedValueProjection);
        final KafkaSource<RowData> kafkaSource = this.createKafkaSource(kafkaDeserializer);
        return new DataStreamScanProvider(){

            public DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
                if (KafkaDynamicSource.this.watermarkStrategy == null) {
                    KafkaDynamicSource.this.watermarkStrategy = WatermarkStrategy.noWatermarks();
                }
                DataStreamSource sourceStream = execEnv.fromSource((Source)kafkaSource, KafkaDynamicSource.this.watermarkStrategy, "KafkaSource-" + KafkaDynamicSource.this.tableIdentifier, null, providerContext.getObserver());
                providerContext.generateUid(KafkaDynamicSource.KAFKA_TRANSFORMATION).ifPresent(arg_0 -> ((DataStreamSource)sourceStream).uid(arg_0));
                providerContext.registerObservableOperator(Integer.valueOf(sourceStream.getTransformation().getId()));
                return sourceStream;
            }

            public boolean isBounded() {
                return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
            }
        };
    }

    private static int[] mergeProjection(int fieldSize, List<int[]> projectedNestedFields, List<Integer> projection) {
        for (int[] field : projectedNestedFields) {
            if (field.length <= 1) continue;
            throw new TableException("Merge projection does not support nested field.");
        }
        int[] projectedFields = projectedNestedFields.stream().mapToInt(f -> f[0]).toArray();
        int[] mergedProjection = new int[fieldSize];
        for (int i = 0; i < fieldSize; ++i) {
            int search = Ints.indexOf((int[])projectedFields, (int)i);
            mergedProjection[i] = search == -1 ? -1 : projection.get(search);
        }
        return mergedProjection;
    }

    public boolean supportsNestedProjection() {
        return (this.keyDecodingFormat == null || this.keyDecodingFormat instanceof SupportsProjectionPushDown && ((SupportsProjectionPushDown)this.keyDecodingFormat).supportsNestedProjection()) && this.valueDecodingFormat instanceof SupportsProjectionPushDown && ((SupportsProjectionPushDown)this.valueDecodingFormat).supportsNestedProjection();
    }

    public void applyProjection(int[][] projectedFields) {
        this.physicalProjectedFields = projectedFields;
    }

    public Map<String, DataType> listReadableMetadata() {
        LinkedHashMap<String, DataType> metadataMap = new LinkedHashMap<String, DataType>();
        this.valueDecodingFormat.listReadableMetadata().forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, (DataType)value));
        Stream.of(ReadableMetadata.values()).forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
        return metadataMap;
    }

    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        List formatMetadataKeys = metadataKeys.stream().filter(k -> k.startsWith(VALUE_METADATA_PREFIX)).collect(Collectors.toList());
        ArrayList<String> connectorMetadataKeys = new ArrayList<String>(metadataKeys);
        connectorMetadataKeys.removeAll(formatMetadataKeys);
        Map formatMetadata = this.valueDecodingFormat.listReadableMetadata();
        if (formatMetadata.size() > 0) {
            List requestedFormatMetadataKeys = formatMetadataKeys.stream().map(k -> k.substring(VALUE_METADATA_PREFIX.length())).collect(Collectors.toList());
            this.valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
        }
        this.metadataKeys = connectorMetadataKeys;
        this.producedDataType = producedDataType;
        this.applyMetadataToBaseTable(connectorMetadataKeys);
    }

    public boolean supportsMetadataProjection() {
        return false;
    }

    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    public DynamicTableSource copy() {
        KafkaDynamicSource copy = new KafkaDynamicSource(this.tablePath, this.physicalDataType, this.keyDecodingFormat, this.valueDecodingFormat, this.keyProjection, this.valueProjection, this.keyPrefix, this.valuePrefix, this.topics, this.topicPattern, this.properties, this.startupMode, this.specificStartupOffsets, this.startupTimestampMillis, this.boundedMode, this.specificBoundedOffsets, this.boundedTimestampMillis, this.upsertMode, this.tableIdentifier, this.headerFilterExpression);
        copy.producedDataType = this.producedDataType;
        copy.metadataKeys = this.metadataKeys;
        copy.applyMetadataToBaseTable(this.metadataKeys);
        copy.capturedTables.putAll(this.capturedTables);
        copy.watermarkStrategy = this.watermarkStrategy;
        copy.enableSchemaEvolution = this.enableSchemaEvolution;
        copy.physicalProjectedFields = this.physicalProjectedFields;
        return copy;
    }

    public String asSummaryString() {
        return "Kafka table source";
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        KafkaDynamicSource that = (KafkaDynamicSource)o;
        return Objects.equals(this.tablePath, that.tablePath) && Objects.equals(this.producedDataType, that.producedDataType) && Objects.equals(this.metadataKeys, that.metadataKeys) && Objects.equals(this.physicalDataType, that.physicalDataType) && Objects.equals(this.keyDecodingFormat, that.keyDecodingFormat) && Objects.equals(this.valueDecodingFormat, that.valueDecodingFormat) && Arrays.equals(this.keyProjection, that.keyProjection) && Arrays.equals(this.valueProjection, that.valueProjection) && Objects.equals(this.keyPrefix, that.keyPrefix) && Objects.equals(this.valuePrefix, that.valuePrefix) && Objects.equals(this.topics, that.topics) && Objects.equals(String.valueOf(this.topicPattern), String.valueOf(that.topicPattern)) && Objects.equals(this.properties, that.properties) && this.startupMode == that.startupMode && Objects.equals(this.specificStartupOffsets, that.specificStartupOffsets) && this.startupTimestampMillis == that.startupTimestampMillis && this.boundedMode == that.boundedMode && Objects.equals(this.specificBoundedOffsets, that.specificBoundedOffsets) && this.boundedTimestampMillis == that.boundedTimestampMillis && Objects.equals(this.upsertMode, that.upsertMode) && Objects.equals(this.tableIdentifier, that.tableIdentifier) && Objects.equals(this.watermarkStrategy, that.watermarkStrategy) && Objects.equals(this.enableSchemaEvolution, that.enableSchemaEvolution) && Objects.equals(this.capturedTables, that.capturedTables) && Objects.equals(this.headerFilterExpression, that.headerFilterExpression);
    }

    public int hashCode() {
        return Objects.hash(new Object[]{this.tablePath, this.producedDataType, this.metadataKeys, this.physicalDataType, this.keyDecodingFormat, this.valueDecodingFormat, Arrays.hashCode(this.keyProjection), Arrays.hashCode(this.valueProjection), this.keyPrefix, this.valuePrefix, this.topics, String.valueOf(this.topicPattern), this.properties, this.startupMode, this.specificStartupOffsets, this.startupTimestampMillis, this.boundedMode, this.specificBoundedOffsets, this.boundedTimestampMillis, this.upsertMode, this.tableIdentifier, this.watermarkStrategy, this.enableSchemaEvolution, this.capturedTables, this.headerFilterExpression});
    }

    protected <T> KafkaSource<T> createKafkaSource(KafkaDeserializationSchema<T> kafkaDeserializer) {
        KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder();
        if (this.topics != null) {
            kafkaSourceBuilder.setTopics(this.getTopics());
        } else {
            kafkaSourceBuilder.setTopicPattern(this.topicPattern);
        }
        switch (this.startupMode) {
            case EARLIEST: {
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
                break;
            }
            case LATEST: {
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                break;
            }
            case GROUP_OFFSETS: {
                String offsetResetConfig = this.properties.getProperty("auto.offset.reset", OffsetResetStrategy.NONE.name());
                OffsetResetStrategy offsetResetStrategy = this.getResetStrategy(offsetResetConfig);
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(offsetResetStrategy));
                break;
            }
            case SPECIFIC_OFFSETS: {
                HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
                this.specificStartupOffsets.forEach((tp, offset) -> offsets.put(new TopicPartition(tp.getTopic(), tp.getPartition()), (Long)offset));
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(offsets));
                break;
            }
            case TIMESTAMP: {
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(this.startupTimestampMillis));
            }
        }
        switch (this.boundedMode) {
            case UNBOUNDED: {
                kafkaSourceBuilder.setUnbounded(new NoStoppingOffsetsInitializer());
                break;
            }
            case LATEST: {
                kafkaSourceBuilder.setBounded(OffsetsInitializer.latest());
                break;
            }
            case GROUP_OFFSETS: {
                kafkaSourceBuilder.setBounded(OffsetsInitializer.committedOffsets());
                break;
            }
            case SPECIFIC_OFFSETS: {
                HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
                this.specificBoundedOffsets.forEach((tp, offset) -> offsets.put(new TopicPartition(tp.getTopic(), tp.getPartition()), (Long)offset));
                kafkaSourceBuilder.setBounded(OffsetsInitializer.offsets(offsets));
                break;
            }
            case TIMESTAMP: {
                kafkaSourceBuilder.setBounded(OffsetsInitializer.timestamp(this.boundedTimestampMillis));
            }
        }
        kafkaSourceBuilder.setProperties(this.properties).setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer));
        return kafkaSourceBuilder.build();
    }

    private List<String> getTopics() {
        return this.capturedTables.entrySet().stream().flatMap(e -> ((List)Preconditions.checkNotNull(((KafkaTableSpecCreator)e.getValue()).getTopics())).stream()).distinct().collect(Collectors.toList());
    }

    @Nullable
    private DeserializationSchema<RowData> createDeserialization(DynamicTableSource.Context context, DataType physicalDataType, @Nullable DecodingFormat<DeserializationSchema<RowData>> format, int[] projection, @Nullable String prefix) {
        if (format == null) {
            return null;
        }
        DataType physicalFormatDataType = Projection.of((int[])projection).project(physicalDataType);
        if (prefix != null) {
            physicalFormatDataType = DataTypeUtils.stripRowPrefix((DataType)physicalFormatDataType, (String)prefix);
        }
        return (DeserializationSchema)format.createRuntimeDecoder(context, physicalFormatDataType);
    }

    private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
        return Arrays.stream(OffsetResetStrategy.values()).filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT))).findAny().orElseThrow(() -> new IllegalArgumentException(String.format("%s can not be set to %s. Valid values: [%s]", "auto.offset.reset", offsetResetConfig, Arrays.stream(OffsetResetStrategy.values()).map(Enum::name).map(String::toLowerCase).collect(Collectors.joining(",")))));
    }

    private KafkaDeserializationSchema<RowData> buildKafkaDeserializationSchema(DeserializationSchema<RowData> keyDeserialization, DeserializationSchema<RowData> valueDeserialization, TypeInformation<RowData> producedTypeInfo, int[] projectedKeyProjection, int[] projectedValueProjection) {
        DynamicKafkaDeserializationSchema.MetadataConverter[] metadataConverters = (DynamicKafkaDeserializationSchema.MetadataConverter[])this.metadataKeys.stream().map(k -> Stream.of(ReadableMetadata.values()).filter(rm -> rm.key.equals(k)).findFirst().orElseThrow(IllegalStateException::new)).map(m -> m.converter).toArray(DynamicKafkaDeserializationSchema.MetadataConverter[]::new);
        boolean hasMetadata = this.metadataKeys.size() > 0;
        int adjustedPhysicalArity = DataType.getFieldDataTypes((DataType)this.producedDataType).size() - this.metadataKeys.size();
        int[] adjustedValueProjection = IntStream.concat(IntStream.of(projectedValueProjection), IntStream.range(this.validFields(projectedKeyProjection) + this.validFields(projectedValueProjection), adjustedPhysicalArity)).toArray();
        return new DynamicKafkaDeserializationSchema(adjustedPhysicalArity, keyDeserialization, projectedKeyProjection, valueDeserialization, adjustedValueProjection, keyDeserialization == null && !hasMetadata && (this.physicalProjectedFields == null || this.valueDecodingFormat instanceof SupportsProjectionPushDown), metadataConverters, producedTypeInfo, this.upsertMode, this.headerFilterExpression == null ? null : new KafkaRecordHeaderFilter(this.headerFilterExpression));
    }

    private int validFields(int[] projection) {
        return (int)IntStream.of(projection).filter(i -> i != -1).count();
    }

    private KafkaDeserializationSchema<SourceRecord> buildKafkaEvolvingDeserializationSchema(ScanTableSource.ScanContext context, TypeInformation<SourceRecord> producedTypeInfo) {
        HashMap<ObjectPath, KafkaTableSpec> converters = new HashMap<ObjectPath, KafkaTableSpec>();
        for (KafkaTableSpecCreator creator : this.capturedTables.values()) {
            converters.put(creator.getTablePathInFlink(), creator.create(context));
        }
        return new MergedEvolvingKafkaDeserializationSchema(converters, producedTypeInfo);
    }

    public void applySchemaEvolution() {
        if (this.upsertMode) {
            throw new UnsupportedOperationException("Connector 'upsert-kafka' doesn't support schema evolution.");
        }
        this.enableSchemaEvolution = true;
    }

    public boolean applyTableSource(ScanTableSource anotherTableSource) {
        KafkaDynamicSource anotherKafkaTableSource;
        if (!this.enableSchemaEvolution) {
            return false;
        }
        if (anotherTableSource instanceof KafkaDynamicSource && this.canMergeAnotherSource(anotherKafkaTableSource = (KafkaDynamicSource)anotherTableSource)) {
            this.capturedTables.putAll(anotherKafkaTableSource.capturedTables);
            return true;
        }
        return false;
    }

    @VisibleForTesting
    void applyMetadataToBaseTable(List<String> metadataKeys) {
        KafkaTableSpecCreator spec = this.capturedTables.get(this.tablePath);
        this.capturedTables.put(this.tablePath, spec.copy(metadataKeys, this.producedDataType));
    }

    private boolean canMergeAnotherSource(KafkaDynamicSource anotherKafkaTableSource) {
        if (!anotherKafkaTableSource.properties.equals(this.properties)) {
            return false;
        }
        if (this.topicPattern != null || anotherKafkaTableSource.topicPattern != null) {
            return false;
        }
        if (!StartupMode.GROUP_OFFSETS.equals((Object)this.startupMode) && !StartupMode.LATEST.equals((Object)this.startupMode) && !StartupMode.EARLIEST.equals((Object)this.startupMode) || !this.startupMode.equals((Object)anotherKafkaTableSource.startupMode)) {
            return false;
        }
        return anotherKafkaTableSource.enableSchemaEvolution;
    }

    static enum ReadableMetadata {
        TOPIC("topic", (DataType)DataTypes.STRING().notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object read(ConsumerRecord<?, ?> record) {
                return StringData.fromString((String)record.topic());
            }
        }),
        PARTITION("partition", (DataType)DataTypes.INT().notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object read(ConsumerRecord<?, ?> record) {
                return record.partition();
            }
        }),
        HEADERS("headers", (DataType)DataTypes.MAP((DataType)((DataType)DataTypes.STRING().nullable()), (DataType)((DataType)DataTypes.BYTES().nullable())).notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object read(ConsumerRecord<?, ?> record) {
                HashMap<StringData, byte[]> map = new HashMap<StringData, byte[]>();
                for (Header header : record.headers()) {
                    map.put(StringData.fromString((String)header.key()), header.value());
                }
                return new GenericMapData(map);
            }
        }),
        LEADER_EPOCH("leader-epoch", (DataType)DataTypes.INT().nullable(), new DynamicKafkaDeserializationSchema.MetadataConverter(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object read(ConsumerRecord<?, ?> record) {
                return record.leaderEpoch().orElse(null);
            }
        }),
        OFFSET("offset", (DataType)DataTypes.BIGINT().notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object read(ConsumerRecord<?, ?> record) {
                return record.offset();
            }
        }),
        TIMESTAMP("timestamp", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE((int)3).notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object read(ConsumerRecord<?, ?> record) {
                return TimestampData.fromEpochMillis((long)record.timestamp());
            }
        }),
        TIMESTAMP_TYPE("timestamp-type", (DataType)DataTypes.STRING().notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object read(ConsumerRecord<?, ?> record) {
                return StringData.fromString((String)record.timestampType().toString());
            }
        });

        final String key;
        final DataType dataType;
        final DynamicKafkaDeserializationSchema.MetadataConverter converter;

        private ReadableMetadata(String key, DataType dataType, DynamicKafkaDeserializationSchema.MetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}

