/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory;
import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.options.Options;

public abstract class SynchronizationActionBase
extends ActionBase {
    private static final long DEFAULT_CHECKPOINT_INTERVAL = 180000L;
    protected final String database;
    protected final Configuration cdcSourceConfig;
    protected final SyncJobHandler syncJobHandler;
    protected final boolean caseSensitive;
    protected Map<String, String> tableConfig = new HashMap<String, String>();
    protected TypeMapping typeMapping = TypeMapping.defaultMapping();
    protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[0];

    public SynchronizationActionBase(String warehouse, String database, Map<String, String> catalogConfig, Map<String, String> cdcSourceConfig, SyncJobHandler syncJobHandler) {
        super(warehouse, catalogConfig);
        this.database = database;
        this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
        this.syncJobHandler = syncJobHandler;
        this.caseSensitive = this.catalog.caseSensitive();
        this.syncJobHandler.registerJdbcDriver();
    }

    public SynchronizationActionBase withTableConfig(Map<String, String> tableConfig) {
        this.tableConfig = tableConfig;
        return this;
    }

    public SynchronizationActionBase withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    public SynchronizationActionBase withMetadataColumns(List<String> metadataColumns) {
        this.metadataConverters = (CdcMetadataConverter[])metadataColumns.stream().map(this.syncJobHandler::provideMetadataConverter).toArray(CdcMetadataConverter[]::new);
        return this;
    }

    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return this.tableConfig;
    }

    @Override
    public void build() throws Exception {
        this.syncJobHandler.checkRequiredOption();
        this.catalog.createDatabase(this.database, true);
        this.validateCaseSensitivity();
        this.beforeBuildingSourceSink();
        SingleOutputStreamOperator input = this.buildDataStreamSource(this.buildSource()).flatMap(this.recordParse()).name("Parse");
        EventParser.Factory<RichCdcMultiplexRecord> parserFactory = this.buildEventParserFactory();
        this.buildSink((DataStream<RichCdcMultiplexRecord>)input, parserFactory);
    }

    protected abstract void validateCaseSensitivity();

    protected void beforeBuildingSourceSink() throws Exception {
    }

    protected Object buildSource() {
        return this.syncJobHandler.provideSource();
    }

    private DataStreamSource<String> buildDataStreamSource(Object source) {
        if (source instanceof Source) {
            WatermarkStrategy watermarkStrategy;
            boolean isAutomaticWatermarkCreationEnabled = this.tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key()) && Objects.equals(this.tableConfig.get(CoreOptions.TAG_AUTOMATIC_CREATION.key()), CoreOptions.TagCreationMode.WATERMARK.toString());
            Options options = Options.fromMap(this.tableConfig);
            Duration idleTimeout = options.get(FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT);
            String watermarkAlignGroup = options.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP);
            Object object = isAutomaticWatermarkCreationEnabled ? (watermarkAlignGroup != null ? new CdcWatermarkStrategy(CdcTimestampExtractorFactory.createExtractor(source)).withWatermarkAlignment(watermarkAlignGroup, options.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT), options.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL)) : new CdcWatermarkStrategy(CdcTimestampExtractorFactory.createExtractor(source))) : (watermarkStrategy = WatermarkStrategy.noWatermarks());
            if (idleTimeout != null) {
                watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
            }
            return this.env.fromSource((Source)source, watermarkStrategy, this.syncJobHandler.provideSourceName());
        }
        if (source instanceof SourceFunction) {
            return this.env.addSource((SourceFunction)source, this.syncJobHandler.provideSourceName());
        }
        throw new UnsupportedOperationException("Unrecognized source type");
    }

    protected abstract FlatMapFunction<String, RichCdcMultiplexRecord> recordParse();

    protected abstract EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory();

    protected abstract void buildSink(DataStream<RichCdcMultiplexRecord> var1, EventParser.Factory<RichCdcMultiplexRecord> var2);

    @Override
    public void run() throws Exception {
        this.build();
        if (!this.env.getCheckpointConfig().isCheckpointingEnabled()) {
            this.env.enableCheckpointing(180000L);
        }
        this.execute(this.syncJobHandler.provideDefaultJobName());
    }
}

