/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Snapshot change event source for the PostgreSQL connector.
 *
 * <p>Specialises {@link RelationalSnapshotChangeEventSource} with PostgreSQL-specific table locking,
 * snapshot transaction set-up and LSN handling for the offset recorded with the snapshot.
 */
public class PostgresSnapshotChangeEventSource
        extends RelationalSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {

    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class);

    private final PostgresConnectorConfig connectorConfig;
    /** Main connection obtained from the connection factory; all snapshot statements run on it. */
    private final PostgresConnection jdbcConnection;
    private final PostgresSchema schema;
    /** Result of creating the replication slot; may be {@code null} (checked before every use). */
    private final SlotCreationResult slotCreatedInfo;
    /** Slot state handed in at construction time; may be {@code null} (checked before every use). */
    private final SlotState startingSlotInfo;

    /**
     * Creates the snapshot source.
     *
     * @param connectorConfig connector configuration
     * @param snapshotterService service giving access to the snapshotter, snapshot lock and snapshot query
     * @param connectionFactory factory whose main connection is used for the snapshot
     * @param schema schema that is refreshed after the table structure has been read
     * @param dispatcher event dispatcher passed to the superclass
     * @param clock clock passed to the superclass
     * @param snapshotProgressListener progress listener passed to the superclass
     * @param slotCreatedInfo slot creation result, or {@code null}
     * @param startingSlotInfo slot state, or {@code null}
     * @param notificationService notification service passed to the superclass
     */
    public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig,
                                             SnapshotterService snapshotterService,
                                             MainConnectionProvidingConnectionFactory<PostgresConnection> connectionFactory,
                                             PostgresSchema schema,
                                             EventDispatcher<PostgresPartition, TableId> dispatcher,
                                             Clock clock,
                                             SnapshotProgressListener<PostgresPartition> snapshotProgressListener,
                                             SlotCreationResult slotCreatedInfo,
                                             SlotState startingSlotInfo,
                                             NotificationService<PostgresPartition, PostgresOffsetContext> notificationService) {
        super(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener,
                notificationService, snapshotterService);
        this.connectorConfig = connectorConfig;
        this.jdbcConnection = connectionFactory.mainConnection();
        this.schema = schema;
        this.slotCreatedInfo = slotCreatedInfo;
        this.startingSlotInfo = startingSlotInfo;
    }

    /**
     * Decides whether a snapshot has to be taken, based on the previous offset and the configured snapshotter.
     *
     * @param partition the partition being processed (not used by this implementation)
     * @param previousOffset the previously stored offset, or {@code null} if there is none
     * @return a non-on-demand task in which schema and data are either both snapshotted or both skipped
     */
    @Override
    public SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset) {
        final boolean offsetExists = previousOffset != null;
        final boolean snapshotInProgress = offsetExists && previousOffset.isInitialSnapshotRunning();
        if (offsetExists && !snapshotInProgress) {
            LOGGER.info("A previous offset indicating a completed snapshot has been found.");
        }

        final boolean snapshotData = this.snapshotterService.getSnapshotter()
                .shouldSnapshotData(offsetExists, snapshotInProgress);
        if (snapshotData) {
            LOGGER.info("According to the connector configuration data will be snapshotted");
        }
        else {
            LOGGER.info("According to the connector configuration no snapshot will be executed");
        }

        // The schema is snapshotted exactly when the data is.
        final boolean snapshotSchema = snapshotData;
        return new SnapshottingTask(
                snapshotSchema,
                snapshotData,
                this.connectorConfig.getDataCollectionsToBeSnapshotted(),
                this.connectorConfig.getSnapshotSelectOverridesByTable(),
                false);
    }

    /**
     * Creates the snapshot context for the given partition, using the configured database name as catalog.
     */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> prepare(
            PostgresPartition partition, boolean onDemand) {
        return new PostgresSnapshotContext(partition, this.connectorConfig.databaseName(), onDemand);
    }

    /**
     * Sets the snapshot transaction isolation level, but only when the snapshotter streams events starting
     * from the snapshot and no starting slot state was supplied.
     */
    @Override
    protected void connectionCreated(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext)
            throws Exception {
        if (this.snapshotterService.getSnapshotter().shouldStreamEventsStartingFromSnapshot()
                && this.startingSlotInfo == null) {
            this.setSnapshotTransactionIsolationLevel(snapshotContext.onDemand);
        }
    }

    /**
     * Returns the ids of all tables the connection reports for the context's catalog.
     */
    @Override
    protected Set<TableId> getAllTableIds(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> ctx)
            throws Exception {
        return this.jdbcConnection.getAllTableIds(ctx.catalogName);
    }

    /**
     * Locks the captured tables using the statements supplied by the configured snapshot lock.
     *
     * <p>All statements are sent as one script, prefixed by a {@code SET lock_timeout}, and executed without
     * committing. Nothing is executed when the snapshot lock yields no statement for any table.
     *
     * @throws SQLException if executing the lock script fails
     */
    @Override
    protected void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext sourceContext,
                                               RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext)
            throws SQLException {
        final Duration lockTimeout = this.connectorConfig.snapshotLockTimeout();
        final Set<String> capturedTablesNames = snapshotContext.capturedTables.stream()
                .map(TableId::toDoubleQuotedString)
                .collect(Collectors.toSet());
        final List<String> tableLockStatements = capturedTablesNames.stream()
                .map(tableId -> this.snapshotterService.getSnapshotLock().tableLockingStatement(lockTimeout, tableId))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        if (tableLockStatements.isEmpty()) {
            return;
        }

        final String lineSeparator = System.lineSeparator();
        final StringBuilder statements = new StringBuilder();
        statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator);
        for (String tableStatement : tableLockStatements) {
            statements.append(tableStatement).append(lineSeparator);
        }

        LOGGER.info("Waiting a maximum of '{}' seconds for each table lock", lockTimeout.getSeconds());
        this.jdbcConnection.executeWithoutCommitting(new String[]{ statements.toString() });
    }

    /**
     * Does nothing in this implementation.
     */
    @Override
    protected void releaseSchemaSnapshotLocks(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext)
            throws SQLException {
        // NOTE(review): intentionally empty; presumably the locks end with the snapshot transaction - confirm.
    }

    /**
     * Ensures the context carries an offset and moves that offset to the snapshot's WAL position.
     *
     * <p>If the context has no offset yet, a new one is created. It is seeded with the previous offset's
     * last-commit and last-completely-processed LSNs when a previous offset exists and the snapshotter does
     * not stream events starting from the snapshot; otherwise a plain initial context is used.
     */
    @Override
    protected void determineSnapshotOffset(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> ctx,
            PostgresOffsetContext previousOffset)
            throws Exception {
        PostgresOffsetContext offset = ctx.offset;
        if (offset == null) {
            final boolean seedFromPreviousOffset = previousOffset != null
                    && !this.snapshotterService.getSnapshotter().shouldStreamEventsStartingFromSnapshot();
            if (seedFromPreviousOffset) {
                offset = PostgresOffsetContext.initialContext(this.connectorConfig, this.jdbcConnection, this.getClock(),
                        previousOffset.lastCommitLsn(), previousOffset.lastCompletelyProcessedLsn());
            }
            else {
                offset = PostgresOffsetContext.initialContext(this.connectorConfig, this.jdbcConnection, this.getClock());
            }
            ctx.offset = offset;
        }
        this.updateOffsetForSnapshot(offset);
    }

    /**
     * Updates the offset's WAL position to the transaction start LSN and the current transaction id, keeping
     * the offset's existing last-completely-processed LSN and xmin.
     */
    private void updateOffsetForSnapshot(PostgresOffsetContext offset) throws SQLException {
        final Lsn xlogStart = this.getTransactionStartLsn();
        final Long txId = this.jdbcConnection.currentTransactionId();
        LOGGER.info("Read xlogStart at '{}' from transaction '{}'", xlogStart, txId);
        offset.updateWalPosition(xlogStart, offset.lastCompletelyProcessedLsn(), this.clock.currentTime(), txId,
                offset.xmin(), null, null);
    }

    /**
     * Updates the offset as for a snapshot and additionally records the current WAL location as the LSN at
     * which streaming has to stop.
     */
    protected void updateOffsetForPreSnapshotCatchUpStreaming(PostgresOffsetContext offset) throws SQLException {
        this.updateOffsetForSnapshot(offset);
        offset.setStreamingStoppingLsn(Lsn.valueOf(this.jdbcConnection.currentXLogLocation()));
    }

    /**
     * Picks the LSN to record as the start of the snapshot transaction.
     *
     * <ol>
     * <li>the start LSN of the slot creation result, if one was supplied;</li>
     * <li>otherwise the slot's last flushed LSN, re-read from the database, if a starting slot state was
     * supplied and the snapshotter does not stream events starting from the snapshot;</li>
     * <li>otherwise the connection's current WAL location.</li>
     * </ol>
     */
    private Lsn getTransactionStartLsn() throws SQLException {
        if (this.slotCreatedInfo != null) {
            return this.slotCreatedInfo.startLsn();
        }
        if (!this.snapshotterService.getSnapshotter().shouldStreamEventsStartingFromSnapshot()
                && this.startingSlotInfo != null) {
            // NOTE(review): the freshly read slot state is dereferenced without a null check - confirm that
            // getReplicationSlotState cannot return null here (e.g. if the slot was dropped in the meantime).
            final SlotState currentSlotState = this.jdbcConnection.getReplicationSlotState(
                    this.connectorConfig.slotName(), this.connectorConfig.plugin().getPostgresPluginName());
            return currentSlotState.slotLastFlushedLsn();
        }
        return Lsn.valueOf(this.jdbcConnection.currentXLogLocation());
    }

    /**
     * Reads the structure of every schema that contains a captured table and then refreshes the schema.
     *
     * <p>On-demand snapshots restrict reading to the captured tables; otherwise the configured data
     * collection filter is used.
     *
     * @throws InterruptedException if the source context stops running before all schemas have been read
     */
    @Override
    protected void readTableStructure(ChangeEventSource.ChangeEventSourceContext sourceContext,
                                      RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
                                      PostgresOffsetContext offsetContext,
                                      SnapshottingTask snapshottingTask)
            throws SQLException, InterruptedException {
        final Set<String> schemas = snapshotContext.capturedTables.stream()
                .map(TableId::schema)
                .collect(Collectors.toSet());

        // Named schemaName so the loop variable does not shadow the schema field used below.
        for (String schemaName : schemas) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while reading structure of schema " + schemaName);
            }
            LOGGER.info("Reading structure of schema '{}' of catalog '{}'", schemaName, snapshotContext.catalogName);

            final Tables.TableFilter tableFilter;
            if (snapshottingTask.isOnDemand()) {
                tableFilter = Tables.TableFilter.fromPredicate(snapshotContext.capturedTables::contains);
            }
            else {
                tableFilter = this.connectorConfig.getTableFilters().dataCollectionFilter();
            }
            this.jdbcConnection.readSchema(snapshotContext.tables, snapshotContext.catalogName, schemaName,
                    tableFilter, null, false);
        }
        this.schema.refresh(this.jdbcConnection, false);
    }

    /**
     * Builds the snapshot "create table" schema change event for the given table.
     */
    @Override
    protected SchemaChangeEvent getCreateTableEvent(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
            Table table) {
        return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset,
                snapshotContext.catalogName, table);
    }

    /**
     * Notifies the snapshotter that the snapshot completed.
     */
    @Override
    protected void completed(
            AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) {
        this.snapshotterService.getSnapshotter().snapshotCompleted();
    }

    /**
     * Notifies the snapshotter that the snapshot was aborted.
     */
    @Override
    protected void aborted(
            AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) {
        this.snapshotterService.getSnapshotter().snapshotAborted();
    }

    /**
     * Asks the configured snapshot query for the select statement of a table, identified by its
     * double-quoted name.
     */
    @Override
    protected Optional<String> getSnapshotSelect(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
            TableId tableId,
            List<String> columns) {
        return this.snapshotterService.getSnapshotQuery().snapshotQuery(tableId.toDoubleQuotedString(), columns);
    }

    /**
     * Executes, without committing, the statement that configures the snapshot transaction.
     *
     * @param isOnDemand whether the snapshot is an on-demand one
     * @throws SQLException if the statement fails
     */
    protected void setSnapshotTransactionIsolationLevel(boolean isOnDemand) throws SQLException {
        LOGGER.info("Setting isolation level");
        final String transactionStatement = this.snapshotTransactionIsolationLevelStatement(this.slotCreatedInfo, isOnDemand);
        LOGGER.info("Opening transaction with statement {}", transactionStatement);
        this.jdbcConnection.executeWithoutCommitting(new String[]{ transactionStatement });
    }

    /**
     * Builds the statement that configures the snapshot transaction.
     *
     * <p>With a slot creation result and a snapshot that is not on-demand, the transaction is set to
     * REPEATABLE READ and to the snapshot named by that result. In every other case the configured snapshot
     * isolation mode selects the statement; modes other than the three checked explicitly yield
     * SERIALIZABLE, READ ONLY, DEFERRABLE.
     */
    private String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) {
        if (newSlotInfo != null && !isOnDemand) {
            final String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
            return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
        }

        final PostgresConnectorConfig.SnapshotIsolationMode isolationMode = this.connectorConfig.getSnapshotIsolationMode();
        if (isolationMode == PostgresConnectorConfig.SnapshotIsolationMode.REPEATABLE_READ) {
            return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;";
        }
        if (isolationMode == PostgresConnectorConfig.SnapshotIsolationMode.READ_COMMITTED) {
            return "SET TRANSACTION ISOLATION LEVEL READ COMMITTED, READ ONLY;";
        }
        if (isolationMode == PostgresConnectorConfig.SnapshotIsolationMode.READ_UNCOMMITTED) {
            return "SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED, READ ONLY;";
        }
        return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;";
    }

    /**
     * Creates a copy of the context's offset by loading a new offset context from its serialized form.
     */
    @Override
    protected PostgresOffsetContext copyOffset(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) {
        return new PostgresOffsetContext.Loader(this.connectorConfig).load(snapshotContext.offset.getOffset());
    }

    /**
     * Snapshot context for PostgreSQL; adds no state to the relational snapshot context.
     */
    private static class PostgresSnapshotContext
            extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> {

        PostgresSnapshotContext(PostgresPartition partition, String catalogName, boolean onDemand) {
            super(partition, catalogName, onDemand);
        }
    }
}

