/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.ColumnSpec;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.SchemaSpec;
import org.apache.flink.table.data.SourceRecord;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

public class DynamicEvolvingKafkaDeserializationSchema
implements KafkaDeserializationSchema<SourceRecord> {
    private static final long serialVersionUID = 1L;
    @Nullable
    private final DeserializationSchema<RowData> keyDeserializer;
    private final DeserializationSchema<SourceRecord> valueDeserializer;
    private final TypeInformation<SourceRecord> outputTypeInfo;
    private final RowType keyType;
    @Nullable
    private final String valuePrefix;
    private final boolean hasMetadata;
    private final ConcatCollector collector;
    private transient RowData emptyKey;

    public DynamicEvolvingKafkaDeserializationSchema(@Nullable DeserializationSchema<RowData> keyDeserializer, DeserializationSchema<SourceRecord> valueDeserializer, DynamicKafkaDeserializationSchema.MetadataConverter[] metadataConverters, ObjectPath tablePath, RowType keyType, RowType metadataType, @Nullable String valuePrefix, TypeInformation<SourceRecord> outputTypeInfo) {
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.outputTypeInfo = outputTypeInfo;
        this.keyType = keyType;
        this.valuePrefix = valuePrefix;
        this.hasMetadata = metadataConverters.length > 0;
        this.collector = new ConcatCollector(tablePath, keyType, metadataType, metadataConverters);
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        this.emptyKey = new GenericRowData(this.keyType.getFieldCount());
        if (this.keyDeserializer != null) {
            this.keyDeserializer.open(context);
        }
        this.valueDeserializer.open(context);
    }

    @Override
    public boolean isEndOfStream(SourceRecord nextElement) {
        return false;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<SourceRecord> out) throws Exception {
        SourceRecord valueRecord = (SourceRecord)this.valueDeserializer.deserialize(message.value());
        if (this.keyDeserializer == null && !this.hasMetadata) {
            if (valueRecord != null && this.valuePrefix != null) {
                SchemaSpec.Builder builder = SchemaSpec.newBuilder();
                this.addPrefixForValueRecordColumn(builder, valueRecord.getSchema(), this.valuePrefix);
                out.collect((Object)new SourceRecord(valueRecord.getTablePath(), builder.build(), valueRecord.getRow()));
            } else if (valueRecord != null) {
                out.collect((Object)valueRecord);
            }
            return;
        }
        if (this.keyDeserializer != null) {
            this.collector.key = (RowData)this.keyDeserializer.deserialize(message.key());
        }
        this.collector.outputCollector = out;
        this.collector.originMessage = message;
        this.collector.collect(valueRecord);
    }

    @Override
    public SourceRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        throw new IllegalStateException("A collector is required for deserializing.");
    }

    public TypeInformation<SourceRecord> getProducedType() {
        return this.outputTypeInfo;
    }

    private void addPrefixForValueRecordColumn(SchemaSpec.Builder builder, SchemaSpec valueSchema, String valuePrefix) {
        for (ColumnSpec column : valueSchema.getColumns()) {
            builder.column(valuePrefix + column.getName(), column.getDataType());
        }
    }

    private class ConcatCollector
    implements Collector<SourceRecord>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private final ObjectPath tablePath;
        private final RowType keyType;
        private final RowType metadataType;
        private final DynamicKafkaDeserializationSchema.MetadataConverter[] converters;
        private final SchemaSpec schemaWithoutValue;
        private transient ConsumerRecord<byte[], byte[]> originMessage;
        private transient RowData key;
        private transient Collector<SourceRecord> outputCollector;

        public ConcatCollector(ObjectPath tablePath, RowType keyType, RowType metadataType, DynamicKafkaDeserializationSchema.MetadataConverter[] converters) {
            this.tablePath = tablePath;
            this.keyType = keyType;
            this.metadataType = metadataType;
            this.converters = converters;
            this.schemaWithoutValue = SchemaSpec.newBuilder().fromRowType(keyType).fromRowType(metadataType).build();
        }

        public void collect(SourceRecord valueRecord) {
            if (!(this.key != null || valueRecord != null && valueRecord.getSchema().getColumnCount() != 0 || DynamicEvolvingKafkaDeserializationSchema.this.hasMetadata)) {
                return;
            }
            this.outputCollector.collect((Object)this.concatAll(this.key == null ? DynamicEvolvingKafkaDeserializationSchema.this.emptyKey : this.key, valueRecord));
        }

        public void close() {
        }

        private SourceRecord concatAll(RowData key, @Nullable SourceRecord value) {
            RowData metadata = this.extractMetadata();
            if (value != null) {
                SchemaSpec outputSchema = this.buildOutputSchema(value);
                JoinedRowData outputRow = new JoinedRowData((RowData)new JoinedRowData(key, value.getRow()), metadata);
                return new SourceRecord(this.tablePath, outputSchema, (RowData)outputRow);
            }
            return new SourceRecord(this.tablePath, this.schemaWithoutValue, (RowData)new JoinedRowData(key, metadata));
        }

        private RowData extractMetadata() {
            GenericRowData metadata = new GenericRowData(this.metadataType.getFieldCount());
            for (int i = 0; i < metadata.getArity(); ++i) {
                metadata.setField(i, this.converters[i].read(this.originMessage));
            }
            return metadata;
        }

        private SchemaSpec buildOutputSchema(SourceRecord value) {
            SchemaSpec.Builder builder = SchemaSpec.newBuilder();
            builder.fromRowType(this.keyType);
            if (DynamicEvolvingKafkaDeserializationSchema.this.valuePrefix == null) {
                builder.fromSchema(value.getSchema());
            } else {
                DynamicEvolvingKafkaDeserializationSchema.this.addPrefixForValueRecordColumn(builder, value.getSchema(), DynamicEvolvingKafkaDeserializationSchema.this.valuePrefix);
            }
            builder.fromRowType(this.metadataType);
            return builder.build();
        }
    }
}

