/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.kafka.catalog;

import com.alibaba.ververica.connectors.kafka.catalog.KafkaCatalogBase;
import com.alibaba.ververica.connectors.kafka.catalog.KafkaConnectorType;
import com.alibaba.ververica.connectors.kafka.catalog.schema.KafkaSchema;
import com.alibaba.ververica.connectors.kafka.catalog.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import com.alibaba.ververica.connectors.kafka.catalog.shaded.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import com.alibaba.ververica.connectors.kafka.catalog.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka catalog whose table schemas are looked up in a Confluent Schema Registry.
 *
 * <p>For a topic {@code t} the value schema is read from the registry subject {@code t-value};
 * if the registry also lists a subject {@code t-key}, its columns are added to the table and
 * reported as the key fields. Tables returned by {@link #getTable(ObjectPath)} carry the
 * registry URL in their {@code avro-confluent} format options.
 */
public class KafkaSchemaRegistryCatalog extends KafkaCatalogBase {
    /** Suffix appended to a topic name to form the registry subject of its value schema. */
    private static final String CONFLUENT_VALUE_SCHEMA_NAME_SUFFIX = "-value";
    /** Suffix appended to a topic name to form the registry subject of its key schema. */
    private static final String CONFLUENT_KEY_SCHEMA_NAME_SUFFIX = "-key";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSchemaRegistryCatalog.class);

    /** Schema Registry URL; validated as non-blank in the constructor. */
    private final String schemaRegistryUrl;
    /** Client used for every registry lookup performed by this catalog. */
    private final SchemaRegistryClient schemaRegistryClient;
    /**
     * Starts out as an empty map; nothing in this class adds entries to it. It takes part in
     * {@link #equals(Object)} and {@link #hashCode()}.
     */
    @VisibleForTesting
    Map<String, String> kafkaProps;

    /**
     * Creates a catalog backed by a new {@link CachedSchemaRegistryClient}.
     *
     * <p>NOTE(review): the client is constructed from {@code schemaUrl} before the delegated
     * constructor validates the URL.
     *
     * @param catalogName name of the catalog
     * @param databaseName default database name; {@code "kafka"} is used when {@code null}
     * @param bootstrapServers forwarded to {@link KafkaCatalogBase}
     * @param schemaUrl Schema Registry URL; must not be null or blank
     * @param schemaCapacity second argument of the {@link CachedSchemaRegistryClient} constructor
     *     (presumably its cache capacity; confirm against the client documentation)
     * @param groupId forwarded to {@link KafkaCatalogBase}
     * @param format forwarded to {@link KafkaCatalogBase}
     * @param keyPrefix forwarded to {@link KafkaCatalogBase}; also used when converting key schemas
     * @param valuePrefix forwarded to {@link KafkaCatalogBase}; also used when converting value schemas
     * @param catalogProperties forwarded to {@link KafkaCatalogBase}
     */
    public KafkaSchemaRegistryCatalog(String catalogName, @Nullable String databaseName, String bootstrapServers, String schemaUrl, int schemaCapacity, String groupId, String format, String keyPrefix, String valuePrefix, Properties catalogProperties) {
        this(catalogName, databaseName, bootstrapServers, schemaUrl, new CachedSchemaRegistryClient(schemaUrl, schemaCapacity), groupId, format, keyPrefix, valuePrefix, catalogProperties);
    }

    /**
     * Creates a catalog that talks to the registry through the supplied client.
     *
     * @param client registry client to use; stored as-is
     * @throws IllegalArgumentException if {@code schemaUrl} is null or contains only whitespace
     */
    @VisibleForTesting
    KafkaSchemaRegistryCatalog(String catalogName, @Nullable String databaseName, String bootstrapServers, String schemaUrl, SchemaRegistryClient client, String groupId, String format, String keyPrefix, String valuePrefix, Properties catalogProperties) {
        super(catalogName, databaseName == null ? "kafka" : databaseName, bootstrapServers, groupId, format, keyPrefix, valuePrefix, catalogProperties);
        boolean hasSchemaUrl = !StringUtils.isNullOrWhitespaceOnly(schemaUrl);
        Preconditions.checkArgument(hasSchemaUrl, "Schema Registry URL can not be null or empty.");
        this.schemaRegistryUrl = schemaUrl;
        this.schemaRegistryClient = client;
        this.kafkaProps = new HashMap<String, String>();
        LOG.info("Created SchemaRegistryCatalog '{}'", catalogName);
    }

    /**
     * Returns the table factory for tables of this catalog.
     *
     * @return an {@link Optional} holding a new {@link KafkaDynamicTableFactory}
     */
    public Optional<Factory> getFactory() {
        Factory tableFactory = new KafkaDynamicTableFactory();
        return Optional.of(tableFactory);
    }

    /**
     * Builds the catalog table for the topic named by {@code tablePath}.
     *
     * @param tablePath path whose object name is used as the Kafka topic
     * @return a table with the registry-derived schema, no comment, no partition keys, and
     *     options that include the registry URL for the value format (and for the key format
     *     when key fields exist)
     * @throws TableNotExistException if {@code tableExists(tablePath)} is false
     * @throws CatalogException if the registry lookup fails
     */
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        String topic = tablePath.getObjectName();
        if (!this.tableExists(tablePath)) {
            throw new TableNotExistException(this.getName(), tablePath);
        }

        KafkaSchema registrySchema = this.getKafkaSchemaBySchemaRegistry(topic);
        Map<String, String> options = this.createTableProperties(topic, registrySchema.getKeyFieldNames(), KafkaConnectorType.KAFKA);

        String urlOption = AvroConfluentFormatOptions.URL.key();
        options.put(String.format("value.%s.%s", "avro-confluent", urlOption), this.schemaRegistryUrl);
        // The key format only needs the registry URL when the topic actually has key columns.
        boolean hasKeyFields = !registrySchema.getKeyFieldNames().isEmpty();
        if (hasKeyFields) {
            options.put(String.format("key.%s.%s", "avro-confluent", urlOption), this.schemaRegistryUrl);
        }
        return CatalogTable.of((Schema)registrySchema.getSchema(), null, Collections.emptyList(), options);
    }

    /**
     * Fetches the latest Avro schema registered under {@code subject} and converts it to a
     * Flink {@link Schema} via {@code fromRowDataType}.
     *
     * @param subject registry subject to look up
     * @param prefix passed through to {@code fromRowDataType}
     * @throws RestClientException if the registry request fails
     * @throws IOException if the registry request fails
     */
    private Schema getSchemaForAvroSchema(String subject, String prefix) throws RestClientException, IOException {
        String latestAvro = this.schemaRegistryClient.getLatestSchemaMetadata(subject).getSchema();
        return this.fromRowDataType(AvroSchemaConverter.convertToDataType(latestAvro), prefix);
    }

    /**
     * Assembles the {@link KafkaSchema} for {@code topic} from the registry: value columns first,
     * followed by key columns when a key subject is registered.
     *
     * @param topic Kafka topic name
     * @throws CatalogException wrapping any {@link RestClientException} or {@link IOException}
     *     raised by the registry client
     */
    private KafkaSchema getKafkaSchemaBySchemaRegistry(String topic) {
        try {
            String valueSubject = topic + CONFLUENT_VALUE_SCHEMA_NAME_SUFFIX;
            String keySubject = topic + CONFLUENT_KEY_SCHEMA_NAME_SUFFIX;
            LinkedHashSet<String> keyFieldNames = new LinkedHashSet<String>();
            Schema.Builder columns = Schema.newBuilder();

            // The value subject is mandatory: a missing subject surfaces as a registry error.
            Schema valueSchema = this.getSchemaForAvroSchema(valueSubject, this.valuePrefix);
            columns.fromColumns(valueSchema.getColumns());

            // The key subject is optional and only consulted when the registry lists it.
            boolean keySubjectRegistered = this.schemaRegistryClient.getAllSubjects().contains(keySubject);
            if (keySubjectRegistered) {
                Schema keySchema = this.getSchemaForAvroSchema(keySubject, this.keyPrefix);
                for (Schema.UnresolvedColumn keyColumn : keySchema.getColumns()) {
                    keyFieldNames.add(keyColumn.getName());
                }
                columns.fromColumns(keySchema.getColumns());
            }

            Schema tableSchema = this.appendKafkaMetadataAndPK(columns.build(), PK);
            return new KafkaSchema(tableSchema, keyFieldNames, false, false);
        }
        catch (RestClientException | IOException e) {
            String message = String.format("Failed getting table %s from schema registry %s", topic, this.schemaRegistryUrl);
            throw new CatalogException(message, e);
        }
    }

    /**
     * Two catalogs are equal when the superclass considers them equal and they share the same
     * registry URL and {@code kafkaProps}.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof KafkaSchemaRegistryCatalog)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        KafkaSchemaRegistryCatalog other = (KafkaSchemaRegistryCatalog) o;
        if (!this.schemaRegistryUrl.equals(other.schemaRegistryUrl)) {
            return false;
        }
        return this.kafkaProps.equals(other.kafkaProps);
    }

    /** Hashes the inherited connection settings together with the registry URL and {@code kafkaProps}. */
    @Override
    public int hashCode() {
        return Objects.hash(
                this.catalogProperties,
                this.bootstrapServers,
                this.groupId,
                this.keyPrefix,
                this.valuePrefix,
                this.format,
                this.schemaRegistryUrl,
                this.kafkaProps);
    }
}

