/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/**
 * Base class for CDC synchronization actions.
 *
 * <p>{@link #build()} wires a pipeline of the form
 * {@code source -> flatMap(recordParse()) ["Parse"] -> buildSink(...)}; subclasses supply the
 * record parser, the event parser factory and the sink. {@link #run()} enables checkpointing
 * when the environment has none configured, builds the pipeline and executes it.
 */
public abstract class SynchronizationActionBase
extends ActionBase {
    /**
     * Interval handed to {@code env.enableCheckpointing(...)} by {@link #run()} when
     * checkpointing is not already enabled (Flink interprets this value as milliseconds).
     */
    private static final long DEFAULT_CHECKPOINT_INTERVAL = 180000L;

    protected final String database;
    protected final Configuration cdcSourceConfig;
    protected final SyncJobHandler syncJobHandler;
    protected final boolean caseSensitive;
    protected Map<String, String> tableConfig = new HashMap<String, String>();
    protected TypeMapping typeMapping = TypeMapping.defaultMapping();
    protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[0];

    /**
     * Creates the action and asks the handler to register its JDBC driver.
     *
     * @param database target database name; created (if absent) by {@link #build()}
     * @param catalogConfig options forwarded to the {@link ActionBase} constructor
     * @param cdcSourceConfig CDC source options, converted to a Flink {@link Configuration}
     * @param syncJobHandler handler that provides the source, names and option checks
     */
    public SynchronizationActionBase(String database, Map<String, String> catalogConfig, Map<String, String> cdcSourceConfig, SyncJobHandler syncJobHandler) {
        super(catalogConfig);
        this.database = database;
        this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
        this.syncJobHandler = syncJobHandler;
        this.caseSensitive = this.catalog.caseSensitive();
        this.syncJobHandler.registerJdbcDriver();
    }

    /**
     * Replaces the table options used by this action. The map is stored by reference.
     *
     * @param tableConfig table options
     * @return this action, for chaining
     */
    public SynchronizationActionBase withTableConfig(Map<String, String> tableConfig) {
        this.tableConfig = tableConfig;
        return this;
    }

    /**
     * Sets the type mapping used by this action.
     *
     * @param typeMapping type mapping to use
     * @return this action, for chaining
     */
    public SynchronizationActionBase withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    /**
     * Resolves each metadata column name to a converter via
     * {@code syncJobHandler.provideMetadataConverter} and stores the result.
     *
     * @param metadataColumns metadata column names
     * @return this action, for chaining
     */
    public SynchronizationActionBase withMetadataColumns(List<String> metadataColumns) {
        this.metadataConverters = metadataColumns.stream()
                .map(this.syncJobHandler::provideMetadataConverter)
                .toArray(CdcMetadataConverter[]::new);
        return this;
    }

    /** Returns the table options currently configured on this action. */
    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return this.tableConfig;
    }

    /**
     * Builds the synchronization pipeline: checks required options, creates the database
     * (ignoring it if it already exists), runs the {@link #beforeBuildingSourceSink()} hook,
     * then connects the parsed source stream to the sink.
     *
     * @throws Exception if any of the build steps fails
     */
    public void build() throws Exception {
        this.syncJobHandler.checkRequiredOption();
        this.catalog.createDatabase(this.database, true);
        this.beforeBuildingSourceSink();
        DataStream<RichCdcMultiplexRecord> input = this.buildDataStreamSource(this.buildSource())
                .flatMap(this.recordParse())
                .name("Parse");
        EventParser.Factory<RichCdcMultiplexRecord> parserFactory = this.buildEventParserFactory();
        this.buildSink(input, parserFactory);
    }

    /**
     * Hook invoked by {@link #build()} after the database is created and before the source and
     * sink are built. The default implementation does nothing.
     *
     * @throws Exception if preparation fails
     */
    protected void beforeBuildingSourceSink() throws Exception {
    }

    /** Returns the CDC source; by default the one provided by the sync job handler. */
    protected Source<CdcSourceRecord, ?, ?> buildSource() {
        return this.syncJobHandler.provideSource();
    }

    /**
     * Creates the timestamp extractor used when watermark-based tag creation is enabled.
     * The default implementation always throws; subclasses that support watermarks override it.
     *
     * @throws IllegalArgumentException always, in this base implementation
     */
    protected CdcTimestampExtractor createCdcTimestampExtractor() {
        throw new IllegalArgumentException("Unsupported timestamp extractor for current cdc source.");
    }

    /**
     * Verifies that the environment's runtime mode is {@link RuntimeExecutionMode#STREAMING}.
     *
     * @throws IllegalArgumentException if the runtime mode is anything else
     */
    protected void validateRuntimeExecutionMode() {
        Preconditions.checkArgument(
                this.env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING,
                "It's only support STREAMING mode for flink-cdc sync table action.");
    }

    /**
     * Attaches the source to the environment with a watermark strategy derived from the table
     * options:
     * <ul>
     *   <li>no watermarks unless {@code CoreOptions.TAG_AUTOMATIC_CREATION} is set to
     *       {@code WATERMARK};</li>
     *   <li>otherwise a {@link CdcWatermarkStrategy}, with watermark alignment when an alignment
     *       group is configured;</li>
     *   <li>idleness is applied on top whenever an idle timeout is configured.</li>
     * </ul>
     */
    private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Source<CdcSourceRecord, ?, ?> source) {
        String tagCreationKey = CoreOptions.TAG_AUTOMATIC_CREATION.key();
        boolean isAutomaticWatermarkCreationEnabled = this.tableConfig.containsKey(tagCreationKey)
                && Objects.equals(this.tableConfig.get(tagCreationKey), CoreOptions.TagCreationMode.WATERMARK.toString());

        Options options = Options.fromMap(this.tableConfig);
        Duration idleTimeout = options.get(FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT);
        String watermarkAlignGroup = options.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP);

        // Every branch must assign the strategy: it is read unconditionally below.
        WatermarkStrategy<CdcSourceRecord> watermarkStrategy;
        if (!isAutomaticWatermarkCreationEnabled) {
            watermarkStrategy = WatermarkStrategy.noWatermarks();
        } else {
            CdcWatermarkStrategy cdcStrategy = new CdcWatermarkStrategy(this.createCdcTimestampExtractor());
            if (watermarkAlignGroup != null) {
                watermarkStrategy = cdcStrategy.withWatermarkAlignment(
                        watermarkAlignGroup,
                        options.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                        options.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
            } else {
                watermarkStrategy = cdcStrategy;
            }
        }

        if (idleTimeout != null) {
            watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
        }
        return this.env.fromSource(source, watermarkStrategy, this.syncJobHandler.provideSourceName());
    }

    /** Returns the function that turns raw CDC source records into multiplexed rich records. */
    protected abstract FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse();

    /** Returns the event parser factory passed to {@link #buildSink}. */
    protected abstract EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory();

    /**
     * Builds the sink for the parsed record stream.
     *
     * @param var1 parsed input stream
     * @param var2 event parser factory produced by {@link #buildEventParserFactory()}
     */
    protected abstract void buildSink(DataStream<RichCdcMultiplexRecord> var1, EventParser.Factory<RichCdcMultiplexRecord> var2);

    /**
     * Applies the configured table options to an existing table.
     *
     * <p>The bucket option, options listed in {@code CoreOptions.IMMUTABLE_OPTIONS} and options
     * whose value already matches the table are skipped. If nothing remains, the table is
     * returned unchanged; otherwise the remaining options are applied through the catalog and a
     * copy of the table carrying them is returned.
     *
     * @param identifier identifier of the table to alter
     * @param table current table
     * @return {@code table} itself when there is nothing to change, otherwise
     *     {@code table.copy(...)} with the changed options
     * @throws RuntimeException if the catalog reports a missing table or a column conflict
     */
    protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
        Map<String, String> dynamicOptions = new HashMap<String, String>(this.tableConfig);
        // The bucket option is never altered through this path.
        dynamicOptions.remove(CoreOptions.BUCKET.key());

        // Drop immutable options and options that already have the requested value.
        Map<String, String> oldOptions = table.options();
        Set<String> immutableOptionKeys = CoreOptions.IMMUTABLE_OPTIONS;
        dynamicOptions.entrySet().removeIf(entry ->
                immutableOptionKeys.contains(entry.getKey())
                        || Objects.equals(oldOptions.get(entry.getKey()), entry.getValue()));
        if (dynamicOptions.isEmpty()) {
            return table;
        }

        List<SchemaChange> optionChanges = dynamicOptions.entrySet().stream()
                .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
        try {
            this.catalog.alterTable(identifier, optionChanges, false);
        }
        catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException | Catalog.TableNotExistException e) {
            throw new RuntimeException("This is unexpected.", e);
        }
        return table.copy(dynamicOptions);
    }

    /**
     * Runs the action: enables checkpointing with {@link #DEFAULT_CHECKPOINT_INTERVAL} when the
     * environment has none configured, builds the pipeline and executes it under the handler's
     * default job name.
     *
     * @throws Exception if building or executing the job fails
     */
    public void run() throws Exception {
        if (!this.env.getCheckpointConfig().isCheckpointingEnabled()) {
            this.env.enableCheckpointing(DEFAULT_CHECKPOINT_INTERVAL);
        }
        this.build();
        this.execute(this.syncJobHandler.provideDefaultJobName());
    }
}

