/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.kafka.catalog.factory;

import com.alibaba.ververica.connectors.kafka.catalog.KafkaJsonCatalog;
import com.alibaba.ververica.connectors.kafka.catalog.KafkaSchemaRegistryCatalog;
import com.alibaba.ververica.connectors.kafka.catalog.aliyun.AliyunKafkaClientParams;
import com.alibaba.ververica.connectors.kafka.catalog.factory.KafkaCatalogOptions;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonFormatOptionsUtil;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;

public class KafkaCatalogFactory
implements CatalogFactory {
    public String factoryIdentifier() {
        return "kafka";
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet options = new HashSet();
        options.add(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS);
        options.add(FactoryUtil.FORMAT);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet options = new HashSet();
        options.add(AvroConfluentFormatOptions.URL);
        options.add(KafkaCatalogOptions.SCHEMA_CAPACITY);
        options.add(KafkaCatalogOptions.DEFAULT_DATABASE);
        options.add(KafkaCatalogOptions.MAX_FETCH_RECORDS);
        options.add(KafkaCatalogOptions.COMPACTED_TOPIC_AS_UPSERT_TABLE);
        options.add(KafkaConnectorOptions.KEY_FIELDS_PREFIX);
        options.add(KafkaConnectorOptions.VALUE_FIELDS_PREFIX);
        options.add(KafkaCatalogOptions.PARSE_KEY_ERROR_FIELD_NAME);
        options.add(JsonFormatOptions.TIMESTAMP_FORMAT);
        options.add(JsonFormatOptions.INFER_SCHEMA_FLATTEN_NECOLUMNS_ENABLE);
        options.add(JsonFormatOptions.INFER_SCHEMA_PRIMITIVE_AS_STRING);
        options.add(KafkaCatalogOptions.ALIYUN_KAFKA_AK);
        options.add(KafkaCatalogOptions.ALIYUN_KAFKA_SK);
        options.add(KafkaCatalogOptions.ALIYUN_KAFKA_INSTANCE_ID);
        options.add(KafkaCatalogOptions.ALIYUN_KAFKA_ENDPOINT);
        options.add(KafkaCatalogOptions.ALIYUN_KAFKA_REGION_ID);
        return options;
    }

    public Catalog createCatalog(CatalogFactory.Context context) {
        FactoryUtil.CatalogFactoryHelper helper = FactoryUtil.createCatalogFactoryHelper((CatalogFactory)this, (CatalogFactory.Context)context);
        helper.validateExcept(new String[]{"properties."});
        ReadableConfig catalogOptions = helper.getOptions();
        this.checkOptions(catalogOptions);
        String formatIdentifier = (String)catalogOptions.get(FactoryUtil.FORMAT);
        TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat((ReadableConfig)catalogOptions);
        String schemaRegistryUrl = catalogOptions.getOptional(AvroConfluentFormatOptions.URL).orElse(null);
        AliyunKafkaClientParams aliyunKafkaClientParams = AliyunKafkaClientParams.createAliyunKafkaClientParams(catalogOptions);
        if (schemaRegistryUrl == null) {
            return new KafkaJsonCatalog(context.getName(), (String)catalogOptions.get(KafkaCatalogOptions.DEFAULT_DATABASE), (String)catalogOptions.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS), "catalogReader" + ThreadLocalRandom.current().nextLong(), formatIdentifier, catalogOptions.getOptional(KafkaConnectorOptions.KEY_FIELDS_PREFIX).orElse("key_"), catalogOptions.getOptional(KafkaConnectorOptions.VALUE_FIELDS_PREFIX).orElse("value_"), (Integer)catalogOptions.get(KafkaCatalogOptions.MAX_FETCH_RECORDS), (Boolean)catalogOptions.get(KafkaCatalogOptions.COMPACTED_TOPIC_AS_UPSERT_TABLE), KafkaCatalogOptions.getOtherProperties(context.getOptions(), "properties."), (Boolean)catalogOptions.get(JsonFormatOptions.INFER_SCHEMA_FLATTEN_NECOLUMNS_ENABLE), (Boolean)catalogOptions.get(JsonFormatOptions.INFER_SCHEMA_PRIMITIVE_AS_STRING), timestampOption, (String)catalogOptions.get(KafkaCatalogOptions.PARSE_KEY_ERROR_FIELD_NAME), aliyunKafkaClientParams);
        }
        return new KafkaSchemaRegistryCatalog(context.getName(), (String)catalogOptions.get(KafkaCatalogOptions.DEFAULT_DATABASE), (String)catalogOptions.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS), schemaRegistryUrl, (Integer)catalogOptions.get(KafkaCatalogOptions.SCHEMA_CAPACITY), "catalogReader" + ThreadLocalRandom.current().nextLong(), formatIdentifier, catalogOptions.getOptional(KafkaConnectorOptions.KEY_FIELDS_PREFIX).orElse("key_"), catalogOptions.getOptional(KafkaConnectorOptions.VALUE_FIELDS_PREFIX).orElse("value_"), KafkaCatalogOptions.getOtherProperties(context.getOptions(), "properties."));
    }

    private void checkOptions(ReadableConfig catalogOptions) {
        String schemaRegistryUrl = catalogOptions.getOptional(AvroConfluentFormatOptions.URL).orElse(null);
        String formatIdentifier = (String)catalogOptions.get(FactoryUtil.FORMAT);
        if (schemaRegistryUrl == null) {
            Preconditions.checkState((boolean)"json".equals(formatIdentifier), (Object)String.format("Now only support parsing %s format to get schema, actual is %s.", "json", formatIdentifier));
        } else {
            Preconditions.checkState((boolean)"avro-confluent".equals(formatIdentifier), (Object)String.format("Catalog using schema registry now only support %s format, actual is %s.", "avro-confluent", formatIdentifier));
        }
    }
}

