/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcMetadataProcessor;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;
import org.apache.paimon.flink.action.cdc.mongodb.MongoDBRecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils;
import org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser;
import org.apache.paimon.flink.action.cdc.postgres.PostgresActionUtils;
import org.apache.paimon.flink.action.cdc.postgres.PostgresRecordParser;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.utils.Preconditions;

public class SyncJobHandler {
    private final SourceType sourceType;
    private final Configuration cdcSourceConfig;
    private final boolean isTableSync;
    private final String sinkLocation;

    public SyncJobHandler(SourceType sourceType, Map<String, String> cdcSourceConfig, String database) {
        this.sourceType = sourceType;
        this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
        this.isTableSync = false;
        this.sinkLocation = database;
    }

    public SyncJobHandler(SourceType sourceType, Map<String, String> cdcSourceConfig, String database, String table) {
        this.sourceType = sourceType;
        this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
        this.isTableSync = true;
        this.sinkLocation = database + "." + table;
    }

    public String provideSourceName() {
        return this.sourceType.sourceName;
    }

    public String provideDefaultJobName() {
        return String.format(this.sourceType.defaultJobNameFormat, this.isTableSync ? "Table" : "Database", this.sinkLocation);
    }

    public void registerJdbcDriver() {
        if (this.sourceType == SourceType.MYSQL) {
            MySqlActionUtils.registerJdbcDriver();
        } else if (this.sourceType == SourceType.POSTGRES) {
            PostgresActionUtils.registerJdbcDriver();
        }
    }

    public void checkRequiredOption() {
        switch (this.sourceType) {
            case MYSQL: {
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, "mysql_conf", MySqlSourceOptions.HOSTNAME, MySqlSourceOptions.USERNAME, MySqlSourceOptions.PASSWORD, MySqlSourceOptions.DATABASE_NAME);
                if (this.isTableSync) {
                    CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, "mysql_conf", MySqlSourceOptions.TABLE_NAME);
                    break;
                }
                Preconditions.checkArgument((!this.cdcSourceConfig.contains(MySqlSourceOptions.TABLE_NAME) ? 1 : 0) != 0, (Object)(MySqlSourceOptions.TABLE_NAME.key() + " cannot be set for mysql_sync_database. If you want to sync several MySQL tables into one Paimon table, use mysql_sync_table instead."));
                break;
            }
            case POSTGRES: {
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, "postgres_conf", PostgresSourceOptions.HOSTNAME, PostgresSourceOptions.USERNAME, PostgresSourceOptions.PASSWORD, PostgresSourceOptions.DATABASE_NAME, PostgresSourceOptions.SCHEMA_NAME, PostgresSourceOptions.SLOT_NAME);
                if (this.isTableSync) {
                    CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, "postgres_conf", PostgresSourceOptions.TABLE_NAME);
                    break;
                }
                Preconditions.checkArgument((!this.cdcSourceConfig.contains(PostgresSourceOptions.TABLE_NAME) ? 1 : 0) != 0, (Object)(PostgresSourceOptions.TABLE_NAME.key() + " cannot be set for postgres_sync_database. If you want to sync several PostgreSQL tables into one Paimon table, use postgres_sync_table instead."));
                break;
            }
            case KAFKA: {
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, "kafka_conf", KafkaConnectorOptions.VALUE_FORMAT, KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS);
                CdcActionCommonUtils.checkOneRequiredOption(this.cdcSourceConfig, "kafka_conf", KafkaConnectorOptions.TOPIC, KafkaConnectorOptions.TOPIC_PATTERN);
                break;
            }
            case PULSAR: {
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, "pulsar_conf", PulsarActionUtils.VALUE_FORMAT, PulsarOptions.PULSAR_SERVICE_URL, PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
                CdcActionCommonUtils.checkOneRequiredOption(this.cdcSourceConfig, "pulsar_conf", PulsarActionUtils.TOPIC, PulsarActionUtils.TOPIC_PATTERN);
                break;
            }
            case MONGODB: {
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, "mongodb_conf", MongoDBSourceOptions.HOSTS, MongoDBSourceOptions.DATABASE);
                if (!this.isTableSync) break;
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, "mongodb_conf", MongoDBSourceOptions.COLLECTION);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown source type " + (Object)((Object)this.sourceType));
            }
        }
    }

    public Source<CdcSourceRecord, ?, ?> provideSource() {
        switch (this.sourceType) {
            case KAFKA: {
                return KafkaActionUtils.buildKafkaSource(this.cdcSourceConfig, this.provideDataFormat().createKafkaDeserializer(this.cdcSourceConfig));
            }
            case PULSAR: {
                return PulsarActionUtils.buildPulsarSource(this.cdcSourceConfig, this.provideDataFormat().createPulsarDeserializer(this.cdcSourceConfig));
            }
        }
        throw new UnsupportedOperationException("Cannot get source from source type" + (Object)((Object)this.sourceType));
    }

    public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(List<ComputedColumn> computedColumns, TypeMapping typeMapping, CdcMetadataConverter[] metadataConverters) {
        switch (this.sourceType) {
            case MYSQL: {
                return new MySqlRecordParser(this.cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
            }
            case POSTGRES: {
                return new PostgresRecordParser(this.cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
            }
            case KAFKA: 
            case PULSAR: {
                DataFormat dataFormat = this.provideDataFormat();
                return dataFormat.createParser(typeMapping, computedColumns);
            }
            case MONGODB: {
                return new MongoDBRecordParser(computedColumns, this.cdcSourceConfig);
            }
        }
        throw new UnsupportedOperationException("Unknown source type " + (Object)((Object)this.sourceType));
    }

    public DataFormat provideDataFormat() {
        switch (this.sourceType) {
            case KAFKA: {
                return KafkaActionUtils.getDataFormat(this.cdcSourceConfig);
            }
            case PULSAR: {
                return PulsarActionUtils.getDataFormat(this.cdcSourceConfig);
            }
        }
        throw new UnsupportedOperationException("Cannot get DataFormat from source type" + (Object)((Object)this.sourceType));
    }

    public MessageQueueSchemaUtils.ConsumerWrapper provideConsumer() {
        switch (this.sourceType) {
            case KAFKA: {
                return KafkaActionUtils.getKafkaEarliestConsumer(this.cdcSourceConfig, this.provideDataFormat().createKafkaDeserializer(this.cdcSourceConfig));
            }
            case PULSAR: {
                return PulsarActionUtils.createPulsarConsumer(this.cdcSourceConfig, this.provideDataFormat().createPulsarDeserializer(this.cdcSourceConfig));
            }
        }
        throw new UnsupportedOperationException("Cannot get consumer from source type" + (Object)((Object)this.sourceType));
    }

    public CdcMetadataConverter provideMetadataConverter(String column) {
        return CdcMetadataProcessor.converter(this.sourceType, column);
    }

    public static enum SourceType {
        MYSQL("MySQL Source", "MySQL-Paimon %s Sync: %s"),
        KAFKA("Kafka Source", "Kafka-Paimon %s Sync: %s"),
        MONGODB("MongoDB Source", "MongoDB-Paimon %s Sync: %s"),
        PULSAR("Pulsar Source", "Pulsar-Paimon %s Sync: %s"),
        POSTGRES("Postgres Source", "Postgres-Paimon %s Sync: %s");

        private final String sourceName;
        private final String defaultJobNameFormat;

        private SourceType(String sourceName, String defaultJobNameFormat) {
            this.sourceName = sourceName;
            this.defaultJobNameFormat = defaultJobNameFormat;
        }
    }
}

