/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog;

import com.github.shyiko.mysql.binlog.event.Event;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSource;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Clock;
import java.util.HashMap;
import java.util.Map;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlBinlogFetchTask
implements FetchTask<SourceSplitBase> {
    private final IncrementalSplit split;
    private volatile boolean taskRunning = false;

    public MySqlBinlogFetchTask(IncrementalSplit split) {
        this.split = split;
    }

    @Override
    public void execute(FetchTask.Context context) throws Exception {
        MySqlSourceFetchTaskContext sourceFetchContext = (MySqlSourceFetchTaskContext)context;
        this.taskRunning = true;
        MySqlStreamingChangeEventSource mySqlStreamingChangeEventSource = new MySqlStreamingChangeEventSource(sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getConnection(), sourceFetchContext.getDispatcher(), sourceFetchContext.getErrorHandler(), Clock.SYSTEM, sourceFetchContext.getTaskContext(), sourceFetchContext.getStreamingChangeEventSourceMetrics());
        BinlogSplitChangeEventSourceContext changeEventSourceContext = new BinlogSplitChangeEventSourceContext();
        mySqlStreamingChangeEventSource.execute((ChangeEventSource.ChangeEventSourceContext)changeEventSourceContext, sourceFetchContext.getOffsetContext());
    }

    @Override
    public boolean isRunning() {
        return this.taskRunning;
    }

    @Override
    public void shutdown() {
        this.taskRunning = false;
    }

    @Override
    public SourceSplitBase getSplit() {
        return this.split;
    }

    private class BinlogSplitChangeEventSourceContext
    implements ChangeEventSource.ChangeEventSourceContext {
        private BinlogSplitChangeEventSourceContext() {
        }

        @Override
        public boolean isRunning() {
            return MySqlBinlogFetchTask.this.taskRunning;
        }
    }

    public static class MySqlBinlogSplitReadTask
    extends MySqlStreamingChangeEventSource {
        private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class);
        private final IncrementalSplit binlogSplit;
        private final MySqlOffsetContext offsetContext;
        private final JdbcSourceEventDispatcher dispatcher;
        private final ErrorHandler errorHandler;
        private ChangeEventSource.ChangeEventSourceContext context;

        public MySqlBinlogSplitReadTask(MySqlConnectorConfig connectorConfig, MySqlOffsetContext offsetContext, MySqlConnection connection, JdbcSourceEventDispatcher dispatcher, ErrorHandler errorHandler, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics, IncrementalSplit binlogSplit) {
            super(connectorConfig, connection, dispatcher, errorHandler, Clock.SYSTEM, taskContext, metrics);
            this.binlogSplit = binlogSplit;
            this.dispatcher = dispatcher;
            this.offsetContext = offsetContext;
            this.errorHandler = errorHandler;
        }

        @Override
        public void execute(ChangeEventSource.ChangeEventSourceContext context, MySqlOffsetContext offsetContext) throws InterruptedException {
            this.context = context;
            super.execute(context, this.offsetContext);
        }

        @Override
        protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
            BinlogOffset currentBinlogOffset;
            super.handleEvent(offsetContext, event);
            if (this.isBoundedRead() && (currentBinlogOffset = MySqlBinlogSplitReadTask.getBinlogPosition(offsetContext.getOffset())).isAtOrAfter(this.binlogSplit.getStopOffset())) {
                try {
                    this.dispatcher.dispatchWatermarkEvent(offsetContext.getPartition(), this.binlogSplit, currentBinlogOffset, WatermarkKind.END);
                }
                catch (InterruptedException e) {
                    LOG.error("Send signal event error.", (Throwable)e);
                    this.errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog signal event", e));
                }
                ((MySqlSnapshotFetchTask.SnapshotBinlogSplitChangeEventSourceContext)this.context).finished();
            }
        }

        private boolean isBoundedRead() {
            return !BinlogOffset.NO_STOPPING_OFFSET.equals(this.binlogSplit.getStopOffset());
        }

        public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
            HashMap<String, String> offsetStrMap = new HashMap<String, String>();
            for (Map.Entry<String, ?> entry : offset.entrySet()) {
                offsetStrMap.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
            }
            return new BinlogOffset(offsetStrMap);
        }
    }
}

