/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.kafka.catalog;

import com.alibaba.ververica.connectors.kafka.catalog.KafkaCatalogBase;
import com.alibaba.ververica.connectors.kafka.catalog.KafkaConnectorType;
import com.alibaba.ververica.connectors.kafka.catalog.aliyun.AliyunKafkaClient;
import com.alibaba.ververica.connectors.kafka.catalog.aliyun.AliyunKafkaClientParams;
import com.alibaba.ververica.connectors.kafka.catalog.factory.KafkaCatalogOptions;
import com.alibaba.ververica.connectors.kafka.catalog.schema.JsonSchemaParser;
import com.alibaba.ververica.connectors.kafka.catalog.schema.KafkaSchema;
import com.alibaba.ververica.connectors.kafka.utils.KafkaSchemaUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.Config;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaFuture;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Node;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.config.ConfigResource;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.WakeupException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableProvider;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka catalog that exposes topics as tables and derives each table schema by sampling the
 * most recent records of the topic and parsing them with a {@link JsonSchemaParser}.
 *
 * <p>When {@link AliyunKafkaClientParams} are supplied, topic creation and consumer-group cleanup
 * are delegated to an {@link AliyunKafkaClient}; otherwise the admin client of the base class is
 * used and the default partition count / replication factor are read from a broker's config.
 */
