/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.DynamicEvolvingKafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaTableSpec;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.SourceRecord;
import org.apache.flink.util.Collector;

/**
 * A {@link KafkaDeserializationSchema} that fans a single Kafka record out to the
 * {@link DynamicEvolvingKafkaDeserializationSchema} of every captured table whose topic list
 * or topic pattern matches the record's topic.
 *
 * <p>Tables declared with an explicit topic list are indexed by topic name in {@link #open};
 * tables declared with a topic pattern are kept in a separate list and matched against each
 * incoming record's topic.
 */
public class MergedEvolvingKafkaDeserializationSchema
implements KafkaDeserializationSchema<SourceRecord> {
    private static final long serialVersionUID = 1L;

    /** All captured tables, keyed by their object path. */
    private final Map<ObjectPath, KafkaTableSpec> capturedTables;

    /** Topic name to the deserializers of every table that lists that topic explicitly. */
    private final Map<String, List<DynamicEvolvingKafkaDeserializationSchema>> topicsCachedConverters;

    /** Type information returned by {@link #getProducedType()}. */
    private final TypeInformation<SourceRecord> outputTypeInfo;

    /** Tables that subscribe through a topic pattern instead of an explicit topic list. */
    private final List<KafkaTableSpec> patternTables;

    /**
     * Creates the merged schema. The per-topic lookup structures stay empty until
     * {@link #open} is called.
     *
     * @param capturedTables the table specs whose deserializers this schema delegates to
     * @param outputTypeInfo the type information reported for produced records
     */
    public MergedEvolvingKafkaDeserializationSchema(Map<ObjectPath, KafkaTableSpec> capturedTables, TypeInformation<SourceRecord> outputTypeInfo) {
        this.capturedTables = capturedTables;
        this.outputTypeInfo = outputTypeInfo;
        this.topicsCachedConverters = new HashMap<>();
        this.patternTables = new ArrayList<>();
    }

    /**
     * Opens the deserializer of every captured table and registers it either under each of
     * its explicit topics or, when the table has a topic pattern, in the pattern list.
     *
     * @param context the initialization context forwarded to each table's deserializer
     * @throws Exception if opening any underlying deserializer fails
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        for (KafkaTableSpec spec : this.capturedTables.values()) {
            DynamicEvolvingKafkaDeserializationSchema deserializationSchema = spec.getDeserializationSchema();
            deserializationSchema.open(context);
            if (spec.getTopicPattern() != null) {
                this.patternTables.add(spec);
                continue;
            }
            for (String topic : spec.getTopics()) {
                this.topicsCachedConverters
                        .computeIfAbsent(topic, k -> new ArrayList<>())
                        .add(deserializationSchema);
            }
        }
    }

    /**
     * Always reports that the stream has not ended.
     *
     * @param nextElement ignored
     * @return {@code false}
     */
    @Override
    public boolean isEndOfStream(SourceRecord nextElement) {
        return false;
    }

    /**
     * Unsupported; use {@link #deserialize(ConsumerRecord, Collector)} instead.
     *
     * @throws IllegalStateException always
     */
    @Override
    public SourceRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        throw new IllegalStateException("A collector is required for deserializing.");
    }

    /**
     * Passes the record to every deserializer registered for its topic, first those
     * registered by explicit topic name and then those whose topic pattern matches.
     *
     * @param message the Kafka record to deserialize
     * @param out the collector handed to each matching deserializer
     * @throws TableException if no deserializer is registered for the record's topic
     * @throws Exception if a delegate deserializer fails
     */
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<SourceRecord> out) throws Exception {
        String topic = message.topic();
        boolean recordDeserialized = false;

        // open() never stores null lists, so a null result means "no explicit registration".
        List<DynamicEvolvingKafkaDeserializationSchema> converters = this.topicsCachedConverters.get(topic);
        if (converters != null) {
            for (DynamicEvolvingKafkaDeserializationSchema deserializationSchema : converters) {
                deserializationSchema.deserialize(message, out);
            }
            recordDeserialized = true;
        }

        for (KafkaTableSpec spec : this.patternTables) {
            if (!spec.getTopicPattern().matcher(topic).matches()) {
                continue;
            }
            spec.getDeserializationSchema().deserialize(message, out);
            recordDeserialized = true;
        }

        if (!recordDeserialized) {
            // Substitute the topic into the message; the placeholder was previously emitted verbatim.
            throw new TableException(
                    String.format(
                            "Don't find the deserializer for the topic: %s. This should never happen, please report a bug issue.",
                            topic));
        }
    }

    /**
     * Returns the type information supplied at construction time.
     *
     * @return the produced type information
     */
    @Override
    public TypeInformation<SourceRecord> getProducedType() {
        return this.outputTypeInfo;
    }
}

