/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.postgresql.util.PGmoney;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Snapshot change event source for the PostgreSQL connector.
 *
 * <p>Builds on {@link RelationalSnapshotChangeEventSource} and delegates the
 * snapshot policy decisions (whether to snapshot, which locking statement to use,
 * which isolation-level statement to issue, which select to run per table) to the
 * configured {@link Snapshotter}.
 */
public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class);

    // PostgreSQL type OIDs whose values are read in a type-specific way by getColumnValue().
    private static final int OID_MONEY = 790;
    private static final int OID_BIT = 1560;
    private static final int OID_NUMERIC = 1700;
    private static final int OID_TIME = 1083;
    private static final int OID_TIMETZ = 1266;

    private final PostgresConnectorConfig connectorConfig;
    private final PostgresConnection jdbcConnection;
    private final PostgresSchema schema;
    private final Snapshotter snapshotter;
    private final SlotCreationResult slotCreatedInfo;
    private final SlotState startingSlotInfo;
    private final PostgresOffsetContext previousOffset;

    /**
     * Creates the snapshot source.
     *
     * @param connectorConfig connector configuration
     * @param snapshotter strategy object consulted for all snapshot policy decisions
     * @param previousOffset offset from a previous run; checked against {@code null} before use
     * @param jdbcConnection connection used for all database access of the snapshot
     * @param schema schema that is refreshed from the connection during the snapshot
     * @param dispatcher event dispatcher handed to the superclass
     * @param clock clock handed to the superclass
     * @param snapshotProgressListener progress listener handed to the superclass
     * @param slotCreatedInfo result of the slot creation; checked against {@code null} before use
     * @param startingSlotInfo state of the slot at start-up; checked against {@code null} before use
     */
    public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresOffsetContext previousOffset, PostgresConnection jdbcConnection, PostgresSchema schema, EventDispatcher<TableId> dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener, SlotCreationResult slotCreatedInfo, SlotState startingSlotInfo) {
        super(connectorConfig, previousOffset, jdbcConnection, dispatcher, clock, snapshotProgressListener);
        this.connectorConfig = connectorConfig;
        this.jdbcConnection = jdbcConnection;
        this.schema = schema;
        this.snapshotter = snapshotter;
        this.slotCreatedInfo = slotCreatedInfo;
        this.startingSlotInfo = startingSlotInfo;
        this.previousOffset = previousOffset;
    }

    /**
     * Decides what has to be snapshotted. Schema and data are always snapshotted
     * together: both flags mirror {@link Snapshotter#shouldSnapshot()}.
     *
     * @param previousOffset not used by this implementation
     * @return the snapshotting task describing whether schema and data are captured
     */
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
        final boolean snapshotData = snapshotter.shouldSnapshot();
        // The schema is only snapshotted when data is snapshotted as well.
        final boolean snapshotSchema = snapshotData;
        if (snapshotData) {
            LOGGER.info("According to the connector configuration data will be snapshotted");
        } else {
            LOGGER.info("According to the connector configuration no snapshot will be executed");
        }
        return new AbstractSnapshotChangeEventSource.SnapshottingTask(snapshotSchema, snapshotData);
    }

    /**
     * Creates the snapshot context, using the configured database name as catalog name.
     */
    protected AbstractSnapshotChangeEventSource.SnapshotContext prepare(ChangeEventSource.ChangeEventSourceContext context) throws Exception {
        return new PostgresSnapshotContext(connectorConfig.databaseName());
    }

    /**
     * Sets the snapshot transaction isolation level when streaming is expected to
     * start from the snapshot and no starting slot state is known, then refreshes the schema.
     */
    protected void connectionCreated(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws Exception {
        if (snapshotter.shouldStreamEventsStartingFromSnapshot() && startingSlotInfo == null) {
            setSnapshotTransactionIsolationLevel();
        }
        schema.refresh(jdbcConnection, false);
    }

    /**
     * Reads the ids of all objects of type {@code TABLE} in the context's catalog.
     */
    protected Set<TableId> getAllTableIds(RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx) throws Exception {
        return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[]{"TABLE"});
    }

    /**
     * Executes the locking statement supplied by the snapshotter, if there is one,
     * and refreshes the schema afterwards. Without a locking statement only a log
     * message is written; it is a warning unless the snapshot is an exported one.
     */
    protected void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws SQLException, InterruptedException {
        final Duration lockTimeout = connectorConfig.snapshotLockTimeout();
        final Optional<String> lockStatement = snapshotter.snapshotTableLockingStatement(lockTimeout, schema.tableIds());

        if (lockStatement.isPresent()) {
            LOGGER.info("Waiting a maximum of '{}' seconds for each table lock", lockTimeout.getSeconds());
            jdbcConnection.executeWithoutCommitting(new String[]{lockStatement.get()});
            // Re-read the schema now that the locking statement has been executed.
            schema.refresh(jdbcConnection, false);
        } else if (!snapshotter.exportSnapshot()) {
            LOGGER.warn("Step 2: skipping locking each table, this may result in inconsistent schema!");
        } else {
            LOGGER.info("Step 2: skipping locking each table in an exported snapshot");
        }
    }

    /**
     * Intentionally a no-op in this implementation.
     * NOTE(review): presumably the locks are released together with the snapshot transaction - confirm.
     */
    protected void releaseSchemaSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws SQLException {
    }

    /**
     * Makes sure the context carries an offset and updates it with the WAL position
     * of the snapshot. When the context has no offset yet, a new one is created; it is
     * seeded with the LSNs of the previous offset if there is one and streaming is not
     * expected to start from the snapshot.
     */
    protected void determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx) throws Exception {
        PostgresOffsetContext offset = (PostgresOffsetContext) ctx.offset;
        if (offset == null) {
            if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot()) {
                offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock(), previousOffset.lastCommitLsn(), previousOffset.lastCompletelyProcessedLsn());
            } else {
                offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock());
            }
            ctx.offset = offset;
        }
        updateOffsetForSnapshot(offset);
    }

    /**
     * Writes the transaction start LSN, the current time and the current transaction id
     * into the offset; the last completely processed LSN and xmin are carried over unchanged.
     */
    private void updateOffsetForSnapshot(PostgresOffsetContext offset) throws SQLException {
        final Lsn xlogStart = getTransactionStartLsn();
        final long txId = jdbcConnection.currentTransactionId();
        LOGGER.info("Read xlogStart at '{}' from transaction '{}'", xlogStart, txId);
        offset.updateWalPosition(xlogStart, offset.lastCompletelyProcessedLsn(), clock.currentTime(), txId, null, offset.xmin());
    }

    /**
     * Updates the offset as for a snapshot and additionally records the current
     * xlog location as the LSN at which catch-up streaming has to stop.
     */
    protected void updateOffsetForPreSnapshotCatchUpStreaming(PostgresOffsetContext offset) throws SQLException {
        updateOffsetForSnapshot(offset);
        offset.setStreamingStoppingLsn(Lsn.valueOf(jdbcConnection.currentXLogLocation()));
    }

    /**
     * Determines the LSN the snapshot offset starts at:
     * <ol>
     * <li>the start LSN of the created slot, for an exported snapshot with slot creation info;</li>
     * <li>the last flushed LSN of the replication slot, when streaming does not start from the
     * snapshot and a starting slot state is known;</li>
     * <li>otherwise the current xlog location of the connection.</li>
     * </ol>
     */
    private Lsn getTransactionStartLsn() throws SQLException {
        if (snapshotter.exportSnapshot() && slotCreatedInfo != null) {
            return slotCreatedInfo.startLsn();
        }
        if (!snapshotter.shouldStreamEventsStartingFromSnapshot() && startingSlotInfo != null) {
            // Query the slot state again rather than using the state captured at start-up.
            final SlotState currentSlotState = jdbcConnection.getReplicationSlotState(connectorConfig.slotName(), connectorConfig.plugin().getPostgresPluginName());
            return currentSlotState.slotLastFlushedLsn();
        }
        return Lsn.valueOf(jdbcConnection.currentXLogLocation());
    }

    /**
     * Reads the structure of every schema that contains at least one captured table
     * and refreshes the connector schema afterwards.
     *
     * @throws InterruptedException if the source context stops running while schemas are being read
     */
    protected void readTableStructure(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws SQLException, InterruptedException {
        final Set<String> schemas = snapshotContext.capturedTables.stream()
                .map(TableId::schema)
                .collect(Collectors.toSet());

        for (String schemaName : schemas) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while reading structure of schema " + schemaName);
            }
            // Log the schema that is actually being read (not the catalog name).
            LOGGER.info("Reading structure of schema '{}'", schemaName);
            jdbcConnection.readSchema(snapshotContext.tables, snapshotContext.catalogName, schemaName, connectorConfig.getTableFilters().dataCollectionFilter(), null, false);
        }
        schema.refresh(jdbcConnection, false);
    }

    /**
     * Builds a {@code CREATE} schema change event for the given table, using the
     * partition, offset and source info of the snapshot context's offset.
     */
    protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, Table table) throws SQLException {
        return new SchemaChangeEvent(snapshotContext.offset.getPartition(), snapshotContext.offset.getOffset(), snapshotContext.offset.getSourceInfo(), snapshotContext.catalogName, table.id().schema(), null, table, SchemaChangeEvent.SchemaChangeEventType.CREATE, true);
    }

    /**
     * Intentionally a no-op in this implementation.
     */
    protected void complete(AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext) {
    }

    /**
     * Returns the select statement for the table as built by the snapshotter.
     */
    protected Optional<String> getSnapshotSelect(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, TableId tableId) {
        return snapshotter.buildSnapshotQuery(tableId);
    }

    /**
     * Reads one column value from the result set, choosing the accessor by the
     * PostgreSQL type registered for the column's type name.
     *
     * <ul>
     * <li>array types are read with {@link ResultSet#getArray(int)};</li>
     * <li>money is read as a string and converted via {@link PGmoney}; a leading {@code -}
     * is rewritten to the parenthesised form before parsing;</li>
     * <li>bit, time and timetz are returned as strings;</li>
     * <li>numeric is returned as {@link SpecialValueDecimal}, either the special value
     * recognised from the string form or a wrapper around the {@code BigDecimal};</li>
     * <li>everything else is read with {@link ResultSet#getObject(int)}.</li>
     * </ul>
     *
     * <p>If any of this raises an {@link SQLException}, the superclass implementation is used instead.
     *
     * @param rs result set positioned on the row to read
     * @param columnIndex index of the column, as passed to the {@link ResultSet} accessors
     * @param column column definition, only forwarded to the superclass fallback
     * @return the column value, possibly {@code null}
     * @throws SQLException if the superclass fallback fails
     */
    protected Object getColumnValue(ResultSet rs, int columnIndex, Column column) throws SQLException {
        try {
            final ResultSetMetaData metaData = rs.getMetaData();
            final String columnTypeName = metaData.getColumnTypeName(columnIndex);
            final PostgresType type = schema.getTypeRegistry().get(columnTypeName);

            LOGGER.trace("Type of incoming data is: {}", type.getOid());
            LOGGER.trace("ColumnTypeName is: {}", columnTypeName);
            LOGGER.trace("Type is: {}", type);

            if (type.isArrayType()) {
                return rs.getArray(columnIndex);
            }

            switch (type.getOid()) {
                case OID_MONEY: {
                    final String money = rs.getString(columnIndex);
                    if (money == null) {
                        return null;
                    }
                    if (money.startsWith("-")) {
                        // Rewrite "-x" as "(x)" before handing the string to PGmoney.
                        final String negativeMoney = "(" + money.substring(1) + ")";
                        return new PGmoney(negativeMoney).val;
                    }
                    return new PGmoney(money).val;
                }
                case OID_NUMERIC: {
                    final String numeric = rs.getString(columnIndex);
                    if (numeric == null) {
                        return null;
                    }
                    final Optional<SpecialValueDecimal> special = PostgresValueConverter.toSpecialValue(numeric);
                    if (special.isPresent()) {
                        return special.get();
                    }
                    return new SpecialValueDecimal(rs.getBigDecimal(columnIndex));
                }
                case OID_BIT:
                case OID_TIME:
                case OID_TIMETZ: {
                    return rs.getString(columnIndex);
                }
                default:
                    break;
            }

            final Object value = rs.getObject(columnIndex);
            if (value != null) {
                LOGGER.trace("rs getobject returns class: {}; rs getObject value is: {}", value.getClass(), value);
            }
            return value;
        }
        catch (SQLException e) {
            // Best effort: fall back to the generic handling, but leave a trace of why.
            LOGGER.debug("Type-specific read of column index {} failed, falling back to default handling", columnIndex, e);
            return super.getColumnValue(rs, columnIndex, column);
        }
    }

    /**
     * Executes, without committing, the isolation-level statement that the
     * snapshotter provides for the slot creation result.
     */
    protected void setSnapshotTransactionIsolationLevel() throws SQLException {
        LOGGER.info("Setting isolation level");
        final String transactionStatement = snapshotter.snapshotTransactionIsolationLevelStatement(slotCreatedInfo);
        LOGGER.info("Opening transaction with statement {}", transactionStatement);
        jdbcConnection.executeWithoutCommitting(new String[]{transactionStatement});
    }

    /**
     * Snapshot context of this source; adds nothing to the relational snapshot context.
     */
    private static class PostgresSnapshotContext
            extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
        public PostgresSnapshotContext(String catalogName) throws SQLException {
            super(catalogName);
        }
    }
}