public class KafkaJsonCatalog extends KafkaCatalogBase implements CatalogTableProvider {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonCatalog.class);
    /** Timeout of a single consumer poll while sampling records, in milliseconds. */
    private static final long POLL_TIMEOUT = 10000L;
    /** Maximum number of polls issued while sampling the records of one topic. */
    private static final int RETRY_LIMIT = 10;
    /** Maximum number of attempts made to create a topic in {@link #createTable}. */
    private static final int CREATE_TOPIC_ATTEMPTS = 5;
    /** Maximum number of attempts made to delete the consumer group in {@link #close()}. */
    private static final int DELETE_CONSUMER_GROUP_ATTEMPTS = 3;
    private static final String NUM_PARTITIONS_PROP = "num.partitions";
    private static final String REPLICATION_FACTOR_PROP = "default.replication.factor";
    private static final String CLEANUP_POLICY_PROP = "cleanup.policy";
    private static final String CLEANUP_POLICY_COMPACT = "compact";

    private final int maxFetchRecords;
    private final boolean compactedTopicAsUpsertTable;
    private final boolean flattenNestedColumns;
    private final boolean primitiveAsString;
    private final JsonSchemaParser jsonSchemaResolver;
    private final String parseKeyErrorFieldName;
    /** Non-null only when the catalog talks to an Aliyun Kafka instance. */
    @Nullable
    private final AliyunKafkaClientParams aliyunKafkaClientParams;
    /** Partition count for new topics; may be overwritten by {@link #inferDefaultTopicProps()}. */
    protected int numPartitions = 1;
    /** Replication factor for new topics; may be overwritten by {@link #inferDefaultTopicProps()}. */
    protected short replicationFactor = 1;
    /** Consumer used to sample records; created in {@link #open()}. */
    protected KafkaConsumer<byte[], byte[]> consumer;
    /** Created in {@link #open()} only when {@link #aliyunKafkaClientParams} is set. */
    protected AliyunKafkaClient aliyunKafkaClient;

    public KafkaJsonCatalog(String catalogName, String defaultDatabase, String bootstrapServers, String groupId, String format, String keyPrefix, String valuePrefix, int maxFetchRecords, boolean compactedTopicAsUpsertTable, Properties catalogProperties, boolean flattenNestedColumns, boolean primitiveAsString, TimestampFormat timestampFormat, String parseKeyErrorFieldName, @Nullable AliyunKafkaClientParams aliyunKafkaClientParams) {
        super(catalogName, defaultDatabase, bootstrapServers, groupId, format, keyPrefix, valuePrefix, catalogProperties);
        this.maxFetchRecords = maxFetchRecords;
        this.compactedTopicAsUpsertTable = compactedTopicAsUpsertTable;
        this.flattenNestedColumns = flattenNestedColumns;
        this.primitiveAsString = primitiveAsString;
        this.parseKeyErrorFieldName = parseKeyErrorFieldName;
        this.jsonSchemaResolver = new JsonSchemaParser(keyPrefix, valuePrefix, flattenNestedColumns, primitiveAsString, timestampFormat, parseKeyErrorFieldName);
        this.aliyunKafkaClientParams = aliyunKafkaClientParams;
    }

    /**
     * Opens the base catalog, creates the sampling consumer and then either opens the Aliyun
     * client or infers the default topic properties from the cluster.
     */
    @Override
    public void open() throws CatalogException {
        super.open();
        this.consumer = this.getKafkaConsumer();
        if (this.isAliyunKafka()) {
            this.aliyunKafkaClient = new AliyunKafkaClient(this.aliyunKafkaClientParams);
            this.aliyunKafkaClient.open();
        } else {
            this.inferDefaultTopicProps();
        }
    }

    /**
     * Closes the base catalog, the sampling consumer and, if present, the Aliyun client. Before the
     * Aliyun client is closed, the consumer group of this catalog is deleted on a best-effort
     * basis (a {@code false} result is only logged after the attempts are used up).
     *
     * @throws CatalogException if deleting the consumer group or closing the Aliyun client throws
     */
    @Override
    public void close() throws CatalogException {
        super.close();
        if (this.consumer != null) {
            this.closeResource(this.consumer);
        }
        if (this.aliyunKafkaClient == null) {
            return;
        }
        try {
            try {
                boolean consumerGroupDeleted = false;
                for (int attempt = 0; attempt < DELETE_CONSUMER_GROUP_ATTEMPTS && !consumerGroupDeleted; ++attempt) {
                    consumerGroupDeleted = this.aliyunKafkaClient.deleteConsumerGroup(this.groupId, ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                }
                if (!consumerGroupDeleted) {
                    LOG.warn("Fail to delete the consumer group {}, skip deleting.", this.groupId);
                }
            } finally {
                // Release the client even when the consumer group deletion throws.
                this.aliyunKafkaClient.close();
            }
        } catch (Exception e) {
            // Propagate the failure instead of silently discarding it.
            throw new CatalogException("Fail to close aliyun kafka client.", e);
        }
    }

    /**
     * Databases cannot be created through this catalog. The call is accepted as a no-op only for
     * the default database when {@code ignoreIfExists} is set.
     *
     * @throws UnsupportedOperationException in every other case
     */
    @Override
    public void createDatabase(String name, CatalogDatabase catalogDatabase, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
        if (ignoreIfExists && this.getDefaultDatabase().equals(name)) {
            return;
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Creates the topic named after the table. The topic is created with a compact cleanup policy
     * when the table declares a primary key and {@code compactedTopicAsUpsertTable} is enabled.
     * Creation is attempted up to {@link #CREATE_TOPIC_ATTEMPTS} times with a short pause in between.
     *
     * @throws DatabaseNotExistException if the database of {@code objectPath} does not exist
     * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
     * @throws CatalogException if the topic could not be created, or the thread was interrupted
     *     while creating it (the interrupt flag is restored in that case)
     */
    @Override
    public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        String databaseName = objectPath.getDatabaseName();
        if (!this.databaseExists(databaseName)) {
            throw new DatabaseNotExistException(this.getName(), databaseName);
        }
        if (this.tableExists(objectPath)) {
            if (ignoreIfExists) {
                return;
            }
            throw new TableAlreadyExistException(this.getName(), objectPath);
        }
        String topicName = objectPath.getObjectName();
        boolean compactTopic = catalogBaseTable.getUnresolvedSchema().getPrimaryKey().isPresent() && this.compactedTopicAsUpsertTable;
        for (int attempt = 1; attempt <= CREATE_TOPIC_ATTEMPTS; ++attempt) {
            try {
                if (this.isAliyunKafka()) {
                    this.aliyunKafkaClient.createTopic(topicName, compactTopic, ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                } else {
                    NewTopic newTopic = new NewTopic(topicName, this.numPartitions, this.replicationFactor);
                    if (compactTopic) {
                        Map<String, String> newTopicConfig = new HashMap<String, String>();
                        newTopicConfig.put(CLEANUP_POLICY_PROP, CLEANUP_POLICY_COMPACT);
                        newTopic = newTopic.configs(newTopicConfig);
                    }
                    CreateTopicsResult result = this.adminClient.createTopics(Collections.singleton(newTopic));
                    KafkaFuture<Void> future = result.values().get(topicName);
                    future.get(ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                }
                return;
            } catch (Throwable t) {
                if (t instanceof InterruptedException) {
                    // Do not keep retrying once the thread was asked to stop.
                    Thread.currentThread().interrupt();
                    throw this.topicCreationFailure(topicName, t);
                }
                if (attempt >= CREATE_TOPIC_ATTEMPTS) {
                    throw this.topicCreationFailure(topicName, t);
                }
                LOG.warn(String.format("Fail to create topic %s at the %d time.", topicName, attempt), t);
                try {
                    Thread.sleep(this.isAliyunKafka() ? 1000L : 500L);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    CatalogException failure = this.topicCreationFailure(topicName, t);
                    failure.addSuppressed(interrupted);
                    throw failure;
                }
            }
        }
        // Only reachable if the loop above is left without returning or throwing.
        throw new CatalogException(String.format("Fail to create topic [%s] in kafka json catalog [%s].", topicName, this.getName()));
    }

    /** Builds the exception reported when a topic could not be created. */
    private CatalogException topicCreationFailure(String topicName, Throwable cause) {
        return new CatalogException(String.format("Fail to create topic [%s] in kafka json catalog [%s].", topicName, this.getName()), cause);
    }

    /**
     * Returns the table for the given topic. The schema is inferred by sampling records of the
     * topic; the table options are produced by {@code createTableProperties} plus the JSON
     * inference options of this catalog.
     *
     * @throws TableNotExistException if {@code tableExists(objectPath)} is false
     * @throws CatalogException if the schema could not be inferred
     */
    public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
        if (!this.tableExists(objectPath)) {
            throw new TableNotExistException(this.getName(), objectPath);
        }
        String topicName = objectPath.getObjectName();
        KafkaSchema kafkaSchema = this.getKafkaSchemaByParsingRecord(topicName);
        KafkaConnectorType connectorType = kafkaSchema.isUpsertKafka() ? KafkaConnectorType.UPSERT_KAFKA : KafkaConnectorType.KAFKA;
        Map<String, String> tableOptions = this.createTableProperties(topicName, kafkaSchema.getKeyFieldNames(), connectorType, kafkaSchema.getTreatKeyAsRaw());
        this.addJsonFormatOptions(tableOptions);
        return CatalogTable.of(kafkaSchema.getSchema(), null, Collections.emptyList(), tableOptions);
    }

    /**
     * Tells whether the {@code cleanup.policy} topic config of {@code topic} contains
     * {@code compact}.
     *
     * @throws CatalogException if the topic config cannot be read within the admin client timeout
     */
    private boolean isCompactCleanupPolicyEnabled(String topic) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        DescribeConfigsResult topicDescResult = this.adminClient.describeConfigs(Collections.singleton(configResource));
        try {
            // Bounded wait, consistent with the other admin client calls of this class.
            Map<ConfigResource, Config> topicDescMap = topicDescResult.all().get(ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
            ConfigEntry cleanupConfig = topicDescMap.get(configResource).get(CLEANUP_POLICY_PROP);
            return cleanupConfig.value().contains(CLEANUP_POLICY_COMPACT);
        } catch (Throwable t) {
            throw new CatalogException(String.format("Fail to get table %s in kafka json catalog [%s]", topic, this.getName()), t);
        }
    }

    /** Adds the JSON schema inference settings of this catalog to the given table options. */
    private void addJsonFormatOptions(Map<String, String> tableOptions) {
        tableOptions.put(String.format("value.json.%s", JsonFormatOptions.INFER_SCHEMA_PRIMITIVE_AS_STRING.key()), String.valueOf(this.primitiveAsString));
        tableOptions.put(String.format("value.json.%s", JsonFormatOptions.INFER_SCHEMA_FLATTEN_NECOLUMNS_ENABLE.key()), String.valueOf(this.flattenNestedColumns));
    }

    /**
     * Infers the schema of {@code topic} by reading up to {@code maxFetchRecords} of its latest
     * records and merging the schema parsed from each of them. Records whose schema cannot be
     * parsed are logged and skipped.
     *
     * @throws CatalogException wrapping any failure raised while sampling or merging
     */
    private KafkaSchema getKafkaSchemaByParsingRecord(String topic) {
        try {
            List<TopicPartition> partitions = this.getTopicPartitions(topic);
            Map<TopicPartition, Long> beginningOffsets = this.consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> endOffsets = this.consumer.endOffsets(partitions);
            long expectedFetchSize = this.getExpectedFetchSize(beginningOffsets, endOffsets);
            Map<TopicPartition, Long> startOffsets = this.getStartOffsets(beginningOffsets, endOffsets, expectedFetchSize);
            this.consumer.assign(new ArrayList<TopicPartition>(startOffsets.keySet()));
            startOffsets.forEach(this.consumer::seek);

            KafkaSchema result = new KafkaSchema(Schema.newBuilder().build(), new LinkedHashSet<String>(), false, false);
            int retryTimes = 0;
            int receivedRecordNum = 0;
            while (retryTimes++ < RETRY_LIMIT && (long) receivedRecordNum < expectedFetchSize) {
                try {
                    ConsumerRecords<byte[], byte[]> consumerRecords = this.consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                    receivedRecordNum += consumerRecords.count();
                    for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
                        Optional<?> toMerge = Optional.empty();
                        try {
                            toMerge = this.jsonSchemaResolver.parseRecordSchema(consumerRecord);
                        } catch (Exception e) {
                            // Keep the cause in the log so unparsable records can be diagnosed.
                            LOG.warn(String.format("Fail to parse schema from %s.", consumerRecord.toString()), e);
                        }
                        if (!toMerge.isPresent()) {
                            continue;
                        }
                        result = this.mergeKafkaSchema(result, (KafkaSchema) toMerge.get());
                    }
                } catch (WakeupException we) {
                    LOG.warn(String.format("Fail to get records of topic [%s] in the retry time %d.", topic, retryTimes));
                }
            }
            // Reported against the configured maximum, i.e. also when the topic simply holds fewer records.
            if (receivedRecordNum < this.maxFetchRecords) {
                LOG.warn(String.format("Except to get %d records of topic [%s], actual get %d records.", this.maxFetchRecords, topic, receivedRecordNum));
            }
            boolean isUpsertKafkaTable = this.isUpsertKafkaTable(topic, result.getKeyFieldNames());
            String[] primaryKeys = this.getPrimaryKeys(isUpsertKafkaTable, result.getKeyFieldNames());
            Schema schema = KafkaSchemaUtils.generateKafkaSchemaForCatalog(result.getSchema(), result.getKeyFieldNames(), primaryKeys);
            return new KafkaSchema(schema, result.getKeyFieldNames(), result.getTreatKeyAsRaw(), isUpsertKafkaTable);
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed getting table %s by parsing the json record.", topic), e);
        }
    }

    /**
     * A topic is exposed as an upsert table when the feature is enabled, key fields were found and
     * the topic has a compact cleanup policy. The topic config is only queried when the first two
     * conditions hold.
     */
    private boolean isUpsertKafkaTable(String topicName, LinkedHashSet<String> keyFieldNames) {
        return this.compactedTopicAsUpsertTable && !keyFieldNames.isEmpty() && this.isCompactCleanupPolicyEnabled(topicName);
    }

    /** Returns the key field names for an upsert table, otherwise the {@code PK} of the base class. */
    private String[] getPrimaryKeys(boolean isUpsertKafkaTable, LinkedHashSet<String> keyFieldNames) {
        if (isUpsertKafkaTable) {
            return keyFieldNames.toArray(new String[0]);
        }
        return PK;
    }

    /**
     * Merges the schema parsed from one record into the schema accumulated so far. As soon as one
     * side treats the key as raw, the key columns of the other side are replaced by the single
     * parse-error key column; otherwise the key field names are unioned.
     */
    private KafkaSchema mergeKafkaSchema(KafkaSchema resultSchema, KafkaSchema toMergeSchema) {
        String fullKeyFieldNameWhenParseError = this.jsonSchemaResolver.getFullKeyFieldNameWhenParseError();
        KafkaSchemaUtils.checkFieldConflict(toMergeSchema.getSchema());
        if (resultSchema.getTreatKeyAsRaw()) {
            Schema rawKeySchema = this.modifyKeyFieldsWhenTreatKeyAsRaw(toMergeSchema.getSchema(), toMergeSchema.getKeyFieldNames(), fullKeyFieldNameWhenParseError);
            toMergeSchema = new KafkaSchema(rawKeySchema, toMergeSchema.getKeyFieldNames(), toMergeSchema.getTreatKeyAsRaw(), toMergeSchema.isUpsertKafka());
        } else if (toMergeSchema.getTreatKeyAsRaw()) {
            Schema rawKeySchema = this.modifyKeyFieldsWhenTreatKeyAsRaw(resultSchema.getSchema(), resultSchema.getKeyFieldNames(), fullKeyFieldNameWhenParseError);
            resultSchema = new KafkaSchema(rawKeySchema, new LinkedHashSet<String>(Collections.singletonList(fullKeyFieldNameWhenParseError)), true, resultSchema.isUpsertKafka());
        } else {
            resultSchema.getKeyFieldNames().addAll(toMergeSchema.getKeyFieldNames());
        }
        Schema merged = KafkaSchemaUtils.mergeSchema(resultSchema.getSchema(), toMergeSchema.getSchema());
        return new KafkaSchema(merged, resultSchema.getKeyFieldNames(), resultSchema.getTreatKeyAsRaw(), resultSchema.isUpsertKafka());
    }

    /**
     * Returns a copy of {@code schema} without the columns named in {@code keyFieldNames} and with
     * one additional BYTES column named {@code parseKeyErrorFieldName} appended.
     */
    private Schema modifyKeyFieldsWhenTreatKeyAsRaw(Schema schema, LinkedHashSet<String> keyFieldNames, String parseKeyErrorFieldName) {
        Schema.Builder builder = Schema.newBuilder();
        builder.fromColumns(schema.getColumns().stream().filter(col -> !keyFieldNames.contains(col.getName())).collect(Collectors.toList()));
        builder.column(parseKeyErrorFieldName, DataTypes.BYTES());
        return builder.build();
    }

    /**
     * Computes, per partition, the offset to start reading from so that about
     * {@code expectedFetchSize} of the latest records are read in total.
     *
     * <p>Partitions are visited from the fewest to the most records. Each one is asked for an even
     * share (rounded up) of what is still missing; a partition that cannot supply its share
     * leaves the remainder to the partitions visited after it.
     *
     * @param beginningOffsets first available offset per partition
     * @param endOffsets end offset per partition
     * @param expectedFetchSize total number of records to read
     * @return start offset per partition; partitions absent from {@code beginningOffsets} keep
     *     their end offset
     */
    @VisibleForTesting
    Map<TopicPartition, Long> getStartOffsets(Map<TopicPartition, Long> beginningOffsets, Map<TopicPartition, Long> endOffsets, long expectedFetchSize) {
        Map<TopicPartition, Long> fetchStartOffsets = new HashMap<TopicPartition, Long>(endOffsets);
        long restRecordNum = expectedFetchSize;
        int partitionNum = beginningOffsets.size();
        List<TopicPartition> partitionsByMessageCount = new ArrayList<TopicPartition>(beginningOffsets.keySet());
        // Long.compare avoids the sign errors of narrowing a long difference to int.
        partitionsByMessageCount.sort((tp0, tp1) -> Long.compare(
                endOffsets.get(tp0) - beginningOffsets.get(tp0),
                endOffsets.get(tp1) - beginningOffsets.get(tp1)));
        for (TopicPartition tp : partitionsByMessageCount) {
            long partitionMessageCount = endOffsets.get(tp) - beginningOffsets.get(tp);
            long expectedMessageCount = (restRecordNum + (long) partitionNum - 1L) / (long) partitionNum;
            long actualMessageCount = Math.min(partitionMessageCount, expectedMessageCount);
            fetchStartOffsets.put(tp, endOffsets.get(tp) - actualMessageCount);
            restRecordNum -= actualMessageCount;
            --partitionNum;
        }
        return fetchStartOffsets;
    }

    /**
     * Returns the number of records that should be sampled: the total number of records available
     * in the given partitions, capped at {@code maxFetchRecords}.
     */
    @VisibleForTesting
    long getExpectedFetchSize(Map<TopicPartition, Long> beginningOffsets, Map<TopicPartition, Long> endOffsets) {
        long available = 0L;
        for (TopicPartition tp : endOffsets.keySet()) {
            available += endOffsets.get(tp) - beginningOffsets.get(tp);
            if (available >= (long) this.maxFetchRecords) {
                return this.maxFetchRecords;
            }
        }
        return available;
    }

    /**
     * Creates the byte-array consumer used for sampling. The catalog properties are copied first,
     * so the settings applied afterwards take precedence over them.
     */
    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
        Properties consumerProps = new Properties();
        this.deepCopyProperties(this.catalogProperties, consumerProps);
        consumerProps.setProperty("bootstrap.servers", this.bootstrapServers);
        consumerProps.setProperty("group.id", this.groupId);
        consumerProps.setProperty("client.id", this.groupId + "-catalog-consumer");
        consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.setProperty("allow.auto.create.topics", "false");
        consumerProps.setProperty("auto.offset.reset", "latest");
        return new KafkaConsumer<byte[], byte[]>(consumerProps);
    }

    /**
     * Reads {@code num.partitions} and {@code default.replication.factor} from the config of one
     * broker and stores them as the defaults for newly created topics. Values missing from the
     * broker config leave the current defaults untouched.
     *
     * @throws CatalogException if no broker is available or the config cannot be read
     */
    private void inferDefaultTopicProps() {
        try {
            Collection<Node> brokers = this.adminClient.describeCluster().nodes().get(ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
            if (brokers == null || brokers.isEmpty()) {
                throw new IllegalArgumentException("The number of available brokers is 0.");
            }
            Node brokerNode = brokers.stream().findAny().get();
            ConfigResource oneBroker = new ConfigResource(ConfigResource.Type.BROKER, brokerNode.idString());
            Config brokerConfig = this.adminClient.describeConfigs(Arrays.asList(oneBroker)).all().get(ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS).get(oneBroker);
            ConfigEntry partitionsEntry = brokerConfig.get(NUM_PARTITIONS_PROP);
            if (partitionsEntry != null) {
                this.numPartitions = Integer.parseInt(partitionsEntry.value());
            }
            ConfigEntry replicationEntry = brokerConfig.get(REPLICATION_FACTOR_PROP);
            if (replicationEntry != null) {
                this.replicationFactor = Short.parseShort(replicationEntry.value());
            }
            LOG.info(String.format("The inferred partition number is %d, replication factor is %d for kafka cluster %s.", this.numPartitions, this.replicationFactor, this.getDefaultDatabase()));
        } catch (Throwable t) {
            throw new CatalogException("Fail to get the broker config.", t);
        }
    }

    // NOTE(review): compactedTopicAsUpsertTable takes part in neither equals nor hashCode - confirm this is intended.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof KafkaJsonCatalog)) {
            return false;
        }
        KafkaJsonCatalog that = (KafkaJsonCatalog) o;
        return super.equals(o)
                && this.maxFetchRecords == that.maxFetchRecords
                && this.flattenNestedColumns == that.flattenNestedColumns
                && this.primitiveAsString == that.primitiveAsString
                && Objects.equals(this.parseKeyErrorFieldName, that.parseKeyErrorFieldName)
                && this.numPartitions == that.numPartitions
                && this.replicationFactor == that.replicationFactor
                && Objects.equals(this.aliyunKafkaClientParams, that.aliyunKafkaClientParams);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), this.maxFetchRecords, this.flattenNestedColumns, this.primitiveAsString, this.parseKeyErrorFieldName, this.numPartitions, this.replicationFactor, this.aliyunKafkaClientParams);
    }

    /**
     * Derives the sink table for the given base table. Only tables without a connector option or
     * with the {@code upsert-kafka} connector are accepted, and a primary key is mandatory.
     *
     * <p>Options are layered in this order, later ones overriding earlier ones: options generated
     * by the catalog, database properties not starting with {@code cdas.}, options of the base
     * table.
     *
     * @throws UnsupportedOperationException if the connector is not supported or no primary key
     *     is declared
     */
    public CatalogTable inferTable(ObjectPath objectPath, CatalogDatabase catalogDatabase, CatalogTable baseTable) throws CatalogException {
        String connector = baseTable.getOptions().get(FactoryUtil.CONNECTOR.key());
        if (connector != null && !"upsert-kafka".equals(connector)) {
            throw new UnsupportedOperationException(String.format("Kafka json catalog only supports 'upsert-kafka' table as sink in CTAS, current connector identifier is '%s'.", connector));
        }
        Optional<Schema.UnresolvedPrimaryKey> pkOptional = baseTable.getUnresolvedSchema().getPrimaryKey();
        if (!pkOptional.isPresent()) {
            throw new UnsupportedOperationException(String.format("The upsert kafka table [%s] must have primary keys, but actual is empty.", objectPath.getFullName()));
        }
        LinkedHashSet<String> prefixedKeyNames = new LinkedHashSet<String>(pkOptional.get().getColumnNames().stream().map(s -> this.keyPrefix + s).collect(Collectors.toList()));
        Map<String, String> tableOptions = this.createTableProperties(objectPath.getObjectName(), prefixedKeyNames, KafkaConnectorType.UPSERT_KAFKA);
        this.addJsonFormatOptions(tableOptions);
        Map<String, String> mergedOptions = new HashMap<String, String>(tableOptions);
        mergedOptions.putAll(catalogDatabase.getProperties().entrySet().stream().filter(e -> !e.getKey().startsWith("cdas.")).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        mergedOptions.putAll(baseTable.getOptions());
        return CatalogTable.of(this.inferTableSchema(baseTable.getUnresolvedSchema()), baseTable.getComment(), Collections.emptyList(), mergedOptions);
    }

    /**
     * Applies the topic pattern configured on the database, if any, to the table name by replacing
     * the {@code {table-name}} placeholder. The identifier is returned unchanged when the database
     * has no properties or no non-blank pattern.
     */
    public ObjectIdentifier inferTableIdentifier(ObjectIdentifier tableIdentifier, CatalogDatabase baseDatabase, CatalogTable baseTable) {
        Map<String, String> databaseOptions = baseDatabase.getProperties();
        if (CollectionUtil.isNullOrEmpty(databaseOptions)) {
            return tableIdentifier;
        }
        String topicPattern = databaseOptions.get(KafkaCatalogOptions.CDAS_TOPIC_PATTERN.key());
        if (StringUtils.isNullOrWhitespaceOnly(topicPattern)) {
            return tableIdentifier;
        }
        return ObjectIdentifier.of(tableIdentifier.getCatalogName(), tableIdentifier.getDatabaseName(), topicPattern.replace("{table-name}", tableIdentifier.getObjectName()));
    }

    /**
     * Rebuilds the schema with prefixed physical column names: primary key columns get the key
     * prefix, all other physical columns the value prefix. Non-physical columns are copied as is,
     * and the primary key is re-declared on the prefixed names.
     */
    private Schema inferTableSchema(Schema originSchema) {
        LinkedHashSet<String> pkNames = new LinkedHashSet<String>(originSchema.getPrimaryKey().map(Schema.UnresolvedPrimaryKey::getColumnNames).orElse(Collections.emptyList()));
        Schema.Builder builder = Schema.newBuilder();
        for (Schema.UnresolvedColumn col : originSchema.getColumns()) {
            if (!(col instanceof Schema.UnresolvedPhysicalColumn)) {
                builder.fromColumns(Collections.singletonList(col));
                continue;
            }
            Schema.UnresolvedPhysicalColumn phyCol = (Schema.UnresolvedPhysicalColumn) col;
            String prefix = pkNames.contains(col.getName()) ? this.keyPrefix : this.valuePrefix;
            builder.column(prefix + phyCol.getName(), phyCol.getDataType());
        }
        builder.primaryKey(pkNames.stream().map(s -> this.keyPrefix + s).collect(Collectors.toList()));
        return builder.build();
    }

    /** The catalog targets Aliyun Kafka exactly when Aliyun client parameters were supplied. */
    private boolean isAliyunKafka() {
        return this.aliyunKafkaClientParams != null;
    }

    @VisibleForTesting
    public int getNumPartitions() {
        return this.numPartitions;
    }

    @VisibleForTesting
    public short getReplicationFactor() {
        return this.replicationFactor;
    }

    @VisibleForTesting
    public AliyunKafkaClient getAliyunKafkaClient() {
        return this.aliyunKafkaClient;
    }
}

