/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.json.maxwell.MaxWellJsonDeserializationSchema;
import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema;
import org.apache.seatunnel.format.protobuf.ProtobufDeserializationSchema;
import org.apache.seatunnel.format.text.TextDeserializationSchema;

public class KafkaSourceConfig
implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String bootstrap;
    private final Map<TablePath, ConsumerMetadata> mapMetadata;
    private final boolean commitOnCheckpoint;
    private final Properties properties;
    private final long discoveryIntervalMillis;
    private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
    private final String consumerGroup;
    private final long pollTimeout;

    public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
        this.bootstrap = (String)readonlyConfig.get(Config.BOOTSTRAP_SERVERS);
        this.mapMetadata = this.createMapConsumerMetadata(readonlyConfig);
        this.commitOnCheckpoint = (Boolean)readonlyConfig.get(Config.COMMIT_ON_CHECKPOINT);
        this.properties = this.createKafkaProperties(readonlyConfig);
        this.discoveryIntervalMillis = (Long)readonlyConfig.get(Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
        this.messageFormatErrorHandleWay = (MessageFormatErrorHandleWay)((Object)readonlyConfig.get(Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION));
        this.pollTimeout = (Long)readonlyConfig.get(Config.KEY_POLL_TIMEOUT);
        this.consumerGroup = (String)readonlyConfig.get(Config.CONSUMER_GROUP);
    }

    private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {
        Properties resultProperties = new Properties();
        readonlyConfig.getOptional(Config.KAFKA_CONFIG).ifPresent(resultProperties::putAll);
        return resultProperties;
    }

    private Map<TablePath, ConsumerMetadata> createMapConsumerMetadata(ReadonlyConfig readonlyConfig) {
        List<ConsumerMetadata> consumerMetadataList = readonlyConfig.getOptional(Config.TABLE_LIST).isPresent() ? ((List)readonlyConfig.get(Config.TABLE_LIST)).stream().map(ReadonlyConfig::fromMap).map(config -> this.createConsumerMetadata((ReadonlyConfig)config)).collect(Collectors.toList()) : Collections.singletonList(this.createConsumerMetadata(readonlyConfig));
        return consumerMetadataList.stream().collect(Collectors.toMap(consumerMetadata -> TablePath.of((String)consumerMetadata.getTopic()), consumerMetadata -> consumerMetadata));
    }

    private ConsumerMetadata createConsumerMetadata(ReadonlyConfig readonlyConfig) {
        ConsumerMetadata consumerMetadata = new ConsumerMetadata();
        consumerMetadata.setTopic((String)readonlyConfig.get(Config.TOPIC));
        consumerMetadata.setPattern((Boolean)readonlyConfig.get(Config.PATTERN));
        consumerMetadata.setProperties(new Properties());
        CatalogTable catalogTable = this.createCatalogTable(readonlyConfig);
        consumerMetadata.setCatalogTable(catalogTable);
        consumerMetadata.setDeserializationSchema(this.createDeserializationSchema(catalogTable, readonlyConfig));
        readonlyConfig.getOptional(Config.START_MODE).ifPresent(startMode -> {
            consumerMetadata.setStartMode((StartMode)((Object)startMode));
            switch (startMode) {
                case TIMESTAMP: {
                    long startOffsetsTimestamp = (Long)readonlyConfig.get(Config.START_MODE_TIMESTAMP);
                    long currentTimestamp = System.currentTimeMillis();
                    if (startOffsetsTimestamp < 0L || startOffsetsTimestamp > currentTimestamp) {
                        throw new IllegalArgumentException("start_mode.timestamp The value is smaller than 0 or smaller than the current time");
                    }
                    consumerMetadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
                    break;
                }
                case SPECIFIC_OFFSETS: {
                    Map offsetMap = (Map)readonlyConfig.get(Config.START_MODE_OFFSETS);
                    if (MapUtils.isEmpty((Map)offsetMap)) {
                        throw new IllegalArgumentException("start mode is " + (Object)((Object)StartMode.SPECIFIC_OFFSETS) + "but no specific offsets were specified.");
                    }
                    HashMap<TopicPartition, Long> specificStartOffsets = new HashMap<TopicPartition, Long>();
                    offsetMap.forEach((topicPartitionKey, offset) -> {
                        int splitIndex = topicPartitionKey.lastIndexOf("-");
                        String topic = topicPartitionKey.substring(0, splitIndex);
                        String partition = topicPartitionKey.substring(splitIndex + 1);
                        TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partition));
                        specificStartOffsets.put(topicPartition, (Long)offset);
                    });
                    consumerMetadata.setSpecificStartOffsets(specificStartOffsets);
                    break;
                }
            }
        });
        return consumerMetadata;
    }

    private CatalogTable createCatalogTable(final ReadonlyConfig readonlyConfig) {
        Optional schemaOptions = readonlyConfig.getOptional(TableSchemaOptions.SCHEMA);
        TablePath tablePath = TablePath.of((String)((String)readonlyConfig.get(Config.TOPIC)));
        TableSchema tableSchema = schemaOptions.isPresent() ? new ReadonlyConfigParser().parse(readonlyConfig) : TableSchema.builder().column((Column)PhysicalColumn.of((String)"content", (SeaTunnelDataType)new SeaTunnelRowType(new String[]{"content"}, new SeaTunnelDataType[]{BasicType.STRING_TYPE}), (Integer)0, (boolean)false, null, null)).build();
        return CatalogTable.of((TableIdentifier)TableIdentifier.of((String)"", (TablePath)tablePath), (TableSchema)tableSchema, (Map)new HashMap<String, String>(){
            {
                Optional.ofNullable((String)readonlyConfig.get(Config.PROTOBUF_MESSAGE_NAME)).ifPresent(value -> this.put(Config.PROTOBUF_MESSAGE_NAME.key(), value));
                Optional.ofNullable((String)readonlyConfig.get(Config.PROTOBUF_SCHEMA)).ifPresent(value -> this.put(Config.PROTOBUF_SCHEMA.key(), value));
            }
        }, Collections.emptyList(), null);
    }

    private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
        MessageFormat format = (MessageFormat)((Object)readonlyConfig.get(Config.FORMAT));
        if (!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
            return TextDeserializationSchema.builder().seaTunnelRowType(seaTunnelRowType).delimiter("\b").setCatalogTable(catalogTable).build();
        }
        switch (format) {
            case JSON: {
                return new JsonDeserializationSchema(catalogTable, false, false);
            }
            case TEXT: {
                String delimiter = (String)readonlyConfig.get(Config.FIELD_DELIMITER);
                return TextDeserializationSchema.builder().seaTunnelRowType(seaTunnelRowType).delimiter(delimiter).build();
            }
            case CANAL_JSON: {
                return CanalJsonDeserializationSchema.builder(catalogTable).setIgnoreParseErrors(true).build();
            }
            case OGG_JSON: {
                return OggJsonDeserializationSchema.builder(catalogTable).setIgnoreParseErrors(true).build();
            }
            case MAXWELL_JSON: {
                return MaxWellJsonDeserializationSchema.builder(catalogTable).setIgnoreParseErrors(true).build();
            }
            case COMPATIBLE_KAFKA_CONNECT_JSON: {
                Boolean keySchemaEnable = (Boolean)readonlyConfig.get(KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
                Boolean valueSchemaEnable = (Boolean)readonlyConfig.get(KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
                return new CompatibleKafkaConnectDeserializationSchema(catalogTable, keySchemaEnable, (boolean)valueSchemaEnable, false, false);
            }
            case DEBEZIUM_JSON: {
                boolean includeSchema = (Boolean)readonlyConfig.get(Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA);
                return new DebeziumJsonDeserializationSchema(catalogTable, true, includeSchema);
            }
            case AVRO: {
                return new AvroDeserializationSchema(catalogTable);
            }
            case PROTOBUF: {
                return new ProtobufDeserializationSchema(catalogTable);
            }
        }
        throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + (Object)((Object)format));
    }

    public String getBootstrap() {
        return this.bootstrap;
    }

    public Map<TablePath, ConsumerMetadata> getMapMetadata() {
        return this.mapMetadata;
    }

    public boolean isCommitOnCheckpoint() {
        return this.commitOnCheckpoint;
    }

    public Properties getProperties() {
        return this.properties;
    }

    public long getDiscoveryIntervalMillis() {
        return this.discoveryIntervalMillis;
    }

    public MessageFormatErrorHandleWay getMessageFormatErrorHandleWay() {
        return this.messageFormatErrorHandleWay;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public long getPollTimeout() {
        return this.pollTimeout;
    }
}

