/*
 * Decompiled with CFR 0.152.
 */
package com.zendesk.maxwell.bootstrap;

import com.zendesk.maxwell.CaseSensitivity;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.MaxwellMysqlStatus;
import com.zendesk.maxwell.bootstrap.BootstrapTask;
import com.zendesk.maxwell.errors.DuplicateProcessException;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.MaxwellOutputConfig;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.schema.Database;
import com.zendesk.maxwell.schema.Schema;
import com.zendesk.maxwell.schema.SchemaCapturer;
import com.zendesk.maxwell.schema.Table;
import com.zendesk.maxwell.schema.columndef.ColumnDef;
import com.zendesk.maxwell.schema.columndef.ColumnDefCastException;
import com.zendesk.maxwell.schema.columndef.DateColumnDef;
import com.zendesk.maxwell.schema.columndef.TimeColumnDef;
import com.zendesk.maxwell.scripting.Scripting;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a bootstrap task synchronously: reads every row of the requested table
 * over a dedicated streaming connection and pushes it to a producer, bracketed
 * by "bootstrap-start" and "bootstrap-complete" rows, while recording progress
 * in the {@code bootstrap} table.
 */
public class SynchronousBootstrapper {
    static final Logger LOGGER = LoggerFactory.getLogger(SynchronousBootstrapper.class);
    /** Minimum interval between two writes of the {@code inserted_rows} progress counter. */
    private static final long INSERTED_ROWS_UPDATE_PERIOD_MILLIS = 1000L;
    private final MaxwellContext context;
    /** Wall-clock time of the last successful progress write; reset to 0 at the start of each bootstrap. */
    private long lastInsertedRowsUpdateTimeMillis = 0L;
    private final String startBootstrapSQL = "update `bootstrap` set started_at=NOW() where id=?";
    private final String completeBootstrapSQL = "update `bootstrap` set is_complete=1, inserted_rows=?, completed_at=NOW() where id=?";

    public SynchronousBootstrapper(MaxwellContext context) {
        this.context = context;
    }

    /**
     * Performs the bootstrap for {@code task} and then emits the "bootstrap-complete" row.
     *
     * <p>If the bootstrap is aborted with a {@link BootstrapAbortException} (schema capture
     * failed or the table could not be found), the error is logged, the bootstrap row is
     * marked complete with zero inserted rows, and no "bootstrap-complete" row is pushed.
     *
     * @param task            the bootstrap request
     * @param producer        destination for the generated rows
     * @param currentSchemaID schema id stamped onto every "bootstrap-insert" row
     * @throws Exception any failure other than a {@link BootstrapAbortException}
     */
    public void startBootstrap(BootstrapTask task, AbstractProducer producer, Long currentSchemaID) throws Exception {
        try {
            this.performBootstrap(task, producer, currentSchemaID);
        }
        catch (BootstrapAbortException e) {
            LOGGER.error("Bootstrap (id={}) aborted: {}", (Object)task.id, (Object)e.getMessage());
            this.setBootstrapRowToCompleted(0, task.id);
            return;
        }
        this.completeBootstrap(task, producer);
    }

    /**
     * Captures the schema for the task's database/table; the connection used for the
     * capture is closed before returning.
     */
    private Schema captureSchemaForBootstrap(BootstrapTask task) throws SQLException {
        try (Connection cx = this.getConnection(task.database)) {
            CaseSensitivity sensitivity = MaxwellMysqlStatus.captureCaseSensitivity(cx);
            SchemaCapturer capturer = new SchemaCapturer(cx, sensitivity, task.database, task.table);
            return capturer.capture();
        }
    }

    /**
     * Resolves the {@link Table} definition the task refers to.
     *
     * @throws BootstrapAbortException if the schema cannot be captured, or the database
     *                                 or table is not present in the captured schema
     */
    private Table getTableForTask(BootstrapTask task) throws BootstrapAbortException {
        Schema schema;
        try {
            schema = this.captureSchemaForBootstrap(task);
        }
        catch (SQLException e) {
            // Keep the SQLException as the cause so the stack trace is not lost.
            throw new BootstrapAbortException(e.getMessage(), e);
        }
        // Guard against a missing database as well as a missing table; previously a
        // null database caused a NullPointerException instead of a clean abort.
        Database database = schema.findDatabase(task.database);
        Table table = database == null ? null : database.findTable(task.table);
        if (table == null) {
            String errMsg = String.format("Couldn't find db/table for %s.%s", task.database, task.table);
            throw new BootstrapAbortException(errMsg);
        }
        return table;
    }

