/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.scan;

import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerOffsetContext;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.scan.SnapshotSplitChangeEventSourceContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.scan.SqlServerSnapshotSplitReadTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SqlServerSnapshotFetchTask
implements FetchTask<SourceSplitBase> {
    private static final Logger log = LoggerFactory.getLogger(SqlServerSnapshotFetchTask.class);
    private final SnapshotSplit split;
    private volatile boolean taskRunning = false;
    private SqlServerSnapshotSplitReadTask snapshotSplitReadTask;

    public SqlServerSnapshotFetchTask(SnapshotSplit split) {
        this.split = split;
    }

    @Override
    public void execute(FetchTask.Context context) throws Exception {
        SqlServerSourceFetchTaskContext sourceFetchContext = (SqlServerSourceFetchTaskContext)context;
        this.taskRunning = true;
        this.snapshotSplitReadTask = new SqlServerSnapshotSplitReadTask(sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getOffsetContext(), sourceFetchContext.getSnapshotChangeEventSourceMetrics(), sourceFetchContext.getDatabaseSchema(), sourceFetchContext.getDataConnection(), sourceFetchContext.getDispatcher(), this.split);
        SnapshotSplitChangeEventSourceContext changeEventSourceContext = new SnapshotSplitChangeEventSourceContext();
        SnapshotResult snapshotResult = this.snapshotSplitReadTask.execute((ChangeEventSource.ChangeEventSourceContext)changeEventSourceContext, sourceFetchContext.getOffsetContext());
        IncrementalSplit backfillBinlogSplit = this.createBackFillLsnSplit(changeEventSourceContext);
        boolean binlogBackfillRequired = backfillBinlogSplit.getStopOffset().isAfter(backfillBinlogSplit.getStartupOffset());
        if (!binlogBackfillRequired) {
            this.dispatchLsnEndEvent(backfillBinlogSplit, ((SqlServerSourceFetchTaskContext)context).getOffsetContext().getPartition(), ((SqlServerSourceFetchTaskContext)context).getDispatcher());
            this.taskRunning = false;
            return;
        }
        if (!snapshotResult.isCompletedOrSkipped()) {
            this.taskRunning = false;
            throw new IllegalStateException(String.format("Read snapshot for SqlServer split %s fail", this.split));
        }
        SqlServerTransactionLogFetchTask.TransactionLogSplitReadTask backfillBinlogReadTask = this.createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext);
        OffsetContext sqlServerOffsetContext = new SqlServerOffsetContext.Loader(sourceFetchContext.getDbzConnectorConfig()).load((Map)backfillBinlogSplit.getStartupOffset().getOffset());
        log.info("start execute backfillBinlogReadTask, start offset : {}, stop offset : {}", (Object)backfillBinlogSplit.getStartupOffset(), (Object)backfillBinlogSplit.getStopOffset());
        backfillBinlogReadTask.execute((ChangeEventSource.ChangeEventSourceContext)new SnapshotBinlogSplitChangeEventSourceContext(), (SqlServerOffsetContext)sqlServerOffsetContext);
        log.info("backfillBinlogReadTask execute end");
    }

    private IncrementalSplit createBackFillLsnSplit(SnapshotSplitChangeEventSourceContext sourceContext) {
        return new IncrementalSplit(this.split.splitId(), Collections.singletonList(this.split.getTableId()), sourceContext.getLowWatermark(), sourceContext.getHighWatermark(), new ArrayList<CompletedSnapshotSplitInfo>());
    }

    private SqlServerTransactionLogFetchTask.TransactionLogSplitReadTask createBackFillLsnSplitReadTask(IncrementalSplit backfillBinlogSplit, SqlServerSourceFetchTaskContext context) {
        Configuration dezConf = ((Configuration.Builder)context.getSourceConfig().getDbzConfiguration().edit().with("table.include.list", this.split.getTableId().toString().substring(this.split.getTableId().toString().indexOf(".") + 1)).with(Heartbeat.HEARTBEAT_INTERVAL, 0)).build();
        return new SqlServerTransactionLogFetchTask.TransactionLogSplitReadTask(new SqlServerConnectorConfig(dezConf), SqlServerConnectionUtils.createSqlServerConnection(context.getSourceConfig().getDbzConfiguration()), context.getMetadataConnection(), context.getDispatcher(), context.getErrorHandler(), context.getDatabaseSchema(), backfillBinlogSplit);
    }

    private void dispatchLsnEndEvent(IncrementalSplit backFillBinlogSplit, Map<String, ?> sourcePartition, JdbcSourceEventDispatcher eventDispatcher) throws InterruptedException {
        eventDispatcher.dispatchWatermarkEvent(sourcePartition, backFillBinlogSplit, backFillBinlogSplit.getStopOffset(), WatermarkKind.END);
    }

    @Override
    public boolean isRunning() {
        return this.taskRunning;
    }

    @Override
    public void shutdown() {
        this.taskRunning = false;
    }

    @Override
    public SourceSplitBase getSplit() {
        return this.split;
    }

    public class SnapshotBinlogSplitChangeEventSourceContext
    implements ChangeEventSource.ChangeEventSourceContext {
        public void finished() {
            SqlServerSnapshotFetchTask.this.taskRunning = false;
        }

        @Override
        public boolean isRunning() {
            return SqlServerSnapshotFetchTask.this.taskRunning;
        }
    }
}

