/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.SchemaAwareSinkRecord;
import org.apache.flink.table.data.SchemaSpec;
import org.apache.flink.table.data.SinkRecord;
import org.apache.flink.table.evolution.SchemaClient;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicEvolvingKafkaRecordSerializationSchema
implements KafkaRecordSerializationSchema<SinkRecord> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(DynamicEvolvingKafkaRecordSerializationSchema.class);
    private final String topic;
    private final KafkaPartitioner<RowData> partitioner;
    private final boolean hasMetadata;
    private final boolean upsertMode;
    private SchemaClient schemaClient;
    private final List<String> metadataKeys;
    @Nullable
    private final SerializationSchema<RowData> keySerializationSchema;
    private final SerializationSchema<SchemaAwareSinkRecord> valueSerializationSchema;
    private final KafkaConnectorOptions.ValueFieldsStrategy valueFieldsStrategy;
    private final List<String> keyFields;
    @Nullable
    private final String valuePrefix;
    @Nullable
    private final String keyPrefix;
    private final EvolvingSerializationSchemaCache evolvingSerializationSchemaCache;
    protected static final int SCHEMA_CACHE_SIZE = 50;
    private static final int METADATA_NOT_FOUND = -1;

    DynamicEvolvingKafkaRecordSerializationSchema(String topic, KafkaConnectorOptions.ValueFieldsStrategy valueFieldsStrategy, @Nullable String valuePrefix, @Nullable String keyPrefix, List<String> keyFields, @Nullable KafkaPartitioner<RowData> partitioner, @Nullable SerializationSchema<RowData> keySerializationSchema, SerializationSchema<SchemaAwareSinkRecord> valueSerializationSchema, boolean hasMetadata, List<String> metadataKeys, boolean upsertMode) {
        if (upsertMode) {
            Preconditions.checkArgument((keySerializationSchema != null && !keyFields.isEmpty() ? 1 : 0) != 0, (Object)"Key must be set in upsert mode for serialization schema.");
        }
        this.topic = (String)Preconditions.checkNotNull((Object)topic);
        this.partitioner = partitioner;
        this.hasMetadata = hasMetadata;
        this.upsertMode = upsertMode;
        this.metadataKeys = metadataKeys;
        this.keySerializationSchema = keySerializationSchema;
        this.valueSerializationSchema = valueSerializationSchema;
        this.evolvingSerializationSchemaCache = new EvolvingSerializationSchemaCache();
        this.valueFieldsStrategy = valueFieldsStrategy;
        this.valuePrefix = valuePrefix;
        this.keyPrefix = keyPrefix;
        this.keyFields = keyFields;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(SinkRecord consumedRecord, KafkaRecordSerializationSchema.KafkaSinkContext context, Long timestamp) {
        byte[] valueSerialized;
        byte[] keySerialized;
        int schemaId = consumedRecord.getSchemaId();
        this.initSerializationCacheEntry(consumedRecord);
        RowData consumedRow = consumedRecord.getRow();
        if (this.keySerializationSchema == null && !this.hasMetadata) {
            byte[] valueSerialized2 = this.valueSerializationSchema.serialize((Object)this.generateSchemaAwareSinkRecord(consumedRecord.getTablePath(), (RowType)this.evolvingSerializationSchemaCache.getValueDataType(schemaId).getLogicalType(), schemaId, consumedRow));
            return new ProducerRecord<Object, byte[]>(this.topic, this.extractPartition(consumedRow, null, valueSerialized2, context.getPartitionsForTopic(this.topic)), null, valueSerialized2);
        }
        if (this.keySerializationSchema == null) {
            keySerialized = null;
        } else {
            RowData keyRow = DynamicKafkaRecordSerializationSchema.createProjectedRow(consumedRow, RowKind.INSERT, this.evolvingSerializationSchemaCache.getKeyFieldGetters(schemaId));
            keySerialized = this.keySerializationSchema.serialize((Object)keyRow);
        }
        RowKind kind = consumedRow.getRowKind();
        if (this.upsertMode) {
            if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
                valueSerialized = null;
            } else {
                RowData valueRow = DynamicKafkaRecordSerializationSchema.createProjectedRow(consumedRow, kind, this.evolvingSerializationSchemaCache.getValueFieldGetters(schemaId));
                valueRow.setRowKind(RowKind.INSERT);
                valueSerialized = this.valueSerializationSchema.serialize((Object)this.generateSchemaAwareSinkRecord(consumedRecord.getTablePath(), (RowType)this.evolvingSerializationSchemaCache.getValueDataType(schemaId).getLogicalType(), schemaId, valueRow));
            }
        } else {
            RowData valueRow = DynamicKafkaRecordSerializationSchema.createProjectedRow(consumedRow, kind, this.evolvingSerializationSchemaCache.getValueFieldGetters(schemaId));
            valueSerialized = this.valueSerializationSchema.serialize((Object)this.generateSchemaAwareSinkRecord(consumedRecord.getTablePath(), (RowType)this.evolvingSerializationSchemaCache.getValueDataType(schemaId).getLogicalType(), schemaId, valueRow));
        }
        return new ProducerRecord<byte[], byte[]>(this.topic, this.extractPartition(consumedRow, keySerialized, valueSerialized, context.getPartitionsForTopic(this.topic)), (Long)this.readMetadata(consumedRecord, KafkaDynamicSink.WritableMetadata.TIMESTAMP), keySerialized, valueSerialized, (Iterable)this.readMetadata(consumedRecord, KafkaDynamicSink.WritableMetadata.HEADERS));
    }

    @Override
    public void open(SerializationSchema.InitializationContext context, KafkaRecordSerializationSchema.KafkaSinkContext sinkContext) throws Exception {
        if (this.keySerializationSchema != null) {
            this.keySerializationSchema.open(context);
        }
        this.valueSerializationSchema.open(context);
        if (this.partitioner != null) {
            this.partitioner.open(sinkContext.getParallelInstanceId(), sinkContext.getNumberOfParallelInstances());
        }
        this.initSchemaClient(sinkContext);
    }

    private void initSchemaClient(KafkaRecordSerializationSchema.KafkaSinkContext sinkContext) {
        if (this.schemaClient == null) {
            Preconditions.checkState((boolean)sinkContext.getRuntimeContext().isPresent(), (Object)"Cannot get runtime context from kafka sink context.");
            this.schemaClient = SchemaClient.of((RuntimeContext)sinkContext.getRuntimeContext().get());
        }
    }

    private void initSerializationCacheEntry(SinkRecord sinkRecord) {
        int schemaId = sinkRecord.getSchemaId();
        if (!this.evolvingSerializationSchemaCache.contains(schemaId)) {
            try {
                SchemaSpec schemaSpec = this.schemaClient.getSchemaSpec(sinkRecord.getTablePath(), schemaId);
                DataType physicalDataType = schemaSpec.toRowDataType();
                int[] keyProjection = KafkaConnectorOptionsUtil.createKeyFormatProjection(this.keySerializationSchema != null, this.keyFields, this.keyPrefix, physicalDataType, true);
                int[] valueProjection = KafkaConnectorOptionsUtil.createValueFormatProjection(this.valueFieldsStrategy, this.valuePrefix, this.keySerializationSchema != null, this.keyFields, this.keyPrefix, physicalDataType, true);
                this.evolvingSerializationSchemaCache.addCache(schemaId, Projection.of((int[])valueProjection).project(physicalDataType), DynamicEvolvingKafkaRecordSerializationSchema.getFieldGettersFromDataType(physicalDataType, valueProjection), this.keySerializationSchema != null ? DynamicEvolvingKafkaRecordSerializationSchema.getFieldGettersFromDataType(physicalDataType, keyProjection) : null, this.hasMetadata ? this.getMetadataPositions(physicalDataType) : null);
            }
            catch (Exception e) {
                String errorMsg = String.format("Fail to create the serializer for the schema %d.", schemaId);
                LOG.error(errorMsg, (Throwable)e);
                throw new IllegalStateException(errorMsg, e);
            }
        }
    }

    private Integer extractPartition(RowData consumedRow, @Nullable byte[] keySerialized, byte[] valueSerialized, int[] partitions) {
        if (this.partitioner != null) {
            return this.partitioner.partition(consumedRow, keySerialized, valueSerialized, this.topic, partitions);
        }
        return null;
    }

    @VisibleForTesting
    protected EvolvingSerializationSchemaCache getEvolvingSerializationSchemaCache() {
        return this.evolvingSerializationSchemaCache;
    }

    private <T> T readMetadata(SinkRecord sinkRecord, KafkaDynamicSink.WritableMetadata metadata) {
        if (!this.hasMetadata) {
            return null;
        }
        int pos = this.evolvingSerializationSchemaCache.getMetadataPosition(sinkRecord.getSchemaId())[metadata.ordinal()];
        if (pos < 0) {
            return null;
        }
        return (T)metadata.converter.read(sinkRecord.getRow(), pos);
    }

    static RowData.FieldGetter[] getFieldGettersFromDataType(DataType physicalDataType, int[] projection) {
        List physicalChildren = physicalDataType.getLogicalType().getChildren();
        return KafkaDynamicSink.getFieldGetters(physicalChildren, projection);
    }

    private int[] getMetadataPositions(DataType physicalDataType) {
        List physicalChildren = physicalDataType.getLogicalType().getChildren();
        return Stream.of(KafkaDynamicSink.WritableMetadata.values()).mapToInt(m -> {
            int pos = this.metadataKeys.indexOf(m.key);
            if (pos < 0) {
                return -1;
            }
            return physicalChildren.size() + pos;
        }).toArray();
    }

    private SchemaAwareSinkRecord generateSchemaAwareSinkRecord(ObjectPath tablePath, RowType rowType, int schemaId, RowData row) {
        return new SchemaAwareSinkRecord(tablePath, SchemaSpec.newBuilder().fromRowType(rowType).build(), schemaId, row);
    }

    private static final class LRUMap<K, V>
    extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = 1L;
        protected final int cacheSize;
        private final RemovalListener<K, V> removalListener;

        public LRUMap(int cacheSize) {
            this(cacheSize, null);
        }

        public LRUMap(int cacheSize, RemovalListener<K, V> removalListener) {
            super((int)Math.ceil((double)cacheSize / 0.75) + 1, 0.75f, true);
            this.cacheSize = cacheSize;
            this.removalListener = removalListener;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            if (this.size() > this.cacheSize) {
                if (this.removalListener != null) {
                    this.removalListener.onRemoval(eldest);
                }
                return true;
            }
            return false;
        }

        public static interface RemovalListener<K, V>
        extends Serializable {
            public void onRemoval(Map.Entry<K, V> var1);
        }
    }

    protected static class EvolvingSerializationSchemaCache
    implements Serializable {
        private static final long serialVersionUID = 3737757423249510964L;
        private final LRUMap<Integer, DataType> valueDataTypeMap;
        private final HashMap<Integer, RowData.FieldGetter[]> keyFieldGetterMap = new HashMap(50);
        private final HashMap<Integer, RowData.FieldGetter[]> valueFieldGetterMap = new HashMap(50);
        private final HashMap<Integer, int[]> metadataPositionMap = new HashMap(50);

        EvolvingSerializationSchemaCache() {
            this.valueDataTypeMap = new LRUMap(50, eldest -> {
                this.keyFieldGetterMap.remove(eldest.getKey());
                this.valueFieldGetterMap.remove(eldest.getKey());
                this.metadataPositionMap.remove(eldest.getKey());
            });
        }

        public boolean contains(int schemaId) {
            return this.valueDataTypeMap.containsKey(schemaId);
        }

        public void addCache(int schemaId, DataType valueDataType, RowData.FieldGetter[] valueFieldGetters, @Nullable RowData.FieldGetter[] keyFieldGetters, @Nullable int[] metadataPos) {
            this.valueDataTypeMap.put(schemaId, valueDataType);
            this.valueFieldGetterMap.put(schemaId, valueFieldGetters);
            if (keyFieldGetters != null) {
                this.keyFieldGetterMap.put(schemaId, keyFieldGetters);
            }
            if (metadataPos != null) {
                this.metadataPositionMap.put(schemaId, metadataPos);
            }
        }

        public DataType getValueDataType(int schemaId) {
            return (DataType)this.valueDataTypeMap.get(schemaId);
        }

        public RowData.FieldGetter[] getValueFieldGetters(int schemaId) {
            return this.valueFieldGetterMap.get(schemaId);
        }

        @Nullable
        public RowData.FieldGetter[] getKeyFieldGetters(int schemaId) {
            return this.keyFieldGetterMap.get(schemaId);
        }

        @Nullable
        public int[] getMetadataPosition(int schemaId) {
            return this.metadataPositionMap.get(schemaId);
        }
    }
}

