/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.Duration;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.SnapshotSplitChangeEventSourceContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Snapshot task that reads the rows of a single {@link SnapshotSplit} of a PostgreSQL table.
 *
 * <p>The task brackets the split scan with two watermark events: a low watermark taken from
 * {@code PostgresUtils.currentLsn} before the scan and a high watermark taken from the same
 * helper after it. Both values are also stored on the
 * {@link SnapshotSplitChangeEventSourceContext} passed to {@link #doExecute}.
 */
public class PostgresSnapshotSplitReadTask
        extends AbstractSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
    private static final Logger log = LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);

    /** Minimum time between two "Exported N records" progress log lines during a scan. */
    private static final Duration LOG_INTERVAL = Duration.ofMillis(10000L);

    private final PostgresConnectorConfig connectorConfig;
    private final PostgresSchema databaseSchema;
    private final PostgresConnection jdbcConnection;
    private final JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
    private final Clock clock;
    private final SnapshotSplit snapshotSplit;
    private final PostgresOffsetContext offsetContext;
    private final SnapshotProgressListener<PostgresPartition> snapshotProgressListener;

    /**
     * Creates a read task for one snapshot split.
     *
     * @param connectorConfig connector configuration; also supplies the query fetch size
     * @param previousOffset offset context installed on the snapshot context and returned with
     *     the completed snapshot result
     * @param snapshotProgressListener listener notified of scan progress
     * @param databaseSchema schema used to resolve the split's table
     * @param jdbcConnection connection used for watermark lookups and the split scan
     * @param dispatcher dispatcher receiving the watermark and row events
     * @param snapshotSplit the split to read
     */
    public PostgresSnapshotSplitReadTask(PostgresConnectorConfig connectorConfig, PostgresOffsetContext previousOffset, SnapshotProgressListener snapshotProgressListener, PostgresSchema databaseSchema, PostgresConnection jdbcConnection, JdbcSourceEventDispatcher dispatcher, SnapshotSplit snapshotSplit) {
        super(connectorConfig, snapshotProgressListener);
        this.offsetContext = previousOffset;
        this.connectorConfig = connectorConfig;
        this.databaseSchema = databaseSchema;
        this.jdbcConnection = jdbcConnection;
        this.dispatcher = dispatcher;
        this.clock = Clock.SYSTEM;
        this.snapshotSplit = snapshotSplit;
        this.snapshotProgressListener = snapshotProgressListener;
    }

    /**
     * Prepares a snapshot context and runs {@link #doExecute} with it.
     *
     * @param context change event source context handed through to {@link #doExecute}
     * @param partition partition the snapshot context is created for
     * @param previousOffset offset handed through to {@link #doExecute}
     * @return the result produced by {@link #doExecute}
     * @throws InterruptedException if the snapshot is interrupted before completion
     * @throws RuntimeException if the snapshot context cannot be prepared
     * @throws DebeziumException wrapping any other exception raised while snapshotting
     */
    @Override
    public SnapshotResult<PostgresOffsetContext> execute(ChangeEventSource.ChangeEventSourceContext context, PostgresPartition partition, PostgresOffsetContext previousOffset) throws InterruptedException {
        AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask = this.getSnapshottingTask(partition, previousOffset);
        AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> ctx;
        try {
            ctx = this.prepare(partition);
        } catch (Exception e) {
            log.error("Failed to initialize snapshot context.", e);
            throw new RuntimeException(e);
        }
        try {
            return this.doExecute(context, previousOffset, ctx, snapshottingTask);
        } catch (InterruptedException e) {
            // Propagated unchanged so the caller sees the interruption itself.
            log.warn("Snapshot was interrupted before completion");
            throw e;
        } catch (Exception t) {
            throw new DebeziumException(t);
        }
    }

    /**
     * Emits the low watermark, the rows of the split and the high watermark, in that order.
     *
     * <p>{@code context} must be a {@link SnapshotSplitChangeEventSourceContext}; it is downcast
     * to record both watermarks. The offset used is the one given to the constructor, not the
     * {@code previousOffset} argument.
     *
     * @return a completed snapshot result carrying the snapshot context's offset
     * @throws Exception if a watermark lookup, dispatch or the table scan fails
     */
    @Override
    protected SnapshotResult<PostgresOffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext context, PostgresOffsetContext previousOffset, AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        PostgresSnapshotContext ctx = (PostgresSnapshotContext) snapshotContext;
        ctx.offset = this.offsetContext;

        LsnOffset lowWatermark = PostgresUtils.currentLsn(this.jdbcConnection);
        log.info("Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, this.snapshotSplit);
        // Cast only after the low watermark lookup, matching the original failure order.
        SnapshotSplitChangeEventSourceContext splitContext = (SnapshotSplitChangeEventSourceContext) context;
        splitContext.setLowWatermark(lowWatermark);
        this.dispatcher.dispatchWatermarkEvent(ctx.partition.getSourcePartition(), this.snapshotSplit, lowWatermark, WatermarkKind.LOW);

        log.info("Snapshot step 2 - Snapshotting data");
        this.createDataEvents(ctx, this.snapshotSplit.getTableId());

        LsnOffset highWatermark = PostgresUtils.currentLsn(this.jdbcConnection);
        log.info("Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, this.snapshotSplit);
        splitContext.setHighWatermark(highWatermark);
        this.dispatcher.dispatchWatermarkEvent(ctx.partition.getSourcePartition(), this.snapshotSplit, highWatermark, WatermarkKind.HIGH);

        return SnapshotResult.completed(ctx.offset);
    }

    /**
     * Always returns the same task description, regardless of the arguments.
     *
     * <p>NOTE(review): the two flags are presumably (snapshotSchema = false,
     * snapshotData = true) - confirm against the Debezium version in use.
     */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset) {
        return new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
    }

    /** Creates a fresh {@link PostgresSnapshotContext} for the given partition. */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> prepare(PostgresPartition partition) throws Exception {
        return new PostgresSnapshotContext(partition);
    }

    /**
     * Resolves the split's table in the schema, exports its rows and completes the snapshot on
     * the receiver.
     *
     * @throws IllegalStateException if the schema has no entry for the table
     */
    private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId tableId) throws Exception {
        EventDispatcher.SnapshotReceiver snapshotReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
        log.debug("Snapshotting table {}", tableId);
        // The schema lookup uses an id without catalog: only schema and table name are kept.
        TableId schemaTableId = new TableId(null, tableId.schema(), tableId.table());
        Table table = this.databaseSchema.tableFor(schemaTableId);
        if (table == null) {
            // Fail with a descriptive message instead of a NullPointerException further down.
            throw new IllegalStateException("Table " + schemaTableId + " for split '" + this.snapshotSplit.splitId() + "' was not found in the database schema");
        }
        this.createDataEventsForTable(snapshotContext, snapshotReceiver, table);
        snapshotReceiver.completeSnapshot();
    }

    /**
     * Runs the split scan query and dispatches one snapshot event per returned row.
     *
     * <p>Progress is logged and reported to the progress listener each time the
     * {@link #LOG_INTERVAL} timer expires.
     *
     * @throws ConnectException if the scan fails with a {@link SQLException}
     * @throws InterruptedException declared by the signature; presumably raised while
     *     dispatching events - confirm against the dispatcher
     */
    private void createDataEventsForTable(PostgresSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver snapshotReceiver, Table table) throws InterruptedException {
        long exportStart = this.clock.currentTimeInMillis();
        log.info("Exporting data from split '{}' of table {}", this.snapshotSplit.splitId(), table.id());

        // A null bound means the split is open-ended on that side.
        boolean isFirstSplit = this.snapshotSplit.getSplitStart() == null;
        boolean isLastSplit = this.snapshotSplit.getSplitEnd() == null;

        String selectSql = PostgresUtils.buildSplitScanQuery(table, this.snapshotSplit.getSplitKeyType(), isFirstSplit, isLastSplit);
        log.info("For split '{}' of table {} using select statement: '{}'", this.snapshotSplit.splitId(), table.id(), selectSql);

        try (PreparedStatement selectStatement = PostgresUtils.readTableSplitDataStatement(this.jdbcConnection, selectSql, isFirstSplit, isLastSplit, this.snapshotSplit.getSplitStart(), this.snapshotSplit.getSplitEnd(), this.snapshotSplit.getSplitKeyType(), this.connectorConfig.getQueryFetchSize());
             ResultSet rs = selectStatement.executeQuery()) {
            ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);

            // Column layout does not change during the scan, so resolve it once up front:
            // result-set column i + 1 is stored at slot (table column position - 1).
            int rowWidth = columnArray.getGreatestColumnPosition();
            int columnCount = columnArray.getColumns().length;
            int[] rowSlots = new int[columnCount];
            for (int i = 0; i < columnCount; ++i) {
                rowSlots[i] = columnArray.getColumns()[i].position() - 1;
            }

            long rows = 0L;
            Threads.Timer logTimer = this.getTableScanLogTimer();
            while (rs.next()) {
                ++rows;
                Object[] row = new Object[rowWidth];
                for (int i = 0; i < columnCount; ++i) {
                    row[rowSlots[i]] = rs.getObject(i + 1);
                }
                if (logTimer.expired()) {
                    long stop = this.clock.currentTimeInMillis();
                    log.info("Exported {} records for split '{}' after {}", rows, this.snapshotSplit.splitId(), Strings.duration(stop - exportStart));
                    this.snapshotProgressListener.rowsScanned(snapshotContext.partition, table.id(), rows);
                    logTimer = this.getTableScanLogTimer();
                }
                this.dispatcher.dispatchSnapshotEvent(snapshotContext.partition, table.id(), this.getChangeRecordEmitter(snapshotContext, table.id(), row), snapshotReceiver);
            }
            log.info("Finished exporting {} records for split '{}', total duration '{}'", rows, this.snapshotSplit.splitId(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
        } catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
        }
    }

    /**
     * Records the event on the snapshot context's offset and builds the emitter for one row.
     *
     * @param snapshotContext context supplying the partition and offset
     * @param tableId table the row belongs to
     * @param row column values, indexed by table column position minus one
     * @return a snapshot change record emitter for the row
     */
    protected ChangeRecordEmitter getChangeRecordEmitter(PostgresSnapshotContext snapshotContext, TableId tableId, Object[] row) {
        snapshotContext.offset.event(tableId, this.clock.currentTime());
        return new SnapshotChangeRecordEmitter<PostgresPartition>(snapshotContext.partition, snapshotContext.offset, row, this.clock);
    }

    /** Returns a new timer that expires after {@link #LOG_INTERVAL}. */
    private Threads.Timer getTableScanLogTimer() {
        return Threads.timer(this.clock, LOG_INTERVAL);
    }

    /**
     * Reads one column value, fetching {@code TIME} columns as a timestamp and every other
     * column through {@link ResultSet#getObject(int)}.
     *
     * <p>NOTE(review): nothing in this class calls this method; the scan loop uses
     * {@code getObject} directly. Confirm whether it should be wired in or removed.
     */
    private Object readField(ResultSet rs, int columnIndex) throws SQLException {
        ResultSetMetaData metaData = rs.getMetaData();
        int columnType = metaData.getColumnType(columnIndex);
        if (columnType == java.sql.Types.TIME) {
            return rs.getTimestamp(columnIndex);
        }
        return rs.getObject(columnIndex);
    }

    /**
     * Snapshot context bound to a {@link PostgresPartition}.
     *
     * <p>NOTE(review): the empty string passed to the parent constructor is presumably the
     * catalog name - confirm against {@code RelationalSnapshotContext}.
     */
    private static class PostgresSnapshotContext
            extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> {
        public PostgresSnapshotContext(PostgresPartition partition) throws SQLException {
            super(partition, "");
        }
    }
}

