/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.List;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.factories.FlinkFactoryUtil;
import org.apache.paimon.flink.kafka.KafkaLogOptions;
import org.apache.paimon.flink.kafka.KafkaLogSinkProvider;
import org.apache.paimon.flink.kafka.KafkaLogSourceProvider;
import org.apache.paimon.flink.kafka.KafkaLogStoreRegister;
import org.apache.paimon.flink.log.LogStoreRegister;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;

public class KafkaLogStoreFactory
implements LogStoreTableFactory {
    public static final String IDENTIFIER = "kafka";
    public static final String KAFKA_PREFIX = "kafka.";

    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    private String topic(DynamicTableFactory.Context context) {
        return (String)context.getCatalogTable().getOptions().get(KafkaLogOptions.TOPIC.key());
    }

    @Override
    public KafkaLogSourceProvider createSourceProvider(DynamicTableFactory.Context context, DynamicTableSource.Context sourceContext, @Nullable int[][] projectFields) {
        FlinkFactoryUtil.FlinkTableFactoryHelper helper = FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        DataType physicalType = schema.toPhysicalRowDataType();
        DeserializationSchema primaryKeyDeserializer = null;
        int[] primaryKey = this.getPrimaryKeyIndexes(schema);
        if (primaryKey.length > 0) {
            DataType keyType = DataTypeUtils.projectRow((DataType)physicalType, (int[])primaryKey);
            primaryKeyDeserializer = (DeserializationSchema)LogStoreTableFactory.getKeyDecodingFormat(helper).createRuntimeDecoder(sourceContext, keyType);
        }
        DeserializationSchema valueDeserializer = (DeserializationSchema)LogStoreTableFactory.getValueDecodingFormat(helper).createRuntimeDecoder(sourceContext, physicalType);
        Options options = this.toOptions(helper.getOptions());
        return new KafkaLogSourceProvider(this.topic(context), KafkaLogStoreFactory.toKafkaProperties(options), physicalType, primaryKey, (DeserializationSchema<RowData>)primaryKeyDeserializer, (DeserializationSchema<RowData>)valueDeserializer, projectFields, options.get(CoreOptions.LOG_CONSISTENCY), CoreOptions.startupMode(options), options.get(CoreOptions.SCAN_TIMESTAMP_MILLIS));
    }

    @Override
    public KafkaLogSinkProvider createSinkProvider(DynamicTableFactory.Context context, DynamicTableSink.Context sinkContext) {
        FlinkFactoryUtil.FlinkTableFactoryHelper helper = FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        DataType physicalType = schema.toPhysicalRowDataType();
        SerializationSchema primaryKeySerializer = null;
        int[] primaryKey = this.getPrimaryKeyIndexes(schema);
        if (primaryKey.length > 0) {
            DataType keyType = DataTypeUtils.projectRow((DataType)physicalType, (int[])primaryKey);
            primaryKeySerializer = (SerializationSchema)LogStoreTableFactory.getKeyEncodingFormat(helper).createRuntimeEncoder(sinkContext, keyType);
        }
        SerializationSchema valueSerializer = (SerializationSchema)LogStoreTableFactory.getValueEncodingFormat(helper).createRuntimeEncoder(sinkContext, physicalType);
        Options options = this.toOptions(helper.getOptions());
        return new KafkaLogSinkProvider(this.topic(context), KafkaLogStoreFactory.toKafkaProperties(options), (SerializationSchema<RowData>)primaryKeySerializer, (SerializationSchema<RowData>)valueSerializer, options.get(CoreOptions.LOG_CONSISTENCY), options.get(CoreOptions.LOG_CHANGELOG_MODE));
    }

    @Override
    public LogStoreRegister createRegister(LogStoreTableFactory.RegisterContext context) {
        return new KafkaLogStoreRegister(context);
    }

    private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
        List columns = schema.getColumnNames();
        return schema.getPrimaryKey().map(UniqueConstraint::getColumns).map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray()).orElseGet(() -> new int[0]);
    }

    public static Properties toKafkaProperties(Options options) {
        Properties properties = new Properties();
        properties.putAll(OptionsUtils.convertToPropertiesPrefixKey(options.toMap(), KAFKA_PREFIX));
        if (options.get(CoreOptions.LOG_CONSISTENCY) == CoreOptions.LogConsistency.TRANSACTIONAL) {
            properties.setProperty("isolation.level", "read_committed");
        }
        return properties;
    }

    private Options toOptions(ReadableConfig config) {
        Options options = new Options();
        ((Configuration)config).toMap().forEach(options::setString);
        return options;
    }
}

