/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SynchronizationActionBase;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SyncTableActionBase
extends SynchronizationActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(SyncTableActionBase.class);
    protected final String table;
    protected FileStoreTable fileStoreTable;
    protected List<String> partitionKeys = new ArrayList<String>();
    protected List<String> primaryKeys = new ArrayList<String>();
    protected List<String> computedColumnArgs = new ArrayList<String>();
    protected List<ComputedColumn> computedColumns = new ArrayList<ComputedColumn>();

    public SyncTableActionBase(String database, String table, Map<String, String> catalogConfig, Map<String, String> cdcSourceConfig, SyncJobHandler.SourceType sourceType) {
        super(database, catalogConfig, cdcSourceConfig, new SyncJobHandler(sourceType, cdcSourceConfig, database, table));
        this.table = table;
    }

    public SyncTableActionBase withPartitionKeys(String ... partitionKeys) {
        return this.withPartitionKeys(Arrays.asList(partitionKeys));
    }

    public SyncTableActionBase withPartitionKeys(List<String> partitionKeys) {
        this.partitionKeys = partitionKeys;
        return this;
    }

    public SyncTableActionBase withPrimaryKeys(String ... primaryKeys) {
        return this.withPrimaryKeys(Arrays.asList(primaryKeys));
    }

    public SyncTableActionBase withPrimaryKeys(List<String> primaryKeys) {
        this.primaryKeys = primaryKeys;
        return this;
    }

    public SyncTableActionBase withComputedColumnArgs(List<String> computedColumnArgs) {
        this.computedColumnArgs = computedColumnArgs;
        return this;
    }

    protected abstract Schema retrieveSchema() throws Exception;

    protected Schema buildPaimonSchema(Schema retrievedSchema) {
        return CdcActionCommonUtils.buildPaimonSchema(this.table, this.partitionKeys, this.primaryKeys, this.computedColumns, this.tableConfig, retrievedSchema, this.metadataConverters, this.caseSensitive, true, true);
    }

    @Override
    protected void beforeBuildingSourceSink() throws Exception {
        Identifier identifier = new Identifier(this.database, this.table);
        try {
            this.fileStoreTable = (FileStoreTable)this.catalog.getTable(identifier);
            this.fileStoreTable = this.alterTableOptions(identifier, this.fileStoreTable);
            try {
                Schema retrievedSchema = this.retrieveSchema();
                this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, retrievedSchema.fields());
                Schema paimonSchema = this.buildPaimonSchema(retrievedSchema);
                CdcActionCommonUtils.assertSchemaCompatible(this.fileStoreTable.schema(), paimonSchema.fields());
            }
            catch (SchemaRetrievalException e) {
                LOG.info("Failed to retrieve schema from record data but there exists specified Paimon table. Schema compatibility check will be skipped. If you have specified computed columns, here will use the existed Paimon table schema to build them. Please make sure the Paimon table has defined all the argument columns used for computed columns.");
                this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, this.fileStoreTable.schema().fields(), this.caseSensitive);
                this.checkConstraints();
            }
        }
        catch (Catalog.TableNotExistException e) {
            Schema retrievedSchema = this.retrieveSchema();
            this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, retrievedSchema.fields());
            Schema paimonSchema = this.buildPaimonSchema(retrievedSchema);
            this.catalog.createTable(identifier, paimonSchema, false);
            this.fileStoreTable = (FileStoreTable)this.catalog.getTable(identifier);
        }
    }

    @Override
    protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
        return this.syncJobHandler.provideRecordParser(this.computedColumns, this.typeMapping, this.metadataConverters);
    }

    @Override
    protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
        boolean caseSensitive = this.caseSensitive;
        return () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
    }

    @Override
    protected void buildSink(DataStream<RichCdcMultiplexRecord> input, EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
        CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder = new CdcSinkBuilder<RichCdcMultiplexRecord>().withInput(input).withParserFactory(parserFactory).withTable(this.fileStoreTable).withIdentifier(new Identifier(this.database, this.table)).withTypeMapping(this.typeMapping).withCatalogLoader(this.catalogLoader());
        String sinkParallelism = (String)this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
        }
        sinkBuilder.build();
    }

    private void checkConstraints() {
        if (!this.partitionKeys.isEmpty()) {
            List<String> actualPartitionKeys = this.fileStoreTable.partitionKeys();
            Preconditions.checkState(actualPartitionKeys.size() == this.partitionKeys.size() && actualPartitionKeys.containsAll(this.partitionKeys), "Specified partition keys [%s] are not equal to the existed table partition keys [%s]. You should remove the --partition-keys argument or re-create the table if the partition keys are wrong.", String.join((CharSequence)",", this.partitionKeys), String.join((CharSequence)",", actualPartitionKeys));
        }
        if (!this.primaryKeys.isEmpty()) {
            List<String> actualPrimaryKeys = this.fileStoreTable.primaryKeys();
            Preconditions.checkState(actualPrimaryKeys.size() == this.primaryKeys.size() && actualPrimaryKeys.containsAll(this.primaryKeys), "Specified primary keys [%s] are not equal to the existed table primary keys [%s]. You should remove the --primary-keys argument or re-create the table if the primary keys are wrong.", String.join((CharSequence)",", this.primaryKeys), String.join((CharSequence)",", actualPrimaryKeys));
        }
    }

    @VisibleForTesting
    public FileStoreTable fileStoreTable() {
        return this.fileStoreTable;
    }

    public static class SchemaRetrievalException
    extends Exception {
        public SchemaRetrievalException(String message) {
            super(message);
        }
    }
}

