/*
 * Decompiled with CFR 0.152.
 */
package com.zendesk.maxwell.schema;

import com.zendesk.maxwell.CaseSensitivity;
import com.zendesk.maxwell.replication.Position;
import com.zendesk.maxwell.schema.MysqlPositionStore;
import com.zendesk.maxwell.schema.MysqlSavedSchema;
import com.zendesk.maxwell.schema.ddl.InvalidSchemaError;
import com.zendesk.maxwell.util.ConnectionPool;
import com.zendesk.maxwell.util.RunLoopProcess;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MysqlSchemaCompactor
extends RunLoopProcess {
    static final Logger LOGGER = LoggerFactory.getLogger(MysqlSchemaCompactor.class);
    private final ConnectionPool maxwellConnectionPool;
    private final String clientID;
    private final Long serverID;
    private final CaseSensitivity sensitivity;
    private final int maxDeltas;
    Long lastWarnedSchemaID = null;
    private static final int DELETE_SLEEP_MS = 200;
    private static final int DELETE_LIMIT = 500;

    public MysqlSchemaCompactor(int maxDeltas, ConnectionPool maxwellConnectionPool, String clientID, Long serverID, CaseSensitivity sensitivity) {
        this.maxDeltas = maxDeltas;
        this.maxwellConnectionPool = maxwellConnectionPool;
        this.clientID = clientID;
        this.serverID = serverID;
        this.sensitivity = sensitivity;
    }

    @Override
    protected void work() throws Exception {
        try {
            this.doWork();
            Thread.sleep(5000L);
        }
        catch (InterruptedException interruptedException) {
        }
        catch (SQLException e) {
            LOGGER.error("got SQLException trying to compact", (Throwable)e);
        }
    }

    private String lockName() {
        return "maxwell_schema_compaction-" + this.serverID;
    }

    private boolean getLock(Connection cx) throws SQLException {
        try (PreparedStatement s = cx.prepareStatement("SELECT GET_LOCK(?, 0)");){
            boolean bl;
            block12: {
                s.setString(1, this.lockName());
                ResultSet rs = s.executeQuery();
                try {
                    boolean bl2 = bl = rs.next() && rs.getBoolean(1);
                    if (rs == null) break block12;
                }
                catch (Throwable throwable) {
                    if (rs != null) {
                        try {
                            rs.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                rs.close();
            }
            return bl;
        }
    }

    private void releaseLock(Connection cx) throws SQLException {
        try (PreparedStatement s = cx.prepareStatement("SELECT RELEASE_LOCK(?)");){
            s.setString(1, this.lockName());
            s.execute();
        }
    }

    public void doWork() throws Exception {
        try (Connection cx = this.maxwellConnectionPool.getConnection();){
            cx.setAutoCommit(false);
            try {
                if (this.getLock(cx)) {
                    this.compact(cx);
                }
            }
            finally {
                cx.setAutoCommit(true);
                this.releaseLock(cx);
            }
        }
    }

    private boolean shouldCompact(Connection cx) throws SQLException {
        String sql = "select count(*) as count from `schemas` where `server_id` = " + this.serverID;
        try (PreparedStatement ps = cx.prepareStatement(sql);){
            boolean bl;
            block12: {
                ResultSet rs = ps.executeQuery();
                try {
                    boolean bl2 = bl = rs.next() && rs.getInt("count") >= this.maxDeltas;
                    if (rs == null) break block12;
                }
                catch (Throwable throwable) {
                    if (rs != null) {
                        try {
                            rs.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                rs.close();
            }
            return bl;
        }
    }

    private Long chooseCompactedSchemaBase(Connection cx) throws SQLException {
        Position schemaPosition;
        Long schemaID;
        if (!this.shouldCompact(cx)) {
            return null;
        }
        String schemaSql = "select id, binlog_file, binlog_position, gtid_set, 0 as last_heartbeat_read  from `schemas` where `server_id` = " + this.serverID + " order by id desc limit 1";
        try (PreparedStatement ps = cx.prepareStatement(schemaSql);
             ResultSet rs = ps.executeQuery();){
            if (!rs.next()) {
                Long l = null;
                return l;
            }
            schemaID = rs.getLong("id");
            schemaPosition = MysqlPositionStore.positionFromResultSet(rs, this.serverID == 0L);
        }
        LOGGER.debug("trying to compact schema {} @ {}", (Object)schemaID, (Object)schemaPosition);
        ps = cx.prepareStatement("select * from `positions` where server_id = " + this.serverID);
        try (ResultSet positionsRS = ps.executeQuery();){
            while (true) {
                if (positionsRS.next()) {
                    Position clientPosition = MysqlPositionStore.positionFromResultSet(positionsRS, this.serverID == 0L);
                    if (!clientPosition.newerThan(schemaPosition)) {
                        if (!schemaID.equals(this.lastWarnedSchemaID)) {
                            LOGGER.warn("Not compacting schema {}, client '{}' @ {} has not reached that position yet", new Object[]{schemaID, positionsRS.getString("client_id"), clientPosition});
                            this.lastWarnedSchemaID = schemaID;
                        }
                        Long l = null;
                        return l;
                    }
                    LOGGER.debug("found a client @ {}, that's fine...", (Object)clientPosition);
                    continue;
                }
                break;
            }
        }
        finally {
            if (ps != null) {
                ps.close();
            }
        }
        return schemaID;
    }

    private void compact(Connection cx) throws SQLException, InvalidSchemaError {
        if (!this.shouldCompact(cx)) {
            return;
        }
        Long schemaID = this.chooseCompactedSchemaBase(cx);
        if (schemaID == null) {
            return;
        }
        LOGGER.info("compacting schemas before {}", (Object)schemaID);
        try (Statement begin = cx.createStatement();
             Statement update = cx.createStatement();
             Statement commit = cx.createStatement();){
            begin.execute("BEGIN");
            MysqlSavedSchema savedSchema = MysqlSavedSchema.restoreFromSchemaID(schemaID, cx, this.sensitivity);
            savedSchema.saveFullSchema(cx, schemaID);
            update.executeUpdate("update `schemas` set `base_schema_id` = null, `deltas` = null where `id` = " + schemaID);
            commit.execute("COMMIT");
            LOGGER.info("Committed schema compaction for {}", (Object)schemaID);
        }
        this.slowDeleteSchemas(cx, schemaID);
        LOGGER.info("Finished deleting old schemas prior to {}", (Object)schemaID);
    }

    private void slowDeleteSchemas(Connection cx, long newBaseSchemaID) throws SQLException {
        cx.setAutoCommit(true);
        String sql = "select * from `schemas` where id < ? and server_id = ?";
        try (PreparedStatement ps = cx.prepareStatement(sql);){
            ps.setLong(1, newBaseSchemaID);
            ps.setLong(2, this.serverID);
            try (ResultSet rs = ps.executeQuery();){
                while (rs.next()) {
                    this.slowDeleteSchema(cx, rs.getLong("id"));
                }
            }
        }
    }

    private void slowDeleteSchema(Connection cx, long schemaID) throws SQLException {
        LOGGER.debug("slow deleting schema_id: {}", (Object)schemaID);
        this.slowDeleteFrom("columns", cx, schemaID);
        this.slowDeleteFrom("tables", cx, schemaID);
        this.slowDeleteFrom("databases", cx, schemaID);
        try (Statement s = cx.createStatement();){
            s.executeUpdate("delete from `schemas` where id = " + schemaID);
        }
    }

    private void slowDeleteFrom(String table, Connection cx, long schemaID) throws SQLException {
        try (Statement s = cx.createStatement();){
            while (true) {
                int deleted;
                if ((deleted = s.executeUpdate("DELETE from `" + table + "` where schema_id = " + schemaID + " LIMIT 500")) == 0) {
                    return;
                }
                Thread.sleep(200L);
            }
        }
        catch (InterruptedException interruptedException) {
            return;
        }
    }
}

