/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.kafka.catalog.schema;

import com.alibaba.ververica.connectors.kafka.catalog.schema.KafkaSchema;
import com.alibaba.ververica.connectors.kafka.catalog.schema.RecordSchemaParser;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonToRowDataInSourceRecordConverters;
import org.apache.flink.formats.json.JsonToSourceRecordConverter;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.SchemaSpec;
import org.apache.flink.table.data.SourceRecord;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonSchemaParser
implements RecordSchemaParser {
    private static final Logger LOG = LoggerFactory.getLogger(JsonSchemaParser.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final String keyPrefix;
    private final String valuePrefix;
    private final String parseKeyErrorFullFieldName;
    private final JsonToSourceRecordConverter valueJsonConverter;
    private final JsonToSourceRecordConverter keyJsonConverter;

    public JsonSchemaParser(String keyPrefix, String valuePrefix, boolean flattenNestedColumns, boolean primitiveAsString, TimestampFormat timestampFormat, String parseKeyErrorFieldName) {
        this.keyPrefix = keyPrefix;
        this.valuePrefix = valuePrefix;
        this.parseKeyErrorFullFieldName = keyPrefix + parseKeyErrorFieldName;
        this.objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
        this.valueJsonConverter = new JsonToSourceRecordConverter(new ObjectPath("test", "test"), new RowType(Collections.emptyList()), new JsonToRowDataInSourceRecordConverters(false, true, timestampFormat, ZoneId.systemDefault()), false, flattenNestedColumns, primitiveAsString);
        this.keyJsonConverter = new JsonToSourceRecordConverter(new ObjectPath("test", "test"), new RowType(Collections.emptyList()), new JsonToRowDataInSourceRecordConverters(false, true, timestampFormat, ZoneId.systemDefault()), false, false, false);
    }

    @Override
    public Optional<KafkaSchema> parseRecordSchema(ConsumerRecord<byte[], byte[]> record) {
        boolean treatKeyAsRaw;
        Schema.Builder builder = Schema.newBuilder();
        Optional<Schema> keySchemaOptional = this.parseSchemaAndAddPrefix(record.key(), this.keyPrefix, this.keyJsonConverter);
        boolean bl = treatKeyAsRaw = !keySchemaOptional.isPresent();
        if (treatKeyAsRaw) {
            LOG.warn(String.format("Fail to parse the key of the record in topic '%s' [partition %d , offset %d], it will be treated as raw format.", record.topic(), record.partition(), record.offset()));
        }
        Schema keySchema = keySchemaOptional.orElseGet(() -> {
            Schema.Builder keyBuilder = Schema.newBuilder();
            keyBuilder.column(this.getFullKeyFieldNameWhenParseError(), (AbstractDataType)DataTypes.BYTES());
            return keyBuilder.build();
        });
        builder.fromSchema(keySchema);
        LinkedHashSet<String> keyFieldNames = new LinkedHashSet<String>();
        keySchema.getColumns().stream().map(Schema.UnresolvedColumn::getName).forEach(keyFieldNames::add);
        builder.fromSchema(this.parseSchemaAndAddPrefix(record.value(), this.valuePrefix, this.valueJsonConverter).orElseGet(() -> Schema.newBuilder().build()));
        return Optional.of(new KafkaSchema(builder.build(), keyFieldNames, treatKeyAsRaw, false));
    }

    public String getFullKeyFieldNameWhenParseError() {
        return this.parseKeyErrorFullFieldName;
    }

    private Optional<Schema> parseSchemaAndAddPrefix(byte[] jsonMessage, String prefix, JsonToSourceRecordConverter converter) {
        Optional<Schema> optional;
        block10: {
            if (jsonMessage == null || jsonMessage.length == 0) {
                return Optional.of(Schema.newBuilder().build());
            }
            JsonParser root = this.objectMapper.getFactory().createParser(jsonMessage);
            try {
                if (root.currentToken() == null) {
                    root.nextToken();
                }
                SourceRecord record = converter.convert(root);
                SchemaSpec schemaSpec = record.getSchema();
                optional = Optional.of(Schema.newBuilder().fromFields(this.appendPrefixToFieldNames(schemaSpec.getColumnNames(), prefix), schemaSpec.getColumnDataTypes()).build());
                if (root == null) break block10;
            }
            catch (Throwable throwable) {
                try {
                    if (root != null) {
                        try {
                            root.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (Throwable t) {
                    LOG.debug(String.format("Failed to deserialize JSON '%s'.", new String(jsonMessage)), t);
                    LOG.error("Failed to deserialize JSON message.", t);
                    return Optional.empty();
                }
            }
            root.close();
        }
        return optional;
    }

    private List<String> appendPrefixToFieldNames(List<String> fieldNames, String prefix) {
        if (StringUtils.isNullOrWhitespaceOnly((String)prefix)) {
            return fieldNames;
        }
        return fieldNames.stream().map(name -> prefix + name).collect(Collectors.toList());
    }
}

