/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.options.table.TableIdentifierOptions;
import org.apache.seatunnel.api.options.table.TableSchemaOptions;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.TableIdentifierConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
import org.apache.seatunnel.format.compatible.kafka.connect.json.NativeKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchemaDispatcher;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.json.maxwell.MaxWellJsonDeserializationSchema;
import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema;
import org.apache.seatunnel.format.protobuf.ProtobufDeserializationSchema;
import org.apache.seatunnel.format.text.TextDeserializationSchema;

public class KafkaSourceConfig
implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String bootstrap;
    private final Map<TablePath, ConsumerMetadata> mapMetadata;
    private final boolean commitOnCheckpoint;
    private final Properties properties;
    private final long discoveryIntervalMillis;
    private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
    private final String consumerGroup;
    private final long pollTimeout;
    private final int readerCacheQueueSize;

    public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
        this.bootstrap = (String)readonlyConfig.get(KafkaSourceOptions.BOOTSTRAP_SERVERS);
        this.mapMetadata = this.createMapConsumerMetadata(readonlyConfig);
        this.commitOnCheckpoint = (Boolean)readonlyConfig.get(KafkaSourceOptions.COMMIT_ON_CHECKPOINT);
        this.properties = this.createKafkaProperties(readonlyConfig);
        this.discoveryIntervalMillis = (Long)readonlyConfig.get(KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
        this.messageFormatErrorHandleWay = (MessageFormatErrorHandleWay)((Object)readonlyConfig.get(KafkaSourceOptions.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION));
        this.pollTimeout = (Long)readonlyConfig.get(KafkaSourceOptions.KEY_POLL_TIMEOUT);
        this.consumerGroup = (String)readonlyConfig.get(KafkaSourceOptions.CONSUMER_GROUP);
        this.readerCacheQueueSize = (Integer)readonlyConfig.get(KafkaSourceOptions.READER_CACHE_QUEUE_SIZE);
    }

    private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {
        Properties resultProperties = new Properties();
        readonlyConfig.getOptional(KafkaSourceOptions.KAFKA_CONFIG).ifPresent(resultProperties::putAll);
        return resultProperties;
    }

    private Map<TablePath, ConsumerMetadata> createMapConsumerMetadata(ReadonlyConfig readonlyConfig) {
        List<Object> consumerMetadataList = readonlyConfig.getOptional(KafkaSourceOptions.TABLE_CONFIGS).isPresent() ? ((List)readonlyConfig.get(KafkaSourceOptions.TABLE_CONFIGS)).stream().map(ReadonlyConfig::fromMap).map(this::createConsumerMetadata).collect(Collectors.toList()) : (readonlyConfig.getOptional(KafkaSourceOptions.TABLE_LIST).isPresent() ? ((List)readonlyConfig.get(KafkaSourceOptions.TABLE_LIST)).stream().map(ReadonlyConfig::fromMap).map(this::createConsumerMetadata).collect(Collectors.toList()) : Collections.singletonList(this.createConsumerMetadata(readonlyConfig)));
        return consumerMetadataList.stream().collect(Collectors.toMap(consumerMetadata -> this.getTablePathFromSchema(readonlyConfig, consumerMetadata.getTopic()), consumerMetadata -> consumerMetadata));
    }

    private ConsumerMetadata createConsumerMetadata(ReadonlyConfig readonlyConfig) {
        ConsumerMetadata consumerMetadata = new ConsumerMetadata();
        consumerMetadata.setTopic((String)readonlyConfig.get(KafkaSourceOptions.TOPIC));
        consumerMetadata.setPattern((Boolean)readonlyConfig.get(KafkaSourceOptions.PATTERN));
        consumerMetadata.setProperties(new Properties());
        CatalogTable catalogTable = this.createCatalogTable(readonlyConfig);
        consumerMetadata.setCatalogTable(catalogTable);
        consumerMetadata.setDeserializationSchema(this.createDeserializationSchema(catalogTable, readonlyConfig));
        readonlyConfig.getOptional(KafkaSourceOptions.START_MODE).ifPresent(startMode -> {
            consumerMetadata.setStartMode((StartMode)((Object)startMode));
            switch (startMode) {
                case TIMESTAMP: {
                    long startOffsetsTimestamp = (Long)readonlyConfig.get(KafkaSourceOptions.START_MODE_TIMESTAMP);
                    long currentTimestamp = System.currentTimeMillis();
                    if (startOffsetsTimestamp < 0L || startOffsetsTimestamp > currentTimestamp) {
                        throw new IllegalArgumentException("start_mode.timestamp The value is smaller than 0 or smaller than the current time");
                    }
                    consumerMetadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
                    if (!Objects.nonNull(readonlyConfig.get(KafkaSourceOptions.START_MODE_END_TIMESTAMP))) break;
                    long endOffsetsTimestamp = (Long)readonlyConfig.get(KafkaSourceOptions.START_MODE_END_TIMESTAMP);
                    if (endOffsetsTimestamp < 0L || endOffsetsTimestamp > currentTimestamp) {
                        throw new IllegalArgumentException("start_mode.endTimestamp The value is smaller than 0 or smaller than the current time");
                    }
                    consumerMetadata.setEndOffsetsTimestamp(endOffsetsTimestamp);
                    break;
                }
                case SPECIFIC_OFFSETS: {
                    Map offsetMap = (Map)readonlyConfig.get(KafkaSourceOptions.START_MODE_OFFSETS);
                    if (MapUtils.isEmpty((Map)offsetMap)) {
                        throw new IllegalArgumentException("start mode is " + (Object)((Object)StartMode.SPECIFIC_OFFSETS) + "but no specific offsets were specified.");
                    }
                    HashMap<TopicPartition, Long> specificStartOffsets = new HashMap<TopicPartition, Long>();
                    offsetMap.forEach((topicPartitionKey, offset) -> {
                        int splitIndex = topicPartitionKey.lastIndexOf("-");
                        String topic = topicPartitionKey.substring(0, splitIndex);
                        String partition = topicPartitionKey.substring(splitIndex + 1);
                        TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partition));
                        specificStartOffsets.put(topicPartition, (Long)offset);
                    });
                    consumerMetadata.setSpecificStartOffsets(specificStartOffsets);
                    break;
                }
            }
        });
        return consumerMetadata;
    }

    private CatalogTable createCatalogTable(final ReadonlyConfig readonlyConfig) {
        Optional schemaOptions = readonlyConfig.getOptional(KafkaSourceOptions.SCHEMA);
        MessageFormat format = (MessageFormat)((Object)readonlyConfig.get(KafkaSourceOptions.FORMAT));
        TableSchema tableSchema = format == MessageFormat.NATIVE ? this.nativeTableSchema() : (schemaOptions.isPresent() ? new ReadonlyConfigParser().parse(readonlyConfig) : TableSchema.builder().column((Column)PhysicalColumn.of((String)"content", (SeaTunnelDataType)BasicType.STRING_TYPE, (Integer)0, (boolean)false, null, null)).build());
        TablePath tablePath = this.getTablePathFromSchema(readonlyConfig, (String)readonlyConfig.get(KafkaSourceOptions.TOPIC));
        return CatalogTable.of((TableIdentifier)TableIdentifier.of((String)"", (TablePath)tablePath), (TableSchema)tableSchema, (Map)new HashMap<String, String>(){
            {
                Optional.ofNullable(readonlyConfig.get(KafkaSourceOptions.PROTOBUF_MESSAGE_NAME)).ifPresent(value -> this.put(KafkaSourceOptions.PROTOBUF_MESSAGE_NAME.key(), value));
                Optional.ofNullable(readonlyConfig.get(KafkaSourceOptions.PROTOBUF_SCHEMA)).ifPresent(value -> this.put(KafkaSourceOptions.PROTOBUF_SCHEMA.key(), value));
            }
        }, Collections.emptyList(), null);
    }

    private TablePath getTablePathFromSchema(ReadonlyConfig readonlyConfig, String topicName) {
        ReadonlyConfig schema = readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).map(ReadonlyConfig::fromMap).orElse(ReadonlyConfig.fromMap(Collections.emptyMap()));
        return schema.getOptional(TableIdentifierOptions.TABLE).map(TablePath::of).orElseGet(() -> TablePath.of(null, (String)topicName));
    }

    private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
        MessageFormat format = (MessageFormat)((Object)readonlyConfig.get(KafkaSourceOptions.FORMAT));
        if (format == MessageFormat.NATIVE) {
            return new NativeKafkaConnectDeserializationSchema(catalogTable, false, false, false, false);
        }
        if (!readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
            return TextDeserializationSchema.builder().seaTunnelRowType(seaTunnelRowType).delimiter("\b").setCatalogTable(catalogTable).build();
        }
        switch (format) {
            case JSON: {
                return new JsonDeserializationSchema(catalogTable, false, false);
            }
            case TEXT: {
                String delimiter = (String)readonlyConfig.get(KafkaSourceOptions.FIELD_DELIMITER);
                return TextDeserializationSchema.builder().seaTunnelRowType(seaTunnelRowType).delimiter(delimiter).build();
            }
            case CANAL_JSON: {
                return CanalJsonDeserializationSchema.builder(catalogTable).setIgnoreParseErrors(true).build();
            }
            case OGG_JSON: {
                return OggJsonDeserializationSchema.builder(catalogTable).setIgnoreParseErrors(true).build();
            }
            case MAXWELL_JSON: {
                return MaxWellJsonDeserializationSchema.builder(catalogTable).setIgnoreParseErrors(true).build();
            }
            case COMPATIBLE_KAFKA_CONNECT_JSON: {
                Boolean keySchemaEnable = (Boolean)readonlyConfig.get(KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
                Boolean valueSchemaEnable = (Boolean)readonlyConfig.get(KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
                return new CompatibleKafkaConnectDeserializationSchema(catalogTable, keySchemaEnable, (boolean)valueSchemaEnable, false, false);
            }
            case DEBEZIUM_JSON: {
                boolean includeSchema = (Boolean)readonlyConfig.get(KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA);
                TableIdentifierConfig tableFilter = (TableIdentifierConfig)readonlyConfig.get(KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER);
                if (tableFilter != null) {
                    TablePath tablePath = TablePath.of((String)(StringUtils.isNotEmpty((CharSequence)tableFilter.getDatabaseName()) ? tableFilter.getDatabaseName() : null), (String)(StringUtils.isNotEmpty((CharSequence)tableFilter.getSchemaName()) ? tableFilter.getSchemaName() : null), (String)(StringUtils.isNotEmpty((CharSequence)tableFilter.getTableName()) ? tableFilter.getTableName() : null));
                    Map<TablePath, DebeziumJsonDeserializationSchema> tableDeserializationMap = Collections.singletonMap(tablePath, new DebeziumJsonDeserializationSchema(catalogTable, true, includeSchema));
                    return new DebeziumJsonDeserializationSchemaDispatcher(tableDeserializationMap, true, includeSchema);
                }
                return new DebeziumJsonDeserializationSchema(catalogTable, true, includeSchema);
            }
            case AVRO: {
                return new AvroDeserializationSchema(catalogTable);
            }
            case PROTOBUF: {
                return new ProtobufDeserializationSchema(catalogTable);
            }
        }
        throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + (Object)((Object)format));
    }

    private TableSchema nativeTableSchema() {
        return TableSchema.builder().column((Column)PhysicalColumn.of((String)"headers", (SeaTunnelDataType)new MapType((SeaTunnelDataType)BasicType.STRING_TYPE, (SeaTunnelDataType)BasicType.STRING_TYPE), (Integer)0, (boolean)false, null, null)).column((Column)PhysicalColumn.of((String)"key", (SeaTunnelDataType)PrimitiveByteArrayType.INSTANCE, (Integer)0, (boolean)false, null, null)).column((Column)PhysicalColumn.of((String)"offset", (SeaTunnelDataType)BasicType.LONG_TYPE, (Integer)0, (boolean)false, null, null)).column((Column)PhysicalColumn.of((String)"partition", (SeaTunnelDataType)BasicType.INT_TYPE, (Integer)0, (boolean)false, null, null)).column((Column)PhysicalColumn.of((String)"timestamp", (SeaTunnelDataType)BasicType.LONG_TYPE, (Integer)0, (boolean)false, null, null)).column((Column)PhysicalColumn.of((String)"timestampType", (SeaTunnelDataType)BasicType.STRING_TYPE, (Integer)0, (boolean)false, null, null)).column((Column)PhysicalColumn.of((String)"value", (SeaTunnelDataType)PrimitiveByteArrayType.INSTANCE, (Integer)0, (boolean)false, null, null)).build();
    }

    public String getBootstrap() {
        return this.bootstrap;
    }

    public Map<TablePath, ConsumerMetadata> getMapMetadata() {
        return this.mapMetadata;
    }

    public boolean isCommitOnCheckpoint() {
        return this.commitOnCheckpoint;
    }

    public Properties getProperties() {
        return this.properties;
    }

    public long getDiscoveryIntervalMillis() {
        return this.discoveryIntervalMillis;
    }

    public MessageFormatErrorHandleWay getMessageFormatErrorHandleWay() {
        return this.messageFormatErrorHandleWay;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public long getPollTimeout() {
        return this.pollTimeout;
    }

    public int getReaderCacheQueueSize() {
        return this.readerCacheQueueSize;
    }
}

