/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.Objects;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.ProjectedRowData;
import org.apache.paimon.flink.Projection;

/**
 * A {@link KafkaDeserializationSchema} that converts Kafka log records into {@link RowData}.
 *
 * <p>Two kinds of records are handled:
 *
 * <ul>
 *   <li>When a primary key is configured and the record value is {@code null}, the record key is
 *       deserialized and a {@link RowKind#DELETE} row is emitted in which only the primary key
 *       positions are populated.
 *   <li>Every other record value is handed to the value deserializer.
 * </ul>
 *
 * <p>If {@code projectFields} is provided, every emitted row is wrapped in a {@link
 * ProjectedRowData} before it reaches the downstream collector, and {@link #getProducedType()}
 * reports the projected row type.
 */
public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema<RowData> {

    private static final long serialVersionUID = 1L;

    /** Type information of the rows emitted by this schema (projected when a projection is set). */
    private final TypeInformation<RowData> producedType;

    /** Number of fields of the physical (unprojected) row. */
    private final int fieldCount;

    /** Positions of the primary key fields inside the physical row; empty if there is no key. */
    private final int[] primaryKey;

    /** Deserializer of the record key; only used for records whose value is {@code null}. */
    @Nullable private final DeserializationSchema<RowData> primaryKeyDeserializer;

    /** Deserializer of the record value. */
    private final DeserializationSchema<RowData> valueDeserializer;

    /** {@code keyFieldGetters[i]} reads the i-th field of a deserialized key row. */
    private final RowData.FieldGetter[] keyFieldGetters;

    /** Optional projection applied to every emitted row; {@code null} disables projection. */
    @Nullable private final int[][] projectFields;

    /** Created in {@link #open}; transient so it is not part of the serialized schema. */
    private transient ProjectCollector projectCollector;

    /**
     * Creates the schema.
     *
     * @param physicalType row data type of the unprojected records
     * @param primaryKey positions of the primary key fields in {@code physicalType}; may be empty
     * @param primaryKeyDeserializer deserializer for record keys; dereferenced only when a record
     *     with a {@code null} value arrives and {@code primaryKey} is not empty
     * @param valueDeserializer deserializer for record values
     * @param projectFields projection to apply to emitted rows, or {@code null} for none
     */
    public KafkaLogDeserializationSchema(
            DataType physicalType,
            int[] primaryKey,
            @Nullable DeserializationSchema<RowData> primaryKeyDeserializer,
            DeserializationSchema<RowData> valueDeserializer,
            @Nullable int[][] projectFields) {
        this.primaryKey = primaryKey;
        this.primaryKeyDeserializer = primaryKeyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.projectFields = projectFields;
        this.fieldCount = physicalType.getChildren().size();

        RowType logicalType = (RowType) physicalType.getLogicalType();
        RowType producedRowType =
                projectFields == null
                        ? logicalType
                        : Projection.of(projectFields).project(logicalType);
        this.producedType = InternalTypeInfo.of(producedRowType);

        // The getter for key position i uses the logical type of the physical field the key maps
        // to (primaryKey[i]) but reads from position i of the key row.
        this.keyFieldGetters =
                IntStream.range(0, primaryKey.length)
                        .mapToObj(
                                i ->
                                        createNullCheckingFieldGetter(
                                                physicalType
                                                        .getChildren()
                                                        .get(primaryKey[i])
                                                        .getLogicalType(),
                                                i))
                        .toArray(RowData.FieldGetter[]::new);
    }

    /**
     * Creates a field getter that returns {@code null} for null fields regardless of the declared
     * nullability of {@code dataType}.
     *
     * <p>For nullable types the getter produced by {@link RowData#createFieldGetter} is returned
     * unchanged (presumably it already performs the null check; see the Flink documentation). For
     * non-nullable types an explicit {@code isNullAt} check is added in front of it.
     *
     * @param dataType logical type of the field
     * @param index position of the field in the row the getter will be applied to
     * @return a getter yielding the field value, or {@code null} if the field is null
     */
    private static RowData.FieldGetter createNullCheckingFieldGetter(
            LogicalType dataType, int index) {
        RowData.FieldGetter getter = RowData.createFieldGetter(dataType, index);
        if (dataType.isNullable()) {
            return getter;
        }
        return row -> row.isNullAt(index) ? null : getter.getFieldOrNull(row);
    }

    /**
     * Opens the wrapped deserializers and creates the projecting collector.
     *
     * @param context initialization context forwarded to the wrapped deserializers
     * @throws Exception if one of the wrapped deserializers fails to open
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        if (primaryKeyDeserializer != null) {
            primaryKeyDeserializer.open(context);
        }
        valueDeserializer.open(context);
        projectCollector = new ProjectCollector(projectFields);
    }

    /** Always returns {@code false}. */
    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    /**
     * Not supported; always throws.
     *
     * @throws RuntimeException always
     */
    @Override
    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) {
        throw new RuntimeException("Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
    }

    /**
     * Deserializes one Kafka record and emits the resulting row(s) to {@code underCollector},
     * applying the configured projection if there is one.
     *
     * @param record the Kafka record to deserialize
     * @param underCollector collector receiving the (possibly projected) rows
     * @throws NullPointerException if a record with a {@code null} value arrives for a table with
     *     a primary key but no primary key deserializer was supplied
     * @throws Exception if one of the wrapped deserializers fails
     */
    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> record, Collector<RowData> underCollector)
            throws Exception {
        Collector<RowData> collector = projectCollector.project(underCollector);

        boolean isKeyOnlyRecord = primaryKey.length > 0 && record.value() == null;
        if (!isKeyOnlyRecord) {
            valueDeserializer.deserialize(record.value(), collector);
            return;
        }

        RowData key =
                Objects.requireNonNull(
                                primaryKeyDeserializer,
                                "primaryKeyDeserializer is required to deserialize records with a null value")
                        .deserialize(record.key());

        // Only the primary key positions are filled; all other fields of the row stay unset.
        GenericRowData value = new GenericRowData(RowKind.DELETE, fieldCount);
        for (int i = 0; i < primaryKey.length; i++) {
            value.setField(primaryKey[i], keyFieldGetters[i].getFieldOrNull(key));
        }
        collector.collect(value);
    }

    /** Returns the type information of the rows emitted by this schema. */
    @Override
    public TypeInformation<RowData> getProducedType() {
        return producedType;
    }

    /**
     * Collector that wraps each row in a reusable {@link ProjectedRowData} before forwarding it.
     * When no projection is configured, {@link #project} hands back the target collector itself
     * and this collector is bypassed.
     */
    private static final class ProjectCollector implements Collector<RowData> {

        /** Reused projection wrapper; {@code null} when no projection is configured. */
        @Nullable private final ProjectedRowData projectedRow;

        /** Collector that receives the projected rows; set by {@link #project}. */
        private Collector<RowData> underCollector;

        private ProjectCollector(@Nullable int[][] projectFields) {
            this.projectedRow =
                    projectFields == null ? null : ProjectedRowData.from(projectFields);
        }

        /**
         * Returns the collector rows should be emitted to: {@code underCollector} itself when no
         * projection is configured, otherwise this collector bound to {@code underCollector}.
         */
        private Collector<RowData> project(Collector<RowData> underCollector) {
            if (projectedRow == null) {
                return underCollector;
            }
            this.underCollector = underCollector;
            return this;
        }

        /** Forwards {@code rowData} wrapped in the projection. */
        @Override
        public void collect(RowData rowData) {
            underCollector.collect(projectedRow.replaceRow(rowData));
        }

        /** No-op; this collector does not close the wrapped collector. */
        @Override
        public void close() {}
    }
}

