/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SyncTableActionBase
extends ActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(SyncTableActionBase.class);
    protected final String database;
    protected final String table;
    protected final Configuration cdcSourceConfig;
    protected FileStoreTable fileStoreTable;
    protected List<String> partitionKeys = new ArrayList<String>();
    protected List<String> primaryKeys = new ArrayList<String>();
    protected Map<String, String> tableConfig = new HashMap<String, String>();
    protected List<String> computedColumnArgs = new ArrayList<String>();
    protected TypeMapping typeMapping = TypeMapping.defaultMapping();
    protected List<ComputedColumn> computedColumns = new ArrayList<ComputedColumn>();
    protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[0];

    public SyncTableActionBase(String warehouse, String database, String table, Map<String, String> catalogConfig, Map<String, String> cdcSourceConfig) {
        super(warehouse, catalogConfig);
        this.database = database;
        this.table = table;
        this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
    }

    public SyncTableActionBase withPartitionKeys(String ... partitionKeys) {
        return this.withPartitionKeys(Arrays.asList(partitionKeys));
    }

    public SyncTableActionBase withPartitionKeys(List<String> partitionKeys) {
        this.partitionKeys = partitionKeys;
        return this;
    }

    public SyncTableActionBase withPrimaryKeys(String ... primaryKeys) {
        return this.withPrimaryKeys(Arrays.asList(primaryKeys));
    }

    public SyncTableActionBase withPrimaryKeys(List<String> primaryKeys) {
        this.primaryKeys = primaryKeys;
        return this;
    }

    public SyncTableActionBase withTableConfig(Map<String, String> tableConfig) {
        this.tableConfig = tableConfig;
        return this;
    }

    public SyncTableActionBase withComputedColumnArgs(List<String> computedColumnArgs) {
        this.computedColumnArgs = computedColumnArgs;
        return this;
    }

    public SyncTableActionBase withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    public SyncTableActionBase withMetadataColumns(List<String> metadataColumns) {
        this.metadataConverters = (CdcMetadataConverter[])metadataColumns.stream().map(this::metadataConverter).filter(Optional::isPresent).map(Optional::get).toArray(CdcMetadataConverter[]::new);
        return this;
    }

    protected Optional<CdcMetadataConverter<?>> metadataConverter(String column) {
        return Optional.empty();
    }

    protected void checkCdcSourceArgument() {
    }

    protected abstract Schema retrieveSchema() throws Exception;

    protected Schema buildPaimonSchema(Schema retrievedSchema) {
        return CdcActionCommonUtils.buildPaimonSchema(this.partitionKeys, this.primaryKeys, this.computedColumns, this.tableConfig, retrievedSchema, this.metadataConverters, true);
    }

    protected abstract DataStreamSource<String> buildSource() throws Exception;

    protected abstract String sourceName();

    protected abstract FlatMapFunction<String, RichCdcMultiplexRecord> recordParse();

    @Override
    public void build() throws Exception {
        Schema paimonSchema;
        Schema retrievedSchema;
        this.checkCdcSourceArgument();
        this.catalog.createDatabase(this.database, true);
        boolean caseSensitive = this.catalog.caseSensitive();
        this.validateCaseInsensitive(caseSensitive);
        Identifier identifier = new Identifier(this.database, this.table);
        if (this.catalog.tableExists(identifier)) {
            this.fileStoreTable = (FileStoreTable)this.catalog.getTable(identifier).copy(this.tableConfig);
            try {
                retrievedSchema = this.retrieveSchema();
                this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, retrievedSchema.fields());
                paimonSchema = this.buildPaimonSchema(retrievedSchema);
                CdcActionCommonUtils.assertSchemaCompatible(this.fileStoreTable.schema(), paimonSchema.fields());
            }
            catch (SchemaRetrievalException e) {
                LOG.info("Failed to retrieve schema from record data but there exists specified Paimon table. Schema compatibility check will be skipped. If you have specified computed columns, here will use the existed Paimon table schema to build them. Please make sure the Paimon table has defined all the argument columns used for computed columns.");
                this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, this.fileStoreTable.schema().fields());
                this.checkConstraints();
            }
        } else {
            retrievedSchema = this.retrieveSchema();
            this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, retrievedSchema.fields());
            paimonSchema = this.buildPaimonSchema(retrievedSchema);
            this.catalog.createTable(identifier, paimonSchema, false);
            this.fileStoreTable = (FileStoreTable)this.catalog.getTable(identifier).copy(this.tableConfig);
        }
        this.checkComputedColumns(this.computedColumns);
        SingleOutputStreamOperator input = this.buildSource().flatMap(this.recordParse()).name("Parse");
        EventParser.Factory parserFactory = () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
        CdcSinkBuilder sinkBuilder = new CdcSinkBuilder().withInput(input).withParserFactory(parserFactory).withTable(this.fileStoreTable).withIdentifier(identifier).withCatalogLoader(this.catalogLoader());
        String sinkParallelism = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
        }
        sinkBuilder.build();
    }

    protected DataStreamSource<String> buildDataStreamSource(Object source) {
        if (source instanceof Source) {
            return this.env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), this.sourceName());
        }
        if (source instanceof SourceFunction) {
            return this.env.addSource((SourceFunction)source, this.sourceName());
        }
        throw new UnsupportedOperationException("Unrecognized source type");
    }

    protected void validateCaseInsensitive(boolean caseSensitive) {
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", this.database);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table", this.table);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Partition keys", this.partitionKeys);
        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Primary keys", this.primaryKeys);
    }

    protected void checkComputedColumns(List<ComputedColumn> computedColumns) {
        if (!computedColumns.isEmpty()) {
            List computedFields = computedColumns.stream().map(ComputedColumn::columnName).collect(Collectors.toList());
            List<String> fieldNames = this.fileStoreTable.schema().fieldNames();
            Preconditions.checkArgument(new HashSet<String>(fieldNames).containsAll(computedFields), " Exists Table should contain all computed columns %s, but are %s.", computedFields, fieldNames);
        }
    }

    private void checkConstraints() {
        if (!this.partitionKeys.isEmpty()) {
            List<String> actualPartitionKeys = this.fileStoreTable.partitionKeys();
            Preconditions.checkState(actualPartitionKeys.size() == this.partitionKeys.size() && actualPartitionKeys.containsAll(this.partitionKeys), "Specified partition keys [%s] are not equal to the existed table partition keys [%s]. You should remove the --partition-keys argument or re-create the table if the partition keys are wrong.", String.join((CharSequence)",", this.partitionKeys), String.join((CharSequence)",", actualPartitionKeys));
        }
        if (!this.primaryKeys.isEmpty()) {
            List<String> actualPrimaryKeys = this.fileStoreTable.primaryKeys();
            Preconditions.checkState(actualPrimaryKeys.size() == this.primaryKeys.size() && actualPrimaryKeys.containsAll(this.primaryKeys), "Specified primary keys [%s] are not equal to the existed table primary keys [%s]. You should remove the --primary-keys argument or re-create the table if the primary keys are wrong.", String.join((CharSequence)",", this.primaryKeys), String.join((CharSequence)",", actualPrimaryKeys));
        }
    }

    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return this.tableConfig;
    }

    @VisibleForTesting
    public FileStoreTable fileStoreTable() {
        return this.fileStoreTable;
    }

    protected abstract String jobName();

    @Override
    public void run() throws Exception {
        this.build();
        this.execute(this.jobName());
    }

    public static class SchemaRetrievalException
    extends Exception {
        public SchemaRetrievalException(String message) {
            super(message);
        }
    }
}

