/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotSplitReadTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.SnapshotSplitChangeEventSourceContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;

/**
 * {@code FetchTask} that reads one {@code SnapshotSplit} and, when needed, replays the binlog
 * range that was recorded while the snapshot was being read.
 *
 * <p>{@link #execute(FetchTask.Context)} runs a {@code MySqlSnapshotSplitReadTask}, builds an
 * {@code IncrementalSplit} from the low/high watermarks held by the
 * {@code SnapshotSplitChangeEventSourceContext}, and then either dispatches an {@code END}
 * watermark event (when the stop offset is not after the startup offset) or runs a
 * {@code MySqlBinlogFetchTask.MySqlBinlogSplitReadTask} over that split.
 */
public class MySqlSnapshotFetchTask
implements FetchTask<SourceSplitBase> {
    /** The snapshot split this task reads. */
    private final SnapshotSplit split;
    /**
     * Running flag. Set in {@link #execute(FetchTask.Context)}, cleared by {@link #shutdown()},
     * by {@link SnapshotBinlogSplitChangeEventSourceContext#finished()}, and when execution ends
     * without a backfill or with an exception.
     */
    private volatile boolean taskRunning = false;
    /** The snapshot read task created by the most recent {@code execute} call. */
    private MySqlSnapshotSplitReadTask snapshotSplitReadTask;

    /**
     * Creates a fetch task for the given snapshot split.
     *
     * @param split the snapshot split to read
     */
    public MySqlSnapshotFetchTask(SnapshotSplit split) {
        this.split = split;
    }

    /**
     * Reads the snapshot split and, if required, the backfill binlog range.
     *
     * <p>If any step throws, the running flag is cleared before the exception is propagated so
     * that {@link #isRunning()} does not keep reporting {@code true} for a failed task.
     *
     * @param context the fetch context; must be a {@code MySqlSourceFetchTaskContext}
     * @throws IllegalStateException if backfill is required but the snapshot result is neither
     *     completed nor skipped
     * @throws Exception any exception raised by the snapshot read, the watermark dispatch or the
     *     backfill binlog read
     */
    @Override
    public void execute(FetchTask.Context context) throws Exception {
        MySqlSourceFetchTaskContext sourceFetchContext = (MySqlSourceFetchTaskContext)context;
        this.taskRunning = true;
        try {
            this.readSnapshotThenBackfill(sourceFetchContext);
        } catch (Exception e) {
            // Every failure path must leave the task marked as stopped, not only the
            // explicit IllegalStateException raised below.
            this.taskRunning = false;
            throw e;
        }
    }

    /** Runs the snapshot read followed by the END-watermark dispatch or the binlog backfill. */
    private void readSnapshotThenBackfill(MySqlSourceFetchTaskContext sourceFetchContext) throws Exception {
        this.snapshotSplitReadTask = new MySqlSnapshotSplitReadTask(
                sourceFetchContext.getDbzConnectorConfig(),
                sourceFetchContext.getOffsetContext(),
                sourceFetchContext.getSnapshotChangeEventSourceMetrics(),
                sourceFetchContext.getDatabaseSchema(),
                sourceFetchContext.getConnection(),
                sourceFetchContext.getDispatcher(),
                this.split);
        SnapshotSplitChangeEventSourceContext changeEventSourceContext = new SnapshotSplitChangeEventSourceContext();
        SnapshotResult snapshotResult = this.snapshotSplitReadTask.execute((ChangeEventSource.ChangeEventSourceContext)changeEventSourceContext, sourceFetchContext.getOffsetContext());

        IncrementalSplit backfillBinlogSplit = this.createBackfillBinlogSplit(changeEventSourceContext);
        // Backfill is only needed when the stop offset is strictly after the startup offset.
        boolean binlogBackfillRequired = backfillBinlogSplit.getStopOffset().isAfter(backfillBinlogSplit.getStartupOffset());
        if (!binlogBackfillRequired) {
            this.dispatchBinlogEndEvent(backfillBinlogSplit, sourceFetchContext.getOffsetContext().getPartition(), sourceFetchContext.getDispatcher());
            this.taskRunning = false;
            return;
        }
        if (!snapshotResult.isCompletedOrSkipped()) {
            // The caller's catch block clears the running flag before this propagates.
            throw new IllegalStateException(String.format("Read snapshot for mysql split %s fail", this.split));
        }
        MySqlBinlogFetchTask.MySqlBinlogSplitReadTask backfillBinlogReadTask = this.createBackfillBinlogReadTask(backfillBinlogSplit, sourceFetchContext);
        backfillBinlogReadTask.execute((ChangeEventSource.ChangeEventSourceContext)new SnapshotBinlogSplitChangeEventSourceContext(), sourceFetchContext.getOffsetContext());
    }

    /**
     * Builds the incremental split covering the range between the low and high watermarks held
     * by {@code sourceContext}, restricted to this split's table and reusing this split's id.
     */
    private IncrementalSplit createBackfillBinlogSplit(SnapshotSplitChangeEventSourceContext sourceContext) {
        return new IncrementalSplit(
                this.split.splitId(),
                Collections.singletonList(this.split.getTableId()),
                sourceContext.getLowWatermark(),
                sourceContext.getHighWatermark(),
                new ArrayList<CompletedSnapshotSplitInfo>());
    }

    /**
     * Dispatches an {@code END} watermark event carrying the stop offset of the given split.
     *
     * @throws InterruptedException if the dispatcher is interrupted
     */
    private void dispatchBinlogEndEvent(IncrementalSplit backFillBinlogSplit, Map<String, ?> sourcePartition, JdbcSourceEventDispatcher eventDispatcher) throws InterruptedException {
        eventDispatcher.dispatchWatermarkEvent(sourcePartition, backFillBinlogSplit, backFillBinlogSplit.getStopOffset(), WatermarkKind.END);
    }

    /**
     * Creates the binlog read task used for backfilling.
     *
     * <p>The offset context is loaded from the split's startup offset. The Debezium configuration
     * is a copy of the source configuration with {@code table.include.list} set to this split's
     * table id and {@code Heartbeat.HEARTBEAT_INTERVAL} set to {@code 0}.
     */
    private MySqlBinlogFetchTask.MySqlBinlogSplitReadTask createBackfillBinlogReadTask(IncrementalSplit backfillBinlogSplit, MySqlSourceFetchTaskContext context) {
        MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(context.getSourceConfig().getDbzConnectorConfig());
        OffsetContext mySqlOffsetContext = loader.load((Map)backfillBinlogSplit.getStartupOffset().getOffset());
        Configuration dezConf = ((Configuration.Builder)context.getSourceConfig().getDbzConfiguration().edit().with("table.include.list", this.split.getTableId().toString()).with(Heartbeat.HEARTBEAT_INTERVAL, 0)).build();
        // NOTE(review): a new connection is created here and is not closed anywhere in this
        // class - confirm that the read task or its caller owns closing it.
        return new MySqlBinlogFetchTask.MySqlBinlogSplitReadTask(
                new MySqlConnectorConfig(dezConf),
                (MySqlOffsetContext)mySqlOffsetContext,
                MySqlConnectionUtils.createMySqlConnection(context.getSourceConfig().getDbzConfiguration()),
                context.getDispatcher(),
                context.getErrorHandler(),
                context.getTaskContext(),
                context.getStreamingChangeEventSourceMetrics(),
                backfillBinlogSplit);
    }

    /** Returns the current value of the running flag. */
    @Override
    public boolean isRunning() {
        return this.taskRunning;
    }

    /** Clears the running flag; the backfill context's {@code isRunning()} reads the same flag. */
    @Override
    public void shutdown() {
        this.taskRunning = false;
    }

    /** Returns the snapshot split this task was created for. */
    @Override
    public SourceSplitBase getSplit() {
        return this.split;
    }

    /**
     * Change event source context handed to the backfill binlog read task. It shares the
     * enclosing task's running flag.
     */
    public class SnapshotBinlogSplitChangeEventSourceContext
    implements ChangeEventSource.ChangeEventSourceContext {
        /** Marks the enclosing fetch task as no longer running. */
        public void finished() {
            MySqlSnapshotFetchTask.this.taskRunning = false;
        }

        /** Returns the enclosing fetch task's running flag. */
        @Override
        public boolean isRunning() {
            return MySqlSnapshotFetchTask.this.taskRunning;
        }
    }
}