    /**
     * Pushes a "bootstrap-start" row, then streams the table and pushes one
     * "bootstrap-insert" row per result row, and finally marks the bootstrap row completed.
     *
     * <p>A {@link NoSuchElementException} (raised when an update of the {@code bootstrap}
     * row affects no rows) is treated as an abort: it is logged and the method returns normally.
     *
     * @throws Exception on any other failure, including {@link BootstrapAbortException}
     */
    public void performBootstrap(BootstrapTask task, AbstractProducer producer, Long currentSchemaID) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("bootstrapping requested for {}", (Object)task.logString());
        }
        Table table = this.getTableForTask(task);
        producer.push(this.bootstrapStartRowMap(task, table));
        LOGGER.info("bootstrapping started for {}.{}", (Object)task.database, (Object)task.table);
        // The statement and result set are not closed individually; they are released
        // when the streaming connection is closed at the end of this try block.
        try (Connection streamingConnection = this.getStreamingConnection(task.database)) {
            this.setBootstrapRowToStarted(task.id);
            ResultSet resultSet = this.getAllRows(task.database, task.table, table, task.whereClause, streamingConnection);
            int insertedRows = 0;
            this.lastInsertedRowsUpdateTimeMillis = 0L;
            while (resultSet.next()) {
                RowMap row = this.bootstrapEventRowMap("bootstrap-insert", table.database, table.name, table.getPKList(), task.comment);
                this.setRowValues(row, resultSet, table);
                row.setSchemaId(currentSchemaID);
                Scripting scripting = this.context.getConfig().scripting;
                if (scripting != null) {
                    scripting.invoke(row);
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bootstrapping row : {}", (Object)row.toJSON());
                }
                producer.push(row);
                this.updateInsertedRowsColumn(++insertedRows, task.id);
            }
            this.setBootstrapRowToCompleted(insertedRows, task.id);
        }
        catch (NoSuchElementException e) {
            LOGGER.info("bootstrapping aborted for {}", (Object)task.logString());
        }
    }

    /**
     * Writes the running row count to the {@code bootstrap} row, at most once per
     * {@link #INSERTED_ROWS_UPDATE_PERIOD_MILLIS}.
     *
     * @throws NoSuchElementException if no {@code bootstrap} row with this id was updated
     */
    private void updateInsertedRowsColumn(int insertedRows, Long id) throws SQLException, NoSuchElementException, DuplicateProcessException {
        long now = System.currentTimeMillis();
        if (now - this.lastInsertedRowsUpdateTimeMillis > INSERTED_ROWS_UPDATE_PERIOD_MILLIS) {
            this.context.getMaxwellConnectionPool().withSQLRetry(1, connection -> {
                String sql = "update `bootstrap` set inserted_rows = ? where id = ?";
                // Close the statement explicitly so it is not left open on the pooled connection.
                try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                    preparedStatement.setInt(1, insertedRows);
                    preparedStatement.setLong(2, id);
                    if (preparedStatement.executeUpdate() == 0) {
                        throw new NoSuchElementException();
                    }
                }
                this.lastInsertedRowsUpdateTimeMillis = now;
            });
        }
    }

    /** Obtains a replication connection from the context with {@code databaseName} selected. */
    private Connection getConnection(String databaseName) throws SQLException {
        return SynchronousBootstrapper.selectCatalog(this.context.getReplicationConnection(), databaseName);
    }

    /**
     * Opens a new, non-pooled connection to the replication MySQL server with
     * {@code databaseName} selected. The caller owns the connection and must close it.
     */
    private Connection getStreamingConnection(String databaseName) throws SQLException, URISyntaxException {
        Connection conn = DriverManager.getConnection(this.context.getConfig().replicationMysql.getConnectionURI(false), this.context.getConfig().replicationMysql.user, this.context.getConfig().replicationMysql.password);
        return SynchronousBootstrapper.selectCatalog(conn, databaseName);
    }

    /**
     * Selects {@code databaseName} on {@code conn} and returns it; if that fails the
     * connection is closed before the failure propagates, so it cannot leak.
     */
    private static Connection selectCatalog(Connection conn, String databaseName) throws SQLException {
        try {
            conn.setCatalog(databaseName);
            return conn;
        }
        catch (SQLException | RuntimeException e) {
            try {
                conn.close();
            }
            catch (SQLException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /** Builds the "bootstrap-start" marker row for the task's table. */
    private RowMap bootstrapStartRowMap(BootstrapTask task, Table table) {
        return this.bootstrapEventRowMap("bootstrap-start", table.database, table.name, table.getPKList(), task.comment);
    }

    /** Builds a bootstrap row of the given type, timestamped with the current time and carrying {@code comment}. */
    private RowMap bootstrapEventRowMap(String type, String db, String tbl, List<String> pkList, String comment) {
        RowMap row = new RowMap(type, db, tbl, System.currentTimeMillis(), pkList, null);
        row.setComment(comment);
        return row;
    }

    /** Pushes the "bootstrap-complete" marker row (with an empty primary-key list) and logs completion. */
    public void completeBootstrap(BootstrapTask task, AbstractProducer producer) throws Exception {
        producer.push(this.bootstrapEventRowMap("bootstrap-complete", task.database, task.table, new ArrayList<String>(), task.comment));
        LOGGER.info("bootstrapping ended for {}", (Object)task.logString());
    }

    /**
     * Executes {@code select * from `db`.`table` [where ...] [order by pk]} on the given
     * connection and returns the open result set.
     *
     * <p>NOTE(review): {@code whereClause} and the identifiers are interpolated into the
     * SQL text as-is; this assumes they come from a trusted source — confirm against callers.
     */
    private ResultSet getAllRows(String databaseName, String tableName, Table table, String whereClause, Connection connection) throws SQLException {
        Statement statement = this.createBatchStatement(connection);
        String pk = table.getPKString();
        StringBuilder sql = new StringBuilder(String.format("select * from `%s`.`%s`", databaseName, tableName));
        if (whereClause != null && !whereClause.isEmpty()) {
            sql.append(" where ").append(whereClause);
        }
        if (pk != null && !pk.isEmpty()) {
            sql.append(" order by ").append(pk);
        }
        return statement.executeQuery(sql.toString());
    }

    /**
     * Creates a forward-only, read-only statement with fetch size {@code Integer.MIN_VALUE},
     * which MySQL Connector/J interprets as a request to stream results row by row.
     */
    private Statement createBatchStatement(Connection connection) throws SQLException {
        Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        statement.setFetchSize(Integer.MIN_VALUE);
        return statement;
    }

    /**
     * Stamps {@code started_at} on the {@code bootstrap} row.
     *
     * @throws NoSuchElementException if no {@code bootstrap} row with this id was updated
     */
    private void setBootstrapRowToStarted(Long id) throws SQLException, NoSuchElementException, DuplicateProcessException {
        this.context.getMaxwellConnectionPool().withSQLRetry(1, connection -> {
            try (PreparedStatement preparedStatement = connection.prepareStatement(this.startBootstrapSQL)) {
                preparedStatement.setLong(1, id);
                if (preparedStatement.executeUpdate() == 0) {
                    throw new NoSuchElementException();
                }
            }
        });
    }

    /**
     * Marks the {@code bootstrap} row complete, storing the final row count and completion time.
     *
     * @throws NoSuchElementException if no {@code bootstrap} row with this id was updated
     */
    private void setBootstrapRowToCompleted(int insertedRows, Long id) throws SQLException, NoSuchElementException, DuplicateProcessException {
        this.context.getMaxwellConnectionPool().withSQLRetry(1, connection -> {
            try (PreparedStatement preparedStatement = connection.prepareStatement(this.completeBootstrapSQL)) {
                preparedStatement.setInt(1, insertedRows);
                preparedStatement.setLong(2, id);
                if (preparedStatement.executeUpdate() == 0) {
                    throw new NoSuchElementException();
                }
            }
        });
    }

    /**
     * Reads the column as a timestamp. On failure the column index and, when it can be
     * read, the raw value are logged before the original exception is rethrown.
     */
    private Object getTimestamp(ResultSet resultSet, int columnIndex) throws SQLException {
        try {
            return resultSet.getTimestamp(columnIndex);
        }
        catch (SQLException e) {
            LOGGER.error("error trying to deserialize column at index: {}", (Object)columnIndex);
            try {
                LOGGER.error("raw value:{}", resultSet.getObject(columnIndex));
            }
            catch (SQLException rawValueError) {
                // Never let the diagnostic read mask the original failure.
                e.addSuppressed(rawValueError);
            }
            throw e;
        }
    }

    /**
     * Copies the current result-set row into {@code row}, one entry per column of
     * {@code table}, in column-list order (JDBC column indexes start at 1).
     * {@link TimeColumnDef} columns are read as timestamps, {@link DateColumnDef}
     * columns as strings, everything else via {@code getObject}; SQL NULL is stored as null.
     */
    private void setRowValues(RowMap row, ResultSet resultSet, Table table) throws SQLException, ColumnDefCastException {
        // One default output config per row instead of one per column.
        MaxwellOutputConfig outputConfig = new MaxwellOutputConfig();
        int columnIndex = 1;
        for (ColumnDef columnDefinition : table.getColumnList()) {
            Object columnValue;
            if (columnDefinition instanceof TimeColumnDef) {
                columnValue = this.getTimestamp(resultSet, columnIndex);
            } else if (columnDefinition instanceof DateColumnDef) {
                columnValue = resultSet.getString(columnIndex);
            } else {
                columnValue = resultSet.getObject(columnIndex);
            }
            row.putData(columnDefinition.getName(), columnValue == null ? null : columnDefinition.asJSON(columnValue, outputConfig));
            ++columnIndex;
        }
    }

    /** Signals that a bootstrap cannot proceed and should be recorded as finished with zero rows. */
    class BootstrapAbortException
    extends Exception {
        public BootstrapAbortException(String message) {
            super(message);
        }

        public BootstrapAbortException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}

