/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.format.compatible.kafka.connect.json;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Optional;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;

public class CompatibleKafkaConnectDeserializationSchema
implements DeserializationSchema<SeaTunnelRow> {
    private static final String INCLUDE_SCHEMA_METHOD = "convertToJsonWithEnvelope";
    private static final String EXCLUDE_SCHEMA_METHOD = "convertToJsonWithoutEnvelope";
    private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload";
    public static final String FORMAT = "Kafka.Connect";
    private transient JsonConverter keyConverter;
    private transient JsonConverter valueConverter;
    private transient Method keyConverterMethod;
    private transient Method valueConverterMethod;
    private final SeaTunnelRowType seaTunnelRowType;
    private final JsonToRowConverters.JsonToObjectConverter runtimeConverter;
    private final boolean keySchemaEnable;
    private final boolean valueSchemaEnable;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final CatalogTable catalogTable;

    public CompatibleKafkaConnectDeserializationSchema(@NonNull CatalogTable catalogTable, boolean keySchemaEnable, boolean valueSchemaEnable, boolean failOnMissingField, boolean ignoreParseErrors) {
        if (catalogTable == null) {
            throw new NullPointerException("catalogTable is marked non-null but is null");
        }
        this.catalogTable = catalogTable;
        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
        this.keySchemaEnable = keySchemaEnable;
        this.valueSchemaEnable = valueSchemaEnable;
        this.runtimeConverter = new JsonToRowConverters(failOnMissingField, ignoreParseErrors).createRowConverter((SeaTunnelRowType)Preconditions.checkNotNull((Object)this.seaTunnelRowType));
    }

    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        throw new UnsupportedOperationException("Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
    }

    public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow> out) throws InvocationTargetException, IllegalAccessException {
        this.tryInitConverter();
        if (msg == null) {
            return;
        }
        SinkRecord record = this.convertToSinkRecord(msg);
        RowKind rowKind = RowKind.INSERT;
        com.fasterxml.jackson.databind.JsonNode jsonNode = (com.fasterxml.jackson.databind.JsonNode)this.valueConverterMethod.invoke((Object)this.valueConverter, record.valueSchema(), record.value());
        com.fasterxml.jackson.databind.JsonNode payload = jsonNode.get(KAFKA_CONNECT_SINK_RECORD_PAYLOAD);
        Optional<TablePath> tablePath = Optional.ofNullable(this.catalogTable).map(CatalogTable::getTablePath);
        if (payload.isArray()) {
            ArrayNode arrayNode = (ArrayNode)payload;
            for (int i = 0; i < arrayNode.size(); ++i) {
                SeaTunnelRow row = this.convertJsonNode(arrayNode.get(i));
                row.setRowKind(rowKind);
                if (tablePath.isPresent()) {
                    row.setTableId(tablePath.toString());
                }
                out.collect((Object)row);
            }
        } else {
            SeaTunnelRow row = this.convertJsonNode(payload);
            row.setRowKind(rowKind);
            if (tablePath.isPresent()) {
                row.setTableId(tablePath.toString());
            }
            out.collect((Object)row);
        }
    }

    private SeaTunnelRow convertJsonNode(com.fasterxml.jackson.databind.JsonNode jsonNode) {
        if (jsonNode.isNull()) {
            return null;
        }
        try {
            JsonNode jsonData = JsonUtils.stringToJsonNode((String)jsonNode.toString());
            return (SeaTunnelRow)this.runtimeConverter.convert(jsonData, null);
        }
        catch (Throwable t) {
            throw CommonError.jsonOperationError((String)FORMAT, (String)jsonNode.toString(), (Throwable)t);
        }
    }

    private SinkRecord convertToSinkRecord(ConsumerRecord<byte[], byte[]> msg) {
        SchemaAndValue keyAndSchema = msg.key() == null ? SchemaAndValue.NULL : this.keyConverter.toConnectData(msg.topic(), msg.headers(), (byte[])msg.key());
        SchemaAndValue valueAndSchema = this.valueConverter.toConnectData(msg.topic(), msg.headers(), (byte[])msg.value());
        return new SinkRecord(msg.topic(), msg.partition(), keyAndSchema.schema(), keyAndSchema.value(), valueAndSchema.schema(), valueAndSchema.value(), msg.offset(), Long.valueOf(msg.timestamp()), msg.timestampType(), null);
    }

    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.seaTunnelRowType;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void tryInitConverter() {
        CompatibleKafkaConnectDeserializationSchema compatibleKafkaConnectDeserializationSchema;
        if (this.keyConverter == null) {
            compatibleKafkaConnectDeserializationSchema = this;
            synchronized (compatibleKafkaConnectDeserializationSchema) {
                if (this.keyConverter == null) {
                    this.keyConverter = new JsonConverter();
                    this.keyConverter.configure(Collections.singletonMap("schemas.enable", this.keySchemaEnable), true);
                    this.keyConverterMethod = (Method)ReflectionUtils.getDeclaredMethod(JsonConverter.class, (String)(this.keySchemaEnable ? INCLUDE_SCHEMA_METHOD : EXCLUDE_SCHEMA_METHOD), (Class[])new Class[]{Schema.class, Object.class}).get();
                }
            }
        }
        if (this.valueConverter == null) {
            compatibleKafkaConnectDeserializationSchema = this;
            synchronized (compatibleKafkaConnectDeserializationSchema) {
                if (this.valueConverter == null) {
                    this.valueConverter = new JsonConverter();
                    this.valueConverter.configure(Collections.singletonMap("schemas.enable", this.valueSchemaEnable), false);
                    this.valueConverterMethod = (Method)ReflectionUtils.getDeclaredMethod(JsonConverter.class, (String)(this.valueSchemaEnable ? INCLUDE_SCHEMA_METHOD : EXCLUDE_SCHEMA_METHOD), (Class[])new Class[]{Schema.class, Object.class}).get();
                }
            }
        }
    }

    public CompatibleKafkaConnectDeserializationSchema(SeaTunnelRowType seaTunnelRowType, JsonToRowConverters.JsonToObjectConverter runtimeConverter, boolean keySchemaEnable, boolean valueSchemaEnable, CatalogTable catalogTable) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.runtimeConverter = runtimeConverter;
        this.keySchemaEnable = keySchemaEnable;
        this.valueSchemaEnable = valueSchemaEnable;
        this.catalogTable = catalogTable;
    }
}

