/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.format.compatible.kafka.connect.json;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

public class CompatibleKafkaConnectDeserializationSchema
implements DeserializationSchema<SeaTunnelRow> {
    private static final String INCLUDE_SCHEMA_METHOD = "convertToJsonWithEnvelope";
    private static final String EXCLUDE_SCHEMA_METHOD = "convertToJsonWithoutEnvelope";
    private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload";
    private transient JsonConverter keyConverter;
    private transient JsonConverter valueConverter;
    private transient Method keyConverterMethod;
    private transient Method valueConverterMethod;
    private final SeaTunnelRowType seaTunnelRowType;
    private final JsonToRowConverters.JsonToRowConverter runtimeConverter;
    private final boolean keySchemaEnable;
    private final boolean valueSchemaEnable;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public CompatibleKafkaConnectDeserializationSchema(@NonNull SeaTunnelRowType seaTunnelRowType, @NonNull Config config, boolean failOnMissingField, boolean ignoreParseErrors) {
        if (seaTunnelRowType == null) {
            throw new NullPointerException("seaTunnelRowType is marked non-null but is null");
        }
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        Map configMap = ReadonlyConfig.fromConfig((Config)config).toMap();
        this.seaTunnelRowType = seaTunnelRowType;
        this.keySchemaEnable = KafkaConnectJsonFormatOptions.getKeyConverterSchemaEnabled(configMap);
        this.valueSchemaEnable = KafkaConnectJsonFormatOptions.getValueConverterSchemaEnabled(configMap);
        this.runtimeConverter = new JsonToRowConverters(failOnMissingField, ignoreParseErrors).createConverter((SeaTunnelDataType)Preconditions.checkNotNull((Object)seaTunnelRowType));
    }

    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        throw new UnsupportedEncodingException();
    }

    public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow> out) throws InvocationTargetException, IllegalAccessException {
        this.tryInitConverter();
        SinkRecord record = this.convertToSinkRecord(msg);
        RowKind rowKind = RowKind.INSERT;
        com.fasterxml.jackson.databind.JsonNode jsonNode = (com.fasterxml.jackson.databind.JsonNode)this.valueConverterMethod.invoke((Object)this.valueConverter, record.valueSchema(), record.value());
        com.fasterxml.jackson.databind.JsonNode payload = jsonNode.get(KAFKA_CONNECT_SINK_RECORD_PAYLOAD);
        if (payload.isArray()) {
            ArrayNode arrayNode = (ArrayNode)payload;
            for (int i = 0; i < arrayNode.size(); ++i) {
                SeaTunnelRow row = this.convertJsonNode(arrayNode.get(i));
                row.setRowKind(rowKind);
                out.collect((Object)row);
            }
        } else {
            SeaTunnelRow row = this.convertJsonNode(payload);
            row.setRowKind(rowKind);
            out.collect((Object)row);
        }
    }

    private SeaTunnelRow convertJsonNode(com.fasterxml.jackson.databind.JsonNode jsonNode) {
        if (jsonNode.isNull()) {
            return null;
        }
        try {
            JsonNode jsonData = this.objectMapper.readTree(jsonNode.toString());
            return (SeaTunnelRow)this.runtimeConverter.convert(jsonData);
        }
        catch (Throwable t) {
            throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode)CommonErrorCode.JSON_OPERATION_FAILED, String.format("Failed to deserialize JSON '%s'.", jsonNode), t);
        }
    }

    private SinkRecord convertToSinkRecord(ConsumerRecord<byte[], byte[]> msg) {
        SchemaAndValue keyAndSchema = msg.key() == null ? SchemaAndValue.NULL : this.keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
        SchemaAndValue valueAndSchema = this.valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value());
        return new SinkRecord(msg.topic(), msg.partition(), keyAndSchema.schema(), keyAndSchema.value(), valueAndSchema.schema(), valueAndSchema.value(), msg.offset(), msg.timestamp(), msg.timestampType(), null);
    }

    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.seaTunnelRowType;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void tryInitConverter() {
        CompatibleKafkaConnectDeserializationSchema compatibleKafkaConnectDeserializationSchema;
        if (this.keyConverter == null) {
            compatibleKafkaConnectDeserializationSchema = this;
            synchronized (compatibleKafkaConnectDeserializationSchema) {
                if (this.keyConverter == null) {
                    this.keyConverter = new JsonConverter();
                    this.keyConverter.configure(Collections.singletonMap("schemas.enable", this.keySchemaEnable), true);
                    this.keyConverterMethod = (Method)ReflectionUtils.getDeclaredMethod(JsonConverter.class, (String)(this.keySchemaEnable ? INCLUDE_SCHEMA_METHOD : EXCLUDE_SCHEMA_METHOD), (Class[])new Class[]{Schema.class, Object.class}).get();
                }
            }
        }
        if (this.valueConverter == null) {
            compatibleKafkaConnectDeserializationSchema = this;
            synchronized (compatibleKafkaConnectDeserializationSchema) {
                if (this.valueConverter == null) {
                    this.valueConverter = new JsonConverter();
                    this.valueConverter.configure(Collections.singletonMap("schemas.enable", this.valueSchemaEnable), false);
                    this.valueConverterMethod = (Method)ReflectionUtils.getDeclaredMethod(JsonConverter.class, (String)(this.valueSchemaEnable ? INCLUDE_SCHEMA_METHOD : EXCLUDE_SCHEMA_METHOD), (Class[])new Class[]{Schema.class, Object.class}).get();
                }
            }
        }
    }

    public CompatibleKafkaConnectDeserializationSchema(SeaTunnelRowType seaTunnelRowType, JsonToRowConverters.JsonToRowConverter runtimeConverter, boolean keySchemaEnable, boolean valueSchemaEnable) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.runtimeConverter = runtimeConverter;
        this.keySchemaEnable = keySchemaEnable;
        this.valueSchemaEnable = valueSchemaEnable;
    }
}

