/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source;

import io.debezium.relational.TableId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/**
 * Abstract base for incremental (change-data-capture style) sources.
 *
 * <p>Concrete connectors supply four factories (source config, deserialization schema,
 * dialect and offset factory); this class wires them into the reader and the split
 * enumerator that the {@link SeaTunnelSource} contract asks for.
 *
 * @param <T> type of the records produced by the deserialization schema
 * @param <C> concrete {@link SourceConfig} type used by the connector
 */
public abstract class IncrementalSource<T, C extends SourceConfig>
        implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {

    /** Capacity of the bounded queue handed to the reader in {@link #createReader(SourceReader.Context)}. */
    private static final int ELEMENTS_QUEUE_CAPACITY = 1024;

    /** Read-only view of the plugin configuration; assigned in {@link #prepare(Config)}. */
    protected ReadonlyConfig readonlyConfig;
    /** Factory producing a {@code C} for a given subtask index. */
    protected SourceConfig.Factory<C> configFactory;
    /** Offset factory returned by {@link #createOffsetFactory(ReadonlyConfig)}. */
    protected OffsetFactory offsetFactory;
    /** Dialect returned by {@link #createDataSourceDialect(ReadonlyConfig)}. */
    protected DataSourceDialect<C> dataSourceDialect;
    /** Startup settings read from the {@code STARTUP_*} options. */
    protected StartupConfig startupConfig;
    /** Value of {@link SourceOptions#INCREMENTAL_PARALLELISM}; passed on to the split assigners. */
    protected int incrementalParallelism;
    /** Stop settings read from the {@code STOP_*} options. */
    protected StopConfig stopConfig;
    /** Stop mode taken from {@link #stopConfig}; decides the reported boundedness. */
    protected StopMode stopMode;
    /** Schema returned by {@link #createDebeziumDeserializationSchema(ReadonlyConfig)}. */
    protected DebeziumDeserializationSchema<T> deserializationSchema;

    /**
     * Initializes every field of this source from the plugin configuration.
     *
     * <p>The assignment order is deliberate and must be kept: the startup/stop settings and
     * the parallelism are populated before the subclass factory hooks run, so a hook
     * implementation may read them (NOTE(review): whether subclasses actually do so is not
     * visible from this file).
     *
     * @param pluginConfig raw plugin configuration
     * @throws PrepareFailException declared by the contract; not thrown directly here
     */
    public final void prepare(Config pluginConfig) throws PrepareFailException {
        readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);

        startupConfig = getStartupConfig(readonlyConfig);
        stopConfig = getStopConfig(readonlyConfig);
        stopMode = stopConfig.getStopMode();
        incrementalParallelism = readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);

        configFactory = createSourceConfigFactory(readonlyConfig);
        dataSourceDialect = createDataSourceDialect(readonlyConfig);
        deserializationSchema = createDebeziumDeserializationSchema(readonlyConfig);
        offsetFactory = createOffsetFactory(readonlyConfig);
    }

    /**
     * Builds the startup settings from the {@code STARTUP_*} options.
     *
     * @param config configuration to read the options from
     * @return startup settings holding mode, offset file, offset position and timestamp
     */
    protected StartupConfig getStartupConfig(ReadonlyConfig config) {
        StartupMode mode = config.get(SourceOptions.STARTUP_MODE);
        String offsetFile = config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE);
        Long offsetPosition = config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS);
        Long timestamp = config.get(SourceOptions.STARTUP_TIMESTAMP);
        return new StartupConfig(mode, offsetFile, offsetPosition, timestamp);
    }

    /**
     * Builds the stop settings from the {@code STOP_*} options.
     *
     * @param config configuration to read the options from
     * @return stop settings holding mode, offset file, offset position and timestamp
     */
    private StopConfig getStopConfig(ReadonlyConfig config) {
        StopMode mode = config.get(SourceOptions.STOP_MODE);
        String offsetFile = config.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE);
        Long offsetPosition = config.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS);
        Long timestamp = config.get(SourceOptions.STOP_TIMESTAMP);
        return new StopConfig(mode, offsetFile, offsetPosition, timestamp);
    }

    /**
     * Creates the factory that produces the connector-specific source config.
     *
     * @param config read-only plugin configuration
     * @return factory stored in {@link #configFactory}
     */
    public abstract SourceConfig.Factory<C> createSourceConfigFactory(ReadonlyConfig config);

    /**
     * Creates the schema used to deserialize change records into {@code T}.
     *
     * @param config read-only plugin configuration
     * @return schema stored in {@link #deserializationSchema}
     */
    public abstract DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig config);

    /**
     * Creates the dialect used to talk to the data source.
     *
     * @param config read-only plugin configuration
     * @return dialect stored in {@link #dataSourceDialect}
     */
    public abstract DataSourceDialect<C> createDataSourceDialect(ReadonlyConfig config);

    /**
     * Creates the offset factory of the connector.
     *
     * @param config read-only plugin configuration
     * @return factory stored in {@link #offsetFactory}
     */
    public abstract OffsetFactory createOffsetFactory(ReadonlyConfig config);

    /**
     * Reports whether this source ever finishes.
     *
     * @return {@link Boundedness#UNBOUNDED} when the stop mode is {@link StopMode#NEVER},
     *     {@link Boundedness#BOUNDED} for every other stop mode
     */
    public Boundedness getBoundedness() {
        if (stopMode == StopMode.NEVER) {
            return Boundedness.UNBOUNDED;
        }
        return Boundedness.BOUNDED;
    }

    /**
     * @return the produced type as reported by the deserialization schema
     */
    public SeaTunnelDataType<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }

    /**
     * Creates the reader for one subtask.
     *
     * <p>The source config is created for the subtask index of {@code readerContext}; the
     * split readers are created lazily through a supplier that uses the same index.
     *
     * @param readerContext context of the subtask the reader runs in
     * @return a new {@link IncrementalSourceReader}
     * @throws Exception if any collaborator fails while the reader is assembled
     */
    public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context readerContext) throws Exception {
        C subtaskConfig = configFactory.create(readerContext.getIndexOfSubtask());
        LinkedBlockingQueue<RecordsWithSplitIds<SourceRecords>> recordQueue =
                new LinkedBlockingQueue<>(ELEMENTS_QUEUE_CAPACITY);
        // The subtask index and the dialect are read when the supplier is invoked, not here.
        Supplier<IncrementalSourceSplitReader<C>> splitReaderFactory =
                () -> new IncrementalSourceSplitReader<>(
                        readerContext.getIndexOfSubtask(), dataSourceDialect, subtaskConfig);
        return new IncrementalSourceReader<>(
                recordQueue,
                splitReaderFactory,
                createRecordEmitter(subtaskConfig),
                new SourceReaderOptions(readonlyConfig),
                readerContext,
                subtaskConfig);
    }

    /**
     * Creates the emitter that turns fetched source records into output records.
     *
     * @param sourceConfig config of the subtask; unused by this default implementation
     * @return an {@link IncrementalSourceRecordEmitter} built from the deserialization schema
     *     and the offset factory
     */
    protected RecordEmitter<SourceRecords, T, SourceSplitStateBase> createRecordEmitter(SourceConfig sourceConfig) {
        return new IncrementalSourceRecordEmitter<T>(deserializationSchema, offsetFactory);
    }

    /**
     * Creates a fresh split enumerator.
     *
     * <p>With startup mode {@link StartupMode#INITIAL} a {@link HybridSplitAssigner} is used;
     * any exception raised while building it is wrapped in a {@link RuntimeException}. Every
     * other startup mode uses an {@link IncrementalSplitAssigner}.
     *
     * @param enumeratorContext context of the enumerator
     * @return a new {@link IncrementalSourceEnumerator}
     * @throws Exception if config creation or table discovery fails
     */
    public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(SourceSplitEnumerator.Context<SourceSplitBase> enumeratorContext) throws Exception {
        C enumeratorConfig = configFactory.create(0);
        List<TableId> discoveredTables = dataSourceDialect.discoverDataCollections(enumeratorConfig);
        SplitAssigner.Context<C> assignerContext = newAssignerContext(enumeratorConfig, discoveredTables);

        if (enumeratorConfig.getStartupConfig().getStartupMode() != StartupMode.INITIAL) {
            SplitAssigner incrementalAssigner =
                    new IncrementalSplitAssigner<C>(assignerContext, incrementalParallelism, offsetFactory);
            return new IncrementalSourceEnumerator(enumeratorContext, incrementalAssigner);
        }

        SplitAssigner hybridAssigner;
        try {
            boolean caseSensitiveTableIds = dataSourceDialect.isDataCollectionIdCaseSensitive(enumeratorConfig);
            hybridAssigner = new HybridSplitAssigner<C>(
                    assignerContext,
                    enumeratorContext.currentParallelism(),
                    incrementalParallelism,
                    discoveredTables,
                    caseSensitiveTableIds,
                    dataSourceDialect,
                    offsetFactory);
        } catch (Exception e) {
            throw new RuntimeException("Failed to discover captured tables for enumerator", e);
        }
        return new IncrementalSourceEnumerator(enumeratorContext, hybridAssigner);
    }

    /**
     * Re-creates the split enumerator from a checkpointed state.
     *
     * <p>Tables are discovered before the state type is inspected, so a discovery failure
     * surfaces ahead of the unsupported-state error.
     *
     * @param enumeratorContext context of the enumerator
     * @param checkpointState state to restore from; must be a {@link HybridPendingSplitsState}
     *     or an {@link IncrementalPhaseState}
     * @return a new {@link IncrementalSourceEnumerator}
     * @throws UnsupportedOperationException if {@code checkpointState} is of any other type
     * @throws Exception if config creation or table discovery fails
     */
    public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumerator(SourceSplitEnumerator.Context<SourceSplitBase> enumeratorContext, PendingSplitsState checkpointState) throws Exception {
        C enumeratorConfig = configFactory.create(0);
        List<TableId> discoveredTables = dataSourceDialect.discoverDataCollections(enumeratorConfig);
        SplitAssigner.Context<C> assignerContext = newAssignerContext(enumeratorConfig, discoveredTables);

        if (checkpointState instanceof HybridPendingSplitsState) {
            SplitAssigner hybridAssigner = new HybridSplitAssigner<C>(
                    assignerContext,
                    enumeratorContext.currentParallelism(),
                    incrementalParallelism,
                    (HybridPendingSplitsState) checkpointState,
                    dataSourceDialect,
                    offsetFactory);
            return new IncrementalSourceEnumerator(enumeratorContext, hybridAssigner);
        }
        if (checkpointState instanceof IncrementalPhaseState) {
            // The restored state object itself is not handed to the assigner.
            SplitAssigner incrementalAssigner =
                    new IncrementalSplitAssigner<C>(assignerContext, incrementalParallelism, offsetFactory);
            return new IncrementalSourceEnumerator(enumeratorContext, incrementalAssigner);
        }
        throw new UnsupportedOperationException("Unsupported restored PendingSplitsState: " + checkpointState);
    }

    /**
     * Builds the assigner context shared by {@link #createEnumerator} and
     * {@link #restoreEnumerator}: the given config, a mutable copy of the discovered tables
     * and two empty maps.
     */
    private SplitAssigner.Context<C> newAssignerContext(C sourceConfig, List<TableId> discoveredTables) {
        return new SplitAssigner.Context<C>(
                sourceConfig,
                new HashSet<TableId>(discoveredTables),
                new HashMap<String, SnapshotSplit>(),
                new HashMap<String, Offset>());
    }
}

